• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <stddef.h>
18 
19 #include <algorithm>
20 #include <array>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/strings/str_join.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/strings/strip.h"
31 #include "absl/types/optional.h"
32 #include "absl/types/span.h"
33 #include "gmock/gmock.h"
34 #include "gtest/gtest.h"
35 
36 #include <grpc/grpc.h>
37 #include <grpc/support/json.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
41 #include "src/core/ext/xds/xds_health_status.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/json/json.h"
46 #include "src/core/load_balancing/lb_policy.h"
47 #include "src/core/resolver/endpoint_addresses.h"
48 #include "src/core/resolver/xds/xds_dependency_manager.h"
49 #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
50 #include "test/core/util/test_config.h"
51 
52 namespace grpc_core {
53 namespace testing {
54 namespace {
55 class XdsOverrideHostTest : public LoadBalancingPolicyTest {
56  protected:
XdsOverrideHostTest()57   XdsOverrideHostTest()
58       : LoadBalancingPolicyTest("xds_override_host_experimental") {}
59 
MakeXdsConfig(absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name")60   static RefCountedPtr<const XdsDependencyManager::XdsConfig> MakeXdsConfig(
61       absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
62                                                                     "HEALTHY"},
63       absl::optional<Duration> connection_idle_timeout = absl::nullopt,
64       std::string cluster_name = "cluster_name") {
65     auto cluster_resource = std::make_shared<XdsClusterResource>();
66     for (const absl::string_view host_status : override_host_statuses) {
67       cluster_resource->override_host_statuses.Add(
68           XdsHealthStatus::FromString(host_status).value());
69     }
70     if (connection_idle_timeout.has_value()) {
71       cluster_resource->connection_idle_timeout = *connection_idle_timeout;
72     }
73     auto xds_config = MakeRefCounted<XdsDependencyManager::XdsConfig>();
74     xds_config->clusters[std::move(cluster_name)].emplace(
75         std::move(cluster_resource), nullptr, "");
76     return xds_config;
77   }
78 
UpdateXdsOverrideHostPolicy(absl::Span<const EndpointAddresses> endpoints,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")79   absl::Status UpdateXdsOverrideHostPolicy(
80       absl::Span<const EndpointAddresses> endpoints,
81       absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
82                                                                     "HEALTHY"},
83       absl::optional<Duration> connection_idle_timeout = absl::nullopt,
84       std::string cluster_name = "cluster_name",
85       std::string child_policy = "round_robin") {
86     auto config = MakeConfig(Json::FromArray({Json::FromObject(
87         {{"xds_override_host_experimental",
88           Json::FromObject(
89               {{"clusterName", Json::FromString(cluster_name)},
90                {"childPolicy",
91                 Json::FromArray({Json::FromObject(
92                     {{child_policy, Json::FromObject({})}})})}})}})}));
93     auto xds_config = MakeXdsConfig(override_host_statuses,
94                                     connection_idle_timeout, cluster_name);
95     return ApplyUpdate(
96         BuildUpdate(endpoints, std::move(config),
97                     ChannelArgs().SetObject(std::move(xds_config))),
98         lb_policy());
99   }
100 
UpdateXdsOverrideHostPolicy(absl::Span<const absl::string_view> addresses,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")101   absl::Status UpdateXdsOverrideHostPolicy(
102       absl::Span<const absl::string_view> addresses,
103       absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
104                                                                     "HEALTHY"},
105       absl::optional<Duration> connection_idle_timeout = absl::nullopt,
106       std::string cluster_name = "cluster_name",
107       std::string child_policy = "round_robin") {
108     return UpdateXdsOverrideHostPolicy(
109         MakeEndpointAddressesListFromAddressList(addresses),
110         override_host_statuses, connection_idle_timeout,
111         std::move(cluster_name), std::move(child_policy));
112   }
113 
114   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,SourceLocation location=SourceLocation ())115   ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
116                               SourceLocation location = SourceLocation()) {
117     EXPECT_EQ(UpdateXdsOverrideHostPolicy(addresses), absl::OkStatus())
118         << location.file() << ":" << location.line();
119     return ExpectRoundRobinStartup(addresses, location);
120   }
121 
MakeAddressWithHealthStatus(absl::string_view address,XdsHealthStatus::HealthStatus status)122   EndpointAddresses MakeAddressWithHealthStatus(
123       absl::string_view address, XdsHealthStatus::HealthStatus status) {
124     return EndpointAddresses(
125         MakeAddress(address),
126         ChannelArgs().Set(GRPC_ARG_XDS_HEALTH_STATUS, status));
127   }
128 
ApplyUpdateWithHealthStatuses(absl::Span<const std::pair<const absl::string_view,XdsHealthStatus::HealthStatus>> addresses_and_statuses,absl::Span<const absl::string_view> override_host_status={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt)129   void ApplyUpdateWithHealthStatuses(
130       absl::Span<const std::pair<const absl::string_view,
131                                  XdsHealthStatus::HealthStatus>>
132           addresses_and_statuses,
133       absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
134                                                                   "HEALTHY"},
135       absl::optional<Duration> connection_idle_timeout = absl::nullopt) {
136     EndpointAddressesList endpoints;
137     for (auto address_and_status : addresses_and_statuses) {
138       endpoints.push_back(MakeAddressWithHealthStatus(
139           address_and_status.first, address_and_status.second));
140     }
141     EXPECT_EQ(UpdateXdsOverrideHostPolicy(endpoints, override_host_status,
142                                           connection_idle_timeout),
143               absl::OkStatus());
144   }
145 
146   struct OverrideHostAttributeStorage {
147     // Need to store the string externally, since
148     // XdsOverrideHostAttribute only holds a string_view.
149     std::string address_list;
150     XdsOverrideHostAttribute attribute;
151 
OverrideHostAttributeStoragegrpc_core::testing::__anon830dd4100111::XdsOverrideHostTest::OverrideHostAttributeStorage152     explicit OverrideHostAttributeStorage(std::string addresses)
153         : address_list(std::move(addresses)), attribute(address_list) {}
154   };
155 
MakeOverrideHostAttribute(absl::Span<const absl::string_view> addresses)156   XdsOverrideHostAttribute* MakeOverrideHostAttribute(
157       absl::Span<const absl::string_view> addresses) {
158     std::vector<absl::string_view> address_list;
159     address_list.reserve(addresses.size());
160     for (absl::string_view address : addresses) {
161       address_list.emplace_back(absl::StripPrefix(address, "ipv4:"));
162     }
163     attribute_storage_.emplace_back(
164         std::make_unique<OverrideHostAttributeStorage>(
165             absl::StrJoin(address_list, ",")));
166     return &attribute_storage_.back()->attribute;
167   }
168 
MakeOverrideHostAttribute(absl::string_view address)169   XdsOverrideHostAttribute* MakeOverrideHostAttribute(
170       absl::string_view address) {
171     const std::array<absl::string_view, 1> addresses = {address};
172     return MakeOverrideHostAttribute(addresses);
173   }
174 
ExpectOverridePicks(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::string_view expected,absl::Span<const absl::string_view> expected_address_list={},SourceLocation location=SourceLocation ())175   void ExpectOverridePicks(
176       LoadBalancingPolicy::SubchannelPicker* picker,
177       XdsOverrideHostAttribute* attribute, absl::string_view expected,
178       absl::Span<const absl::string_view> expected_address_list = {},
179       SourceLocation location = SourceLocation()) {
180     std::array<absl::string_view, 1> kArray = {expected};
181     if (expected_address_list.empty()) expected_address_list = kArray;
182     std::vector<absl::string_view> expected_addresses;
183     expected_addresses.reserve(expected_address_list.size());
184     for (absl::string_view address : expected_address_list) {
185       expected_addresses.push_back(absl::StripPrefix(address, "ipv4:"));
186     }
187     std::string expected_addresses_str = absl::StrJoin(expected_addresses, ",");
188     for (size_t i = 0; i < 3; ++i) {
189       EXPECT_EQ(ExpectPickComplete(picker, {attribute},
190                                    /*subchannel_call_tracker=*/nullptr,
191                                    /*picked_subchannel=*/nullptr, location),
192                 expected)
193           << location.file() << ":" << location.line();
194       EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str)
195           << "  Actual: " << attribute->actual_address_list() << "\n"
196           << "Expected: " << expected_addresses_str << "\n"
197           << location.file() << ":" << location.line();
198     }
199   }
200 
ExpectRoundRobinPicksWithAttribute(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::Span<const absl::string_view> expected,SourceLocation location=SourceLocation ())201   void ExpectRoundRobinPicksWithAttribute(
202       LoadBalancingPolicy::SubchannelPicker* picker,
203       XdsOverrideHostAttribute* attribute,
204       absl::Span<const absl::string_view> expected,
205       SourceLocation location = SourceLocation()) {
206     std::vector<std::string> actual_picks;
207     for (size_t i = 0; i < expected.size(); ++i) {
208       auto address = ExpectPickComplete(
209           picker, {attribute}, /*subchannel_call_tracker=*/nullptr,
210           /*picked_subchannel=*/nullptr, location);
211       ASSERT_TRUE(address.has_value())
212           << location.file() << ":" << location.line();
213       EXPECT_THAT(*address, ::testing::AnyOfArray(expected))
214           << location.file() << ":" << location.line();
215       EXPECT_EQ(attribute->actual_address_list(),
216                 absl::StripPrefix(*address, "ipv4:"))
217           << "  Actual: " << attribute->actual_address_list() << "\n"
218           << "Expected: " << absl::StripPrefix(*address, "ipv4:") << "\n"
219           << location.file() << ":" << location.line();
220       actual_picks.push_back(std::move(*address));
221     }
222     EXPECT_TRUE(PicksAreRoundRobin(expected, actual_picks))
223         << location.file() << ":" << location.line();
224   }
225 
226   std::vector<std::unique_ptr<OverrideHostAttributeStorage>> attribute_storage_;
227 };
228 
// With no override attribute involved, bringing the policy up with three
// addresses must satisfy the round_robin startup expectations.
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  ExpectStartupWithRoundRobin(kAddresses);
}
233 
// An update that carries addresses but a null policy config must be
// rejected with an InvalidArgument status.
TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
      lb_policy());
  EXPECT_EQ(update_status,
            absl::InvalidArgumentError("Missing policy config"));
}
241 
// Picks carrying an override attribute must land on the address named by
// that attribute.
TEST_F(XdsOverrideHostTest, OverrideHost) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Pin picks to the second address first, then to the first address.
  for (const absl::string_view target : {kAddresses[1], kAddresses[0]}) {
    auto* override_attribute = MakeOverrideHostAttribute(target);
    ExpectOverridePicks(picker.get(), override_attribute, target);
  }
}
252 
// An override naming a host that is not among the configured addresses
// must be ignored: picks round-robin over all addresses.
TEST_F(XdsOverrideHostTest, SubchannelNotFound) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  auto* unknown_host_attribute = MakeOverrideHostAttribute("no such host");
  ExpectRoundRobinPicksWithAttribute(picker.get(), unknown_host_attribute,
                                     kAddresses);
}
261 
// A host override stops being honored once its address is removed from
// the endpoint list, and is honored again when the address returns.
TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host override works.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // The override address is removed.
  EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[0], kAddresses[2]}),
            absl::OkStatus());
  picker =
      WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
  // Stop here if no picker was produced, as done after startup above.
  ASSERT_NE(picker, nullptr);
  // Picks are returned in round-robin order, because the address
  // pointed to by the cookie is not present.
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
  // The override address comes back.
  EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[1], kAddresses[2]}),
            absl::OkStatus());
  picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
                                       {kAddresses[1], kAddresses[2]});
  ASSERT_NE(picker, nullptr);
  // Make sure host override works.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
287 
// Picks with an override are queued while the target subchannel is IDLE or
// CONNECTING; once it reports TRANSIENT_FAILURE the override is no longer
// used and picks round-robin over the remaining addresses.
TEST_F(XdsOverrideHostTest,
       OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Subchannel for address 1 becomes disconnected.
  gpr_log(GPR_INFO, "### subchannel 1 reporting IDLE");
  auto subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel, nullptr);
  subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
  EXPECT_TRUE(subchannel->ConnectionRequested());
  gpr_log(GPR_INFO, "### expecting re-resolution request");
  ExpectReresolutionRequest();
  gpr_log(GPR_INFO,
          "### expecting RR picks to exclude the disconnected subchannel");
  picker =
      WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
  // Stop here if no picker was produced, matching the checks used for the
  // other picker assignments in this test.
  ASSERT_NE(picker, nullptr);
  // Picks with the override will be queued.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // The subchannel starts trying to reconnect.
  gpr_log(GPR_INFO, "### subchannel 1 reporting CONNECTING");
  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Picks with the override will still be queued.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // The connection attempt fails.
  gpr_log(GPR_INFO, "### subchannel 1 reporting TRANSIENT_FAILURE");
  subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                                   absl::ResourceExhaustedError("Hmmmm"));
  gpr_log(GPR_INFO, "### expecting re-resolution request");
  ExpectReresolutionRequest();
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The host override is not used.
  gpr_log(GPR_INFO, "### checking that host override is not used");
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
}
332 
// A DRAINING endpoint is skipped by picks without an override but can
// still be selected by an override, until an update removes the endpoint.
TEST_F(XdsOverrideHostTest, DrainingState) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 1, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Now move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Stop here if no picker was produced, matching the check applied to
  // the final picker below.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Picks with an override are able to select the draining endpoint.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Send the LB policy an update that removes the draining endpoint.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Gone!
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
}
365 
TEST_F(XdsOverrideHostTest,DrainingSubchannelIsConnecting)366 TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
367   const std::array<absl::string_view, 3> kAddresses = {
368       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
369   auto picker = ExpectStartupWithRoundRobin(kAddresses);
370   ASSERT_NE(picker, nullptr);
371   // Check that the host is overridden
372   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
373   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
374   // Send an update that marks the endpoints with different EDS health
375   // states, but those states are present in override_host_status.
376   // The picker should use the DRAINING host when a call's override
377   // points to that hose, but the host should not be used if there is no
378   // override pointing to it.
379   gpr_log(GPR_INFO, "### sending update with DRAINING host");
380   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
381                                  {kAddresses[1], XdsHealthStatus::kDraining},
382                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
383                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
384   auto subchannel = FindSubchannel(kAddresses[1]);
385   ASSERT_NE(subchannel, nullptr);
386   picker = ExpectState(GRPC_CHANNEL_READY);
387   // Make sure subchannels get orphaned in the WorkSerializer.
388   WaitForWorkSerializerToFlush();
389   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
390   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
391   // Now the connection to the draining host gets dropped.
392   // The picker should queue picks where the override host is IDLE.
393   // All picks without an override host should not use this host.
394   gpr_log(GPR_INFO, "### closing connection to DRAINING host");
395   subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
396   picker = ExpectState(GRPC_CHANNEL_READY);
397   ExpectPickQueued(picker.get(), {address1_attribute});
398   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
399   // The subchannel should have been asked to reconnect as a result of the
400   // queued pick above.  It will therefore transition into state CONNECTING.
401   // The pick behavior is the same as above: The picker should queue
402   // picks where the override host is CONNECTING.  All picks without an
403   // override host should not use this host.
404   gpr_log(GPR_INFO, "### subchannel starts reconnecting");
405   WaitForWorkSerializerToFlush();
406   EXPECT_TRUE(subchannel->ConnectionRequested());
407   ExpectQueueEmpty();
408   subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
409   picker = ExpectState(GRPC_CHANNEL_READY);
410   ExpectPickQueued(picker.get(), {address1_attribute});
411   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
412   // The subchannel now becomes connected again.
413   // Now picks with this override host can be completed again.
414   // Picks without an override host still don't use the draining host.
415   gpr_log(GPR_INFO, "### subchannel becomes reconnected");
416   subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
417   picker = ExpectState(GRPC_CHANNEL_READY);
418   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
419   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
420 }
421 
// An endpoint that moves from DRAINING back to HEALTHY rejoins the
// round-robin rotation while remaining selectable through an override.
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 1, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Stop here if no picker was produced, as done after startup above.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Every endpoint is now reported as HEALTHY.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kHealthy},
                                 {kAddresses[1], XdsHealthStatus::kHealthy},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), kAddresses);
}
449 
TEST_F(XdsOverrideHostTest,OverrideHostStatus)450 TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
451   const std::array<absl::string_view, 3> kAddresses = {
452       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
453   auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
454   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
455   auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
456   auto picker = ExpectStartupWithRoundRobin(kAddresses);
457   ASSERT_NE(picker, nullptr);
458   // Do one override pick for endpoint 2, so that it will still be within
459   // the idle threshold and will therefore be retained when it moves to
460   // state DRAINING.
461   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
462   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
463                                  {kAddresses[1], XdsHealthStatus::kHealthy},
464                                  {kAddresses[2], XdsHealthStatus::kDraining}},
465                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
466   picker = ExpectState(GRPC_CHANNEL_READY);
467   ASSERT_NE(picker, nullptr);
468   // Make sure subchannels get orphaned in the WorkSerializer.
469   WaitForWorkSerializerToFlush();
470   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
471   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
472   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
473   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
474   // UNKNOWN excluded: overrides for first endpoint are not honored.
475   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
476                                  {kAddresses[1], XdsHealthStatus::kHealthy},
477                                  {kAddresses[2], XdsHealthStatus::kDraining}},
478                                 {"HEALTHY", "DRAINING"});
479   picker = ExpectState(GRPC_CHANNEL_READY);
480   ASSERT_NE(picker, nullptr);
481   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
482   ExpectRoundRobinPicksWithAttribute(picker.get(), address0_attribute,
483                                      {kAddresses[0], kAddresses[1]});
484   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
485   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
486   // HEALTHY excluded: overrides for second endpoint are not honored.
487   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
488                                  {kAddresses[1], XdsHealthStatus::kHealthy},
489                                  {kAddresses[2], XdsHealthStatus::kDraining}},
490                                 {"UNKNOWN", "DRAINING"});
491   picker = ExpectState(GRPC_CHANNEL_READY);
492   ASSERT_NE(picker, nullptr);
493   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
494   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
495   ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
496                                      {kAddresses[0], kAddresses[1]});
497   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
498   // DRAINING excluded: overrides for third endpoint are not honored.
499   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
500                                  {kAddresses[1], XdsHealthStatus::kHealthy},
501                                  {kAddresses[2], XdsHealthStatus::kDraining}},
502                                 {"UNKNOWN", "HEALTHY"});
503   picker = ExpectState(GRPC_CHANNEL_READY);
504   ASSERT_NE(picker, nullptr);
505   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
506   ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
507   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
508   ExpectRoundRobinPicksWithAttribute(picker.get(), address2_attribute,
509                                      {kAddresses[0], kAddresses[1]});
510 }
511 
// Endpoints with two addresses each: an override listing an endpoint's
// addresses selects whichever of them the endpoint is currently using,
// and the attribute reports that address first in its address list.
TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) {
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Shorthand for the individual addresses referenced below.
  const absl::string_view endpoint1_first = kEndpoint1Addresses[0];
  const absl::string_view endpoint1_second = kEndpoint1Addresses[1];
  const absl::string_view endpoint2_first = kEndpoint2Addresses[0];
  const absl::string_view endpoint3_first = kEndpoint3Addresses[0];
  EXPECT_EQ(UpdateXdsOverrideHostPolicy(kEndpoints), absl::OkStatus());
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden.
  auto* endpoint1_attribute = MakeOverrideHostAttribute(kEndpoint1Addresses);
  ExpectOverridePicks(picker.get(), endpoint1_attribute, endpoint1_first,
                      kEndpoint1Addresses);
  auto* endpoint2_attribute = MakeOverrideHostAttribute(kEndpoint2Addresses);
  ExpectOverridePicks(picker.get(), endpoint2_attribute, endpoint2_first,
                      kEndpoint2Addresses);
  // Change endpoint 1 to connect to its second address.
  ExpectEndpointAddressChange(kEndpoint1Addresses, 0, 1, [&]() {
    WaitForRoundRobinListChange(
        {endpoint1_first, endpoint2_first, endpoint3_first},
        {endpoint2_first, endpoint3_first});
  });
  // The pickers returned by these waits are not stored; the override picks
  // below reuse the picker obtained at startup.
  WaitForRoundRobinListChange(
      {endpoint2_first, endpoint3_first},
      {endpoint1_second, endpoint2_first, endpoint3_first});
  // Now the cookie for endpoint 1 should cause us to use the second address.
  ExpectOverridePicks(picker.get(), endpoint1_attribute, endpoint1_second,
                      {endpoint1_second, endpoint1_first});
}
547 
TEST_F(XdsOverrideHostTest,ChildPolicyNeverCreatedSubchannel)548 TEST_F(XdsOverrideHostTest, ChildPolicyNeverCreatedSubchannel) {
549   const std::array<absl::string_view, 3> kAddresses = {
550       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
551   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
552                                  {kAddresses[1], XdsHealthStatus::kDraining},
553                                  {kAddresses[2], XdsHealthStatus::kHealthy}},
554                                 {"UNKNOWN", "HEALTHY", "DRAINING"});
555   // The draining endpoint is not passed down to the child policy.
556   // Picks without an override will round-robin over the two endpoints
557   // that are not in draining state.
558   auto picker = ExpectRoundRobinStartup({kAddresses[0], kAddresses[2]});
559   // Subchannels should exist for the non-draining endpoints only.
560   auto* subchannel = FindSubchannel(kAddresses[0]);
561   ASSERT_NE(subchannel, nullptr);
562   EXPECT_GE(subchannel->NumWatchers(), 1);
563   auto* subchannel2 = FindSubchannel(kAddresses[1]);
564   EXPECT_EQ(subchannel2, nullptr);
565   auto* subchannel3 = FindSubchannel(kAddresses[2]);
566   ASSERT_NE(subchannel3, nullptr);
567   EXPECT_GE(subchannel3->NumWatchers(), 1);
568   // A pick with an override pointing to the draining endpoint should
569   // queue the pick and trigger subchannel creation.
570   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
571   ExpectPickQueued(picker.get(), {address1_attribute});
572   WaitForWorkSerializerToFlush();
573   subchannel2 = FindSubchannel(kAddresses[1]);
574   ASSERT_NE(subchannel2, nullptr);
575   EXPECT_EQ(subchannel2->NumWatchers(), 1);
576   // Subchannel creation will trigger returning a new picker.
577   // Picks without an override should continue to use only the
578   // non-draining endpoints.
579   picker = ExpectState(GRPC_CHANNEL_READY);
580   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
581   // Trying the pick again with the new picker will trigger a connection
582   // attempt on the new subchannel.
583   ExpectPickQueued(picker.get(), {address1_attribute});
584   WaitForWorkSerializerToFlush();
585   EXPECT_TRUE(subchannel2->ConnectionRequested());
586   subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
587   // Subchannel state change will trigger returning a new picker.
588   // Picks without an override should continue to use only the
589   // non-draining endpoints.
590   picker = ExpectState(GRPC_CHANNEL_READY);
591   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
592   // Trying the pick with override again should queue, because the
593   // connection attempt is still pending.
594   ExpectPickQueued(picker.get(), {address1_attribute});
595   // Connection attempt succeeds.
596   subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
597   // Subchannel state change will trigger returning a new picker.
598   // Picks without an override should continue to use only the
599   // non-draining endpoints.
600   picker = ExpectState(GRPC_CHANNEL_READY);
601   ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
602   // Now the pick with override should complete.
603   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
604 }
605 
// An endpoint that moves to DRAINING without ever having received an
// override pick should end up with no watchers on its subchannel, while
// the remaining endpoints keep theirs.
TEST_F(XdsOverrideHostTest,
       ChildPolicyUnrefsSubchannelNotUsedWithinIdleThreshold) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Push an update in which only the middle endpoint is DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Let the WorkSerializer drain so that subchannel orphaning has run.
  WaitForWorkSerializerToFlush();
  // Without an override, picks rotate over the two endpoints that are
  // not draining.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The draining endpoint never saw an override pick, so neither the
  // child policy nor xds_override_host should still be watching it; the
  // other two subchannels must still be watched.
  auto* first_subchannel = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first_subchannel, nullptr);
  EXPECT_GE(first_subchannel->NumWatchers(), 1);
  auto* draining_subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(draining_subchannel, nullptr);
  EXPECT_EQ(draining_subchannel->NumWatchers(), 0);
  auto* last_subchannel = FindSubchannel(kAddresses[2]);
  ASSERT_NE(last_subchannel, nullptr);
  EXPECT_GE(last_subchannel->NumWatchers(), 1);
}
636 
TEST_F(XdsOverrideHostTest,IdleTimer)637 TEST_F(XdsOverrideHostTest, IdleTimer) {
638   std::vector<grpc_event_engine::experimental::EventEngine::Duration>
639       timer_durations;
640   fuzzing_ee_->SetRunAfterDurationCallback(
641       [&timer_durations](
642           grpc_event_engine::experimental::EventEngine::Duration duration) {
643         timer_durations.push_back(duration);
644       });
645   const std::array<absl::string_view, 3> kAddresses = {
646       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
647   gpr_log(GPR_INFO, "### sending initial update");
648   EXPECT_EQ(UpdateXdsOverrideHostPolicy(kAddresses, {"UNKNOWN", "HEALTHY"},
649                                         Duration::Minutes(1)),
650             absl::OkStatus());
651   // Initial update should have caused the timer to be set for the idle
652   // timeout.
653   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Minutes(1)));
654   timer_durations.clear();
655   auto picker = ExpectRoundRobinStartup(kAddresses);
656   ASSERT_NE(picker, nullptr);
657   // Do an override pick for endpoints 1 and 2, so that they will still be
658   // within the idle threshold and will therefore be retained when they move
659   // to state DRAINING.
660   auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
661   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
662   auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
663   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
664   // Increment time by 5 seconds and send an update that moves endpoints 1
665   // and 2 to state DRAINING.
666   gpr_log(GPR_INFO, "### moving endpoints 1 and 2 to state DRAINING");
667   IncrementTimeBy(Duration::Seconds(5));
668   ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
669                                  {kAddresses[1], XdsHealthStatus::kDraining},
670                                  {kAddresses[2], XdsHealthStatus::kDraining}},
671                                 {"UNKNOWN", "HEALTHY", "DRAINING"},
672                                 Duration::Minutes(1));
673   // The update should cause the timer to be reset for the next
674   // expiration time.
675   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(55)));
676   timer_durations.clear();
677   picker = ExpectState(GRPC_CHANNEL_READY);
678   // Make sure subchannels get orphaned in the WorkSerializer.
679   WaitForWorkSerializerToFlush();
680   // Picks without an override will use only the endpoint that is not in
681   // draining state.
682   ExpectRoundRobinPicks(picker.get(), {kAddresses[0]});
683   // Picks with an override are able to select the draining endpoints.
684   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
685   ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
686   // Both subchannels are owned by the xds_override_host policy.
687   auto* subchannel1 = FindSubchannel(kAddresses[1]);
688   ASSERT_NE(subchannel1, nullptr);
689   EXPECT_EQ(subchannel1->NumWatchers(), 1);
690   auto* subchannel2 = FindSubchannel(kAddresses[2]);
691   ASSERT_NE(subchannel2, nullptr);
692   EXPECT_EQ(subchannel2->NumWatchers(), 1);
693   // Trigger the timer.  Both subchannels have gotten an override pick more
694   // recently than the timer was scheduled, so neither one will be unreffed.
695   IncrementTimeBy(Duration::Seconds(55));
696   EXPECT_EQ(subchannel1->NumWatchers(), 1);
697   EXPECT_EQ(subchannel2->NumWatchers(), 1);
698   // The timer will be reset for 5 seconds.
699   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(5)));
700   timer_durations.clear();
701   // Send another override pick for endpoint 1.
702   ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
703   // Trigger the timer again.  This time, it should unref endpoint 2 but
704   // keep endpoint 1.
705   IncrementTimeBy(Duration::Seconds(5));
706   EXPECT_EQ(subchannel1->NumWatchers(), 1);
707   EXPECT_EQ(subchannel2->NumWatchers(), 0);
708   // The timer should now be set for 55 seconds, which is how long it
709   // will be until endpoint 1 should be unreffed.
710   EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(55)));
711 }
712 
713 }  // namespace
714 }  // namespace testing
715 }  // namespace grpc_core
716 
main(int argc,char ** argv)717 int main(int argc, char** argv) {
718   ::testing::InitGoogleTest(&argc, argv);
719   grpc::testing::TestEnvironment env(&argc, argv);
720   return RUN_ALL_TESTS();
721 }
722