• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/grpc.h>
18 #include <grpc/support/json.h>
19 #include <stddef.h>
20 
21 #include <algorithm>
22 #include <array>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/strings/strip.h"
34 #include "absl/types/optional.h"
35 #include "absl/types/span.h"
36 #include "gmock/gmock.h"
37 #include "gtest/gtest.h"
38 #include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/load_balancing/lb_policy.h"
41 #include "src/core/resolver/endpoint_addresses.h"
42 #include "src/core/resolver/xds/xds_config.h"
43 #include "src/core/util/json/json.h"
44 #include "src/core/util/ref_counted_ptr.h"
45 #include "src/core/xds/grpc/xds_health_status.h"
46 #include "test/core/load_balancing/lb_policy_test_lib.h"
47 #include "test/core/test_util/test_config.h"
48 
49 namespace grpc_core {
50 namespace testing {
51 namespace {
52 class XdsOverrideHostTest : public LoadBalancingPolicyTest {
53  protected:
XdsOverrideHostTest()54   XdsOverrideHostTest()
55       : LoadBalancingPolicyTest("xds_override_host_experimental") {}
56 
MakeXdsConfig(absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name")57   static RefCountedPtr<const XdsConfig> MakeXdsConfig(
58       absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
59                                                                     "HEALTHY"},
60       absl::optional<Duration> connection_idle_timeout = absl::nullopt,
61       std::string cluster_name = "cluster_name") {
62     auto cluster_resource = std::make_shared<XdsClusterResource>();
63     for (const absl::string_view host_status : override_host_statuses) {
64       cluster_resource->override_host_statuses.Add(
65           XdsHealthStatus::FromString(host_status).value());
66     }
67     if (connection_idle_timeout.has_value()) {
68       cluster_resource->connection_idle_timeout = *connection_idle_timeout;
69     }
70     auto xds_config = MakeRefCounted<XdsConfig>();
71     xds_config->clusters[std::move(cluster_name)].emplace(
72         std::move(cluster_resource), nullptr, "");
73     return xds_config;
74   }
75 
UpdateXdsOverrideHostPolicy(absl::Span<const EndpointAddresses> endpoints,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")76   absl::Status UpdateXdsOverrideHostPolicy(
77       absl::Span<const EndpointAddresses> endpoints,
78       absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
79                                                                     "HEALTHY"},
80       absl::optional<Duration> connection_idle_timeout = absl::nullopt,
81       std::string cluster_name = "cluster_name",
82       std::string child_policy = "round_robin") {
83     auto config = MakeConfig(Json::FromArray({Json::FromObject(
84         {{"xds_override_host_experimental",
85           Json::FromObject(
86               {{"clusterName", Json::FromString(cluster_name)},
87                {"childPolicy",
88                 Json::FromArray({Json::FromObject(
89                     {{child_policy, Json::FromObject({})}})})}})}})}));
90     auto xds_config = MakeXdsConfig(override_host_statuses,
91                                     connection_idle_timeout, cluster_name);
92     return ApplyUpdate(
93         BuildUpdate(endpoints, std::move(config),
94                     ChannelArgs().SetObject(std::move(xds_config))),
95         lb_policy());
96   }
97 
UpdateXdsOverrideHostPolicy(absl::Span<const absl::string_view> addresses,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")98   absl::Status UpdateXdsOverrideHostPolicy(
99       absl::Span<const absl::string_view> addresses,
100       absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
101                                                                     "HEALTHY"},
102       absl::optional<Duration> connection_idle_timeout = absl::nullopt,
103       std::string cluster_name = "cluster_name",
104       std::string child_policy = "round_robin") {
105     return UpdateXdsOverrideHostPolicy(
106         MakeEndpointAddressesListFromAddressList(addresses),
107         override_host_statuses, connection_idle_timeout,
108         std::move(cluster_name), std::move(child_policy));
109   }
110 
111   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,SourceLocation location=SourceLocation ())112   ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
113                               SourceLocation location = SourceLocation()) {
114     EXPECT_EQ(UpdateXdsOverrideHostPolicy(addresses), absl::OkStatus())
115         << location.file() << ":" << location.line();
116     return ExpectRoundRobinStartup(addresses, location);
117   }
118 
MakeAddressWithHealthStatus(absl::string_view address,XdsHealthStatus::HealthStatus status)119   EndpointAddresses MakeAddressWithHealthStatus(
120       absl::string_view address, XdsHealthStatus::HealthStatus status) {
121     return EndpointAddresses(
122         MakeAddress(address),
123         ChannelArgs().Set(GRPC_ARG_XDS_HEALTH_STATUS, status));
124   }
125 
ApplyUpdateWithHealthStatuses(absl::Span<const std::pair<const absl::string_view,XdsHealthStatus::HealthStatus>> addresses_and_statuses,absl::Span<const absl::string_view> override_host_status={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt)126   void ApplyUpdateWithHealthStatuses(
127       absl::Span<const std::pair<const absl::string_view,
128                                  XdsHealthStatus::HealthStatus>>
129           addresses_and_statuses,
130       absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
131                                                                   "HEALTHY"},
132       absl::optional<Duration> connection_idle_timeout = absl::nullopt) {
133     EndpointAddressesList endpoints;
134     for (auto address_and_status : addresses_and_statuses) {
135       endpoints.push_back(MakeAddressWithHealthStatus(
136           address_and_status.first, address_and_status.second));
137     }
138     EXPECT_EQ(UpdateXdsOverrideHostPolicy(endpoints, override_host_status,
139                                           connection_idle_timeout),
140               absl::OkStatus());
141   }
142 
143   struct OverrideHostAttributeStorage {
144     // Need to store the string externally, since
145     // XdsOverrideHostAttribute only holds a string_view.
146     std::string address_list;
147     XdsOverrideHostAttribute attribute;
148 
OverrideHostAttributeStoragegrpc_core::testing::__anon71c171110111::XdsOverrideHostTest::OverrideHostAttributeStorage149     explicit OverrideHostAttributeStorage(std::string addresses)
150         : address_list(std::move(addresses)), attribute(address_list) {}
151   };
152 
MakeOverrideHostAttribute(absl::Span<const absl::string_view> addresses)153   XdsOverrideHostAttribute* MakeOverrideHostAttribute(
154       absl::Span<const absl::string_view> addresses) {
155     std::vector<absl::string_view> address_list;
156     address_list.reserve(addresses.size());
157     for (absl::string_view address : addresses) {
158       address_list.emplace_back(absl::StripPrefix(address, "ipv4:"));
159     }
160     attribute_storage_.emplace_back(
161         std::make_unique<OverrideHostAttributeStorage>(
162             absl::StrJoin(address_list, ",")));
163     return &attribute_storage_.back()->attribute;
164   }
165 
MakeOverrideHostAttribute(absl::string_view address)166   XdsOverrideHostAttribute* MakeOverrideHostAttribute(
167       absl::string_view address) {
168     const std::array<absl::string_view, 1> addresses = {address};
169     return MakeOverrideHostAttribute(addresses);
170   }
171 
ExpectOverridePicks(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::string_view expected,absl::Span<const absl::string_view> expected_address_list={},SourceLocation location=SourceLocation ())172   void ExpectOverridePicks(
173       LoadBalancingPolicy::SubchannelPicker* picker,
174       XdsOverrideHostAttribute* attribute, absl::string_view expected,
175       absl::Span<const absl::string_view> expected_address_list = {},
176       SourceLocation location = SourceLocation()) {
177     std::array<absl::string_view, 1> kArray = {expected};
178     if (expected_address_list.empty()) expected_address_list = kArray;
179     std::vector<absl::string_view> expected_addresses;
180     expected_addresses.reserve(expected_address_list.size());
181     for (absl::string_view address : expected_address_list) {
182       expected_addresses.push_back(absl::StripPrefix(address, "ipv4:"));
183     }
184     std::string expected_addresses_str = absl::StrJoin(expected_addresses, ",");
185     for (size_t i = 0; i < 3; ++i) {
186       EXPECT_EQ(ExpectPickComplete(picker, {attribute}, /*metadata=*/{},
187                                    /*subchannel_call_tracker=*/nullptr,
188                                    /*picked_subchannel=*/nullptr, location),
189                 expected)
190           << location.file() << ":" << location.line();
191       EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str)
192           << "  Actual: " << attribute->actual_address_list() << "\n"
193           << "Expected: " << expected_addresses_str << "\n"
194           << location.file() << ":" << location.line();
195     }
196   }
197 
ExpectRoundRobinPicksWithAttribute(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::Span<const absl::string_view> expected,SourceLocation location=SourceLocation ())198   void ExpectRoundRobinPicksWithAttribute(
199       LoadBalancingPolicy::SubchannelPicker* picker,
200       XdsOverrideHostAttribute* attribute,
201       absl::Span<const absl::string_view> expected,
202       SourceLocation location = SourceLocation()) {
203     std::vector<std::string> actual_picks;
204     for (size_t i = 0; i < expected.size(); ++i) {
205       auto address =
206           ExpectPickComplete(picker, {attribute}, /*metadata=*/{},
207                              /*subchannel_call_tracker=*/nullptr,
208                              /*picked_subchannel=*/nullptr, location);
209       ASSERT_TRUE(address.has_value())
210           << location.file() << ":" << location.line();
211       EXPECT_THAT(*address, ::testing::AnyOfArray(expected))
212           << location.file() << ":" << location.line();
213       EXPECT_EQ(attribute->actual_address_list(),
214                 absl::StripPrefix(*address, "ipv4:"))
215           << "  Actual: " << attribute->actual_address_list() << "\n"
216           << "Expected: " << absl::StripPrefix(*address, "ipv4:") << "\n"
217           << location.file() << ":" << location.line();
218       actual_picks.push_back(std::move(*address));
219     }
220     EXPECT_TRUE(PicksAreRoundRobin(expected, actual_picks))
221         << location.file() << ":" << location.line();
222   }
223 
224   std::vector<std::unique_ptr<OverrideHostAttributeStorage>> attribute_storage_;
225 };
226 
TEST_F(XdsOverrideHostTest,DelegatesToChild)227 TEST_F(XdsOverrideHostTest, DelegatesToChild) {
228   ExpectStartupWithRoundRobin(
229       {"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"});
230 }
231 
TEST_F(XdsOverrideHostTest,NoConfigReportsError)232 TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
233   EXPECT_EQ(
234       ApplyUpdate(
235           BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
236           lb_policy()),
237       absl::InvalidArgumentError("Missing policy config"));
238 }
239 
TEST_F(XdsOverrideHostTest,OverrideHost)240 TEST_F(XdsOverrideHostTest, OverrideHost) {
241   const std::array<absl::string_view, 3> kAddresses = {
242       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
243   auto picker = ExpectStartupWithRoundRobin(kAddresses);
244   ASSERT_NE(picker, nullptr);
245   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
246   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
247   auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
248   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
249 }
250 
TEST_F(XdsOverrideHostTest,SubchannelNotFound)251 TEST_F(XdsOverrideHostTest, SubchannelNotFound) {
252   const std::array<absl::string_view, 3> kAddresses = {
253       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
254   auto picker = ExpectStartupWithRoundRobin(kAddresses);
255   ASSERT_NE(picker, nullptr);
256   auto* attribute = MakeOverrideHostAttribute("no such host");
257   ExpectRoundRobinPicksWithAttribute(picker.get(), attribute, kAddresses);
258 }
259 
TEST_F(XdsOverrideHostTest,SubchannelsComeAndGo)260 TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
261   const std::array<absl::string_view, 3> kAddresses = {
262       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
263   auto picker = ExpectStartupWithRoundRobin(kAddresses);
264   ASSERT_NE(picker, nullptr);
265   // Check that the host override works.
266   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
267   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
268   // The override address is removed.
269   EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[0], kAddresses[2]}),
270             absl::OkStatus());
271   picker =
272       WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
273   // Picks are returned in round-robin order, because the address
274   // pointed to by the cookie is not present.
275   ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
276                                      {kAddresses[0], kAddresses[2]});
277   // The override address comes back.
278   EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[1], kAddresses[2]}),
279             absl::OkStatus());
280   picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
281                                        {kAddresses[1], kAddresses[2]});
282   // Make sure host override works.
283   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
284 }
285 
TEST_F(XdsOverrideHostTest,OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure)286 TEST_F(XdsOverrideHostTest,
287        OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure) {
288   const std::array<absl::string_view, 3> kAddresses = {
289       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
290   auto picker = ExpectStartupWithRoundRobin(kAddresses);
291   ASSERT_NE(picker, nullptr);
292   // Check that the host is overridden
293   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
294   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
295   // Subchannel for address 1 becomes disconnected.
296   LOG(INFO) << "### subchannel 1 reporting IDLE";
297   auto subchannel = FindSubchannel(kAddresses[1]);
298   ASSERT_NE(subchannel, nullptr);
299   subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
300   EXPECT_TRUE(subchannel->ConnectionRequested());
301   LOG(INFO) << "### expecting re-resolution request";
302   ExpectReresolutionRequest();
303   LOG(INFO) << "### expecting RR picks to exclude the disconnected subchannel";
304   picker =
305       WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
306   // Picks with the override will be queued.
307   ExpectPickQueued(picker.get(), {address1_attribute});
308   // The subchannel starts trying to reconnect.
309   LOG(INFO) << "### subchannel 1 reporting CONNECTING";
310   subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
311   picker = ExpectState(GRPC_CHANNEL_READY);
312   ASSERT_NE(picker, nullptr);
313   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
314   // Picks with the override will still be queued.
315   ExpectPickQueued(picker.get(), {address1_attribute});
316   // The connection attempt fails.
317   LOG(INFO) << "### subchannel 1 reporting TRANSIENT_FAILURE";
318   subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
319                                    absl::ResourceExhaustedError("Hmmmm"));
320   LOG(INFO) << "### expecting re-resolution request";
321   ExpectReresolutionRequest();
322   picker = ExpectState(GRPC_CHANNEL_READY);
323   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
324   // The host override is not used.
325   LOG(INFO) << "### checking that host override is not used";
326   ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
327                                      {kAddresses[0], kAddresses[2]});
328 }
329 
TEST_F(XdsOverrideHostTest,DrainingState)330 TEST_F(XdsOverrideHostTest, DrainingState) {
331   const std::array<absl::string_view, 3> kAddresses = {
332       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
333   auto picker = ExpectStartupWithRoundRobin(kAddresses);
334   ASSERT_NE(picker, nullptr);
335   // Do one override pick for endpoint 1, so that it will still be within
336   // the idle threshold and will therefore be retained when it moves to
337   // state DRAINING.
338   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
339   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
340   // Now move endpoint 1 to state DRAINING.
341   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
342                                  {kAddresses[1], XdsHealthStatus::kDraining},
343                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
344                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
345   picker = ExpectState(GRPC_CHANNEL_READY);
346   // Make sure subchannels get orphaned in the WorkSerializer.
347   WaitForWorkSerializerToFlush();
348   // Picks without an override will round-robin over the two endpoints
349   // that are not in draining state.
350   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
351   // Picks with an override are able to select the draining endpoint.
352   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
353   // Send the LB policy an update that removes the draining endpoint.
354   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
355                                  {kAddresses[2], XdsHealthStatus::kHealthy}});
356   picker = ExpectState(GRPC_CHANNEL_READY);
357   ASSERT_NE(picker, nullptr);
358   // Gone!
359   ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
360                                      {kAddresses[0], kAddresses[2]});
361 }
362 
TEST_F(XdsOverrideHostTest,DrainingSubchannelIsConnecting)363 TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
364   const std::array<absl::string_view, 3> kAddresses = {
365       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
366   auto picker = ExpectStartupWithRoundRobin(kAddresses);
367   ASSERT_NE(picker, nullptr);
368   // Check that the host is overridden
369   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
370   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
371   // Send an update that marks the endpoints with different EDS health
372   // states, but those states are present in override_host_status.
373   // The picker should use the DRAINING host when a call's override
374   // points to that hose, but the host should not be used if there is no
375   // override pointing to it.
376   LOG(INFO) << "### sending update with DRAINING host";
377   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
378                                  {kAddresses[1], XdsHealthStatus::kDraining},
379                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
380                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
381   auto subchannel = FindSubchannel(kAddresses[1]);
382   ASSERT_NE(subchannel, nullptr);
383   picker = ExpectState(GRPC_CHANNEL_READY);
384   // Make sure subchannels get orphaned in the WorkSerializer.
385   WaitForWorkSerializerToFlush();
386   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
387   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
388   // Now the connection to the draining host gets dropped.
389   // The picker should queue picks where the override host is IDLE.
390   // All picks without an override host should not use this host.
391   LOG(INFO) << "### closing connection to DRAINING host";
392   subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
393   picker = ExpectState(GRPC_CHANNEL_READY);
394   ExpectPickQueued(picker.get(), {address1_attribute});
395   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
396   // The subchannel should have been asked to reconnect as a result of the
397   // queued pick above.  It will therefore transition into state CONNECTING.
398   // The pick behavior is the same as above: The picker should queue
399   // picks where the override host is CONNECTING.  All picks without an
400   // override host should not use this host.
401   LOG(INFO) << "### subchannel starts reconnecting";
402   WaitForWorkSerializerToFlush();
403   EXPECT_TRUE(subchannel->ConnectionRequested());
404   ExpectQueueEmpty();
405   subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
406   picker = ExpectState(GRPC_CHANNEL_READY);
407   ExpectPickQueued(picker.get(), {address1_attribute});
408   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
409   // The subchannel now becomes connected again.
410   // Now picks with this override host can be completed again.
411   // Picks without an override host still don't use the draining host.
412   LOG(INFO) << "### subchannel becomes reconnected";
413   subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
414   picker = ExpectState(GRPC_CHANNEL_READY);
415   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
416   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
417 }
418 
TEST_F(XdsOverrideHostTest,DrainingToHealthy)419 TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
420   const std::array<absl::string_view, 3> kAddresses = {
421       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
422   auto picker = ExpectStartupWithRoundRobin(kAddresses);
423   ASSERT_NE(picker, nullptr);
424   // Do one override pick for endpoint 1, so that it will still be within
425   // the idle threshold and will therefore be retained when it moves to
426   // state DRAINING.
427   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
428   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
429   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
430                                  {kAddresses[1], XdsHealthStatus::kDraining},
431                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
432                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
433   picker = ExpectState(GRPC_CHANNEL_READY);
434   // Make sure subchannels get orphaned in the WorkSerializer.
435   WaitForWorkSerializerToFlush();
436   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
437   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
438   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kHealthy},
439                                  {kAddresses[1], XdsHealthStatus::kHealthy},
440                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
441                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
442   picker = ExpectState(GRPC_CHANNEL_READY);
443   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
444   ExpectRoundRobinPicks(picker.get(), kAddresses);
445 }
446 
TEST_F(XdsOverrideHostTest,OverrideHostStatus)447 TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
448   const std::array<absl::string_view, 3> kAddresses = {
449       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
450   auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
451   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
452   auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
453   auto picker = ExpectStartupWithRoundRobin(kAddresses);
454   ASSERT_NE(picker, nullptr);
455   // Do one override pick for endpoint 2, so that it will still be within
456   // the idle threshold and will therefore be retained when it moves to
457   // state DRAINING.
458   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
459   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
460                                  {kAddresses[1], XdsHealthStatus::kHealthy},
461                                  {kAddresses[2], XdsHealthStatus::kDraining}},
462                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
463   picker = ExpectState(GRPC_CHANNEL_READY);
464   ASSERT_NE(picker, nullptr);
465   // Make sure subchannels get orphaned in the WorkSerializer.
466   WaitForWorkSerializerToFlush();
467   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
468   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
469   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
470   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
471   // UNKNOWN excluded: overrides for first endpoint are not honored.
472   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
473                                  {kAddresses[1], XdsHealthStatus::kHealthy},
474                                  {kAddresses[2], XdsHealthStatus::kDraining}},
475                                 {"HEALTHY", "DRAINING"});
476   picker = ExpectState(GRPC_CHANNEL_READY);
477   ASSERT_NE(picker, nullptr);
478   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
479   ExpectRoundRobinPicksWithAttribute(picker.get(), address0_attribute,
480                                      {kAddresses[0], kAddresses[1]});
481   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
482   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
483   // HEALTHY excluded: overrides for second endpoint are not honored.
484   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
485                                  {kAddresses[1], XdsHealthStatus::kHealthy},
486                                  {kAddresses[2], XdsHealthStatus::kDraining}},
487                                 {"UNKNOWN", "DRAINING"});
488   picker = ExpectState(GRPC_CHANNEL_READY);
489   ASSERT_NE(picker, nullptr);
490   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
491   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
492   ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
493                                      {kAddresses[0], kAddresses[1]});
494   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
495   // DRAINING excluded: overrides for third endpoint are not honored.
496   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
497                                  {kAddresses[1], XdsHealthStatus::kHealthy},
498                                  {kAddresses[2], XdsHealthStatus::kDraining}},
499                                 {"UNKNOWN", "HEALTHY"});
500   picker = ExpectState(GRPC_CHANNEL_READY);
501   ASSERT_NE(picker, nullptr);
502   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
503   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
504   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
505   ExpectRoundRobinPicksWithAttribute(picker.get(), address2_attribute,
506                                      {kAddresses[0], kAddresses[1]});
507 }
508 
TEST_F(XdsOverrideHostTest,MultipleAddressesPerEndpoint)509 TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) {
510   constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
511       "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
512   constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
513       "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
514   constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
515       "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
516   const std::array<EndpointAddresses, 3> kEndpoints = {
517       MakeEndpointAddresses(kEndpoint1Addresses),
518       MakeEndpointAddresses(kEndpoint2Addresses),
519       MakeEndpointAddresses(kEndpoint3Addresses)};
520   EXPECT_EQ(UpdateXdsOverrideHostPolicy(kEndpoints), absl::OkStatus());
521   auto picker = ExpectRoundRobinStartup(kEndpoints);
522   ASSERT_NE(picker, nullptr);
523   // Check that the host is overridden.
524   auto* endpoint1_attribute = MakeOverrideHostAttribute(kEndpoint1Addresses);
525   ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[0],
526                       kEndpoint1Addresses);
527   auto* endpoint2_attribute = MakeOverrideHostAttribute(kEndpoint2Addresses);
528   ExpectOverridePicks(picker.get(), endpoint2_attribute, kEndpoint2Addresses[0],
529                       kEndpoint2Addresses);
530   // Change endpoint 1 to connect to its second address.
531   ExpectEndpointAddressChange(kEndpoint1Addresses, 0, 1, [&]() {
532     WaitForRoundRobinListChange(
533         {kEndpoint1Addresses[0], kEndpoint2Addresses[0],
534          kEndpoint3Addresses[0]},
535         {kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
536   });
537   WaitForRoundRobinListChange(
538       {kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
539       {kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
540   // Now the cookie for endpoint 1 should cause us to use the second address.
541   ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[1],
542                       {kEndpoint1Addresses[1], kEndpoint1Addresses[0]});
543 }
544 
TEST_F(XdsOverrideHostTest,ChildPolicyNeverCreatedSubchannel)545 TEST_F(XdsOverrideHostTest, ChildPolicyNeverCreatedSubchannel) {
546   const std::array<absl::string_view, 3> kAddresses = {
547       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
548   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
549                                  {kAddresses[1], XdsHealthStatus::kDraining},
550                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
551                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
552   // The draining endpoint is not passed down to the child policy.
553   // Picks without an override will round-robin over the two endpoints
554   // that are not in draining state.
555   auto picker = ExpectRoundRobinStartup({kAddresses[0], kAddresses[2]});
556   // Subchannels should exist for the non-draining endpoints only.
557   auto* subchannel = FindSubchannel(kAddresses[0]);
558   ASSERT_NE(subchannel, nullptr);
559   EXPECT_GE(subchannel->NumWatchers(), 1);
560   auto* subchannel2 = FindSubchannel(kAddresses[1]);
561   EXPECT_EQ(subchannel2, nullptr);
562   auto* subchannel3 = FindSubchannel(kAddresses[2]);
563   ASSERT_NE(subchannel3, nullptr);
564   EXPECT_GE(subchannel3->NumWatchers(), 1);
565   // A pick with an override pointing to the draining endpoint should
566   // queue the pick and trigger subchannel creation.
567   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
568   ExpectPickQueued(picker.get(), {address1_attribute});
569   WaitForWorkSerializerToFlush();
570   subchannel2 = FindSubchannel(kAddresses[1]);
571   ASSERT_NE(subchannel2, nullptr);
572   EXPECT_EQ(subchannel2->NumWatchers(), 1);
573   // Subchannel creation will trigger returning a new picker.
574   // Picks without an override should continue to use only the
575   // non-draining endpoints.
576   picker = ExpectState(GRPC_CHANNEL_READY);
577   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
578   // Trying the pick again with the new picker will trigger a connection
579   // attempt on the new subchannel.
580   ExpectPickQueued(picker.get(), {address1_attribute});
581   WaitForWorkSerializerToFlush();
582   EXPECT_TRUE(subchannel2->ConnectionRequested());
583   subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
584   // Subchannel state change will trigger returning a new picker.
585   // Picks without an override should continue to use only the
586   // non-draining endpoints.
587   picker = ExpectState(GRPC_CHANNEL_READY);
588   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
589   // Trying the pick with override again should queue, because the
590   // connection attempt is still pending.
591   ExpectPickQueued(picker.get(), {address1_attribute});
592   // Connection attempt succeeds.
593   subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
594   // Subchannel state change will trigger returning a new picker.
595   // Picks without an override should continue to use only the
596   // non-draining endpoints.
597   picker = ExpectState(GRPC_CHANNEL_READY);
598   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
599   // Now the pick with override should complete.
600   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
601 }
602 
TEST_F(XdsOverrideHostTest,ChildPolicyUnrefsSubchannelNotUsedWithinIdleThreshold)603 TEST_F(XdsOverrideHostTest,
604        ChildPolicyUnrefsSubchannelNotUsedWithinIdleThreshold) {
605   const std::array<absl::string_view, 3> kAddresses = {
606       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
607   auto picker = ExpectStartupWithRoundRobin(kAddresses);
608   ASSERT_NE(picker, nullptr);
609   // Now move endpoint 1 to state DRAINING.
610   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
611                                  {kAddresses[1], XdsHealthStatus::kDraining},
612                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
613                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
614   picker = ExpectState(GRPC_CHANNEL_READY);
615   // Make sure subchannels get orphaned in the WorkSerializer.
616   WaitForWorkSerializerToFlush();
617   // Picks without an override will round-robin over the two endpoints
618   // that are not in draining state.
619   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
620   // Child policy should drop its ref to the draining endpoint, and
621   // xds_override_host should not take ownership, since the entry never
622   // had an override pick.
623   auto* subchannel0 = FindSubchannel(kAddresses[0]);
624   ASSERT_NE(subchannel0, nullptr);
625   EXPECT_GE(subchannel0->NumWatchers(), 1);
626   auto* subchannel1 = FindSubchannel(kAddresses[1]);
627   ASSERT_NE(subchannel1, nullptr);
628   EXPECT_EQ(subchannel1->NumWatchers(), 0);
629   auto* subchannel2 = FindSubchannel(kAddresses[2]);
630   ASSERT_NE(subchannel2, nullptr);
631   EXPECT_GE(subchannel2->NumWatchers(), 1);
632 }
633 
TEST_F(XdsOverrideHostTest,IdleTimer)634 TEST_F(XdsOverrideHostTest, IdleTimer) {
635   std::vector<grpc_event_engine::experimental::EventEngine::Duration>
636       timer_durations;
637   fuzzing_ee_->SetRunAfterDurationCallback(
638       [&timer_durations](
639           grpc_event_engine::experimental::EventEngine::Duration duration) {
640         timer_durations.push_back(duration);
641       });
642   const std::array<absl::string_view, 3> kAddresses = {
643       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
644   LOG(INFO) << "### sending initial update";
645   EXPECT_EQ(UpdateXdsOverrideHostPolicy(kAddresses, {"UNKNOWN", "HEALTHY"},
646                                         Duration::Minutes(1)),
647             absl::OkStatus());
648   // Initial update should have caused the timer to be set for the idle
649   // timeout.
650   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Minutes(1)));
651   timer_durations.clear();
652   auto picker = ExpectRoundRobinStartup(kAddresses);
653   ASSERT_NE(picker, nullptr);
654   // Do an override pick for endpoints 1 and 2, so that they will still be
655   // within the idle threshold and will therefore be retained when they move
656   // to state DRAINING.
657   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
658   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
659   auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
660   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
661   // Increment time by 5 seconds and send an update that moves endpoints 1
662   // and 2 to state DRAINING.
663   LOG(INFO) << "### moving endpoints 1 and 2 to state DRAINING";
664   IncrementTimeBy(Duration::Seconds(5));
665   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
666                                  {kAddresses[1], XdsHealthStatus::kDraining},
667                                  {kAddresses[2], XdsHealthStatus::kDraining}},
668                                 {"UNKNOWN", "HEALTHY", "DRAINING"},
669                                 Duration::Minutes(1));
670   // The update should cause the timer to be reset for the next
671   // expiration time.
672   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(55)));
673   timer_durations.clear();
674   picker = ExpectState(GRPC_CHANNEL_READY);
675   // Make sure subchannels get orphaned in the WorkSerializer.
676   WaitForWorkSerializerToFlush();
677   // Picks without an override will use only the endpoint that is not in
678   // draining state.
679   ExpectRoundRobinPicks(picker.get(), {kAddresses[0]});
680   // Picks with an override are able to select the draining endpoints.
681   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
682   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
683   // Both subchannels are owned by the xds_override_host policy.
684   auto* subchannel1 = FindSubchannel(kAddresses[1]);
685   ASSERT_NE(subchannel1, nullptr);
686   EXPECT_EQ(subchannel1->NumWatchers(), 1);
687   auto* subchannel2 = FindSubchannel(kAddresses[2]);
688   ASSERT_NE(subchannel2, nullptr);
689   EXPECT_EQ(subchannel2->NumWatchers(), 1);
690   // Trigger the timer.  Both subchannels have gotten an override pick more
691   // recently than the timer was scheduled, so neither one will be unreffed.
692   IncrementTimeBy(Duration::Seconds(55));
693   EXPECT_EQ(subchannel1->NumWatchers(), 1);
694   EXPECT_EQ(subchannel2->NumWatchers(), 1);
695   // The timer will be reset for 5 seconds.
696   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(5)));
697   timer_durations.clear();
698   // Send another override pick for endpoint 1.
699   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
700   // Trigger the timer again.  This time, it should unref endpoint 2 but
701   // keep endpoint 1.
702   IncrementTimeBy(Duration::Seconds(5));
703   EXPECT_EQ(subchannel1->NumWatchers(), 1);
704   EXPECT_EQ(subchannel2->NumWatchers(), 0);
705   // The timer should now be set for 55 seconds, which is how long it
706   // will be until endpoint 1 should be unreffed.
707   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(55)));
708 }
709 
710 }  // namespace
711 }  // namespace testing
712 }  // namespace grpc_core
713 
main(int argc,char ** argv)714 int main(int argc, char** argv) {
715   ::testing::InitGoogleTest(&argc, argv);
716   grpc::testing::TestEnvironment env(&argc, argv);
717   return RUN_ALL_TESTS();
718 }
719