• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 #include <gmock/gmock.h>
16 #include <grpcpp/create_channel.h>
17 #include <grpcpp/security/credentials.h>
18 #include <grpcpp/support/status.h>
19 #include <gtest/gtest.h>
20 
21 #include <memory>
22 #include <string>
23 #include <string_view>
24 #include <type_traits>
25 
26 #include "absl/cleanup/cleanup.h"
27 #include "absl/strings/str_format.h"
28 #include "absl/strings/strip.h"
29 #include "src/core/config/config_vars.h"
30 #include "src/proto/grpc/testing/echo.pb.h"
31 #include "src/proto/grpc/testing/echo_messages.pb.h"
32 #include "test/core/test_util/scoped_env_var.h"
33 #include "test/core/test_util/test_config.h"
34 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
35 #include "test/cpp/end2end/xds/xds_utils.h"
36 
37 namespace grpc {
38 namespace testing {
39 namespace {
40 
41 constexpr char const* kErrorMessage = "test forced ADS stream failure";
42 
43 class XdsFallbackTest : public XdsEnd2endTest {
44  public:
XdsFallbackTest()45   XdsFallbackTest()
46       : fallback_balancer_(CreateAndStartBalancer("Fallback Balancer")) {}
47 
SetUp()48   void SetUp() override {
49     // Overrides SetUp from a base class so we can call InitClient per-test case
50   }
51 
TearDown()52   void TearDown() override {
53     fallback_balancer_->Shutdown();
54     XdsEnd2endTest::TearDown();
55   }
56 
SetXdsResourcesForServer(BalancerServerThread * balancer,size_t backend,absl::string_view server_name="",absl::string_view authority="")57   void SetXdsResourcesForServer(BalancerServerThread* balancer, size_t backend,
58                                 absl::string_view server_name = "",
59                                 absl::string_view authority = "") {
60     Listener listener = default_listener_;
61     RouteConfiguration route_config = default_route_config_;
62     Cluster cluster = default_cluster_;
63     // Default server uses default resources when no authority, to enable using
64     // more test framework functions.
65     if (!server_name.empty() || !authority.empty()) {
66       auto get_resource_name = [&](absl::string_view resource_type) {
67         absl::string_view stripped_resource_type =
68             absl::StripPrefix(resource_type, "type.googleapis.com/");
69         if (authority.empty()) {
70           if (resource_type == kLdsTypeUrl) return std::string(server_name);
71           return absl::StrFormat("%s_%s", stripped_resource_type, server_name);
72         }
73         return absl::StrFormat("xdstp://%s/%s/%s", authority,
74                                stripped_resource_type, server_name);
75       };
76       listener.set_name(get_resource_name(kLdsTypeUrl));
77       cluster.set_name(get_resource_name(kCdsTypeUrl));
78       cluster.mutable_eds_cluster_config()->set_service_name(
79           get_resource_name(kEdsTypeUrl));
80       route_config.set_name(get_resource_name(kRdsTypeUrl));
81       route_config.mutable_virtual_hosts(0)
82           ->mutable_routes(0)
83           ->mutable_route()
84           ->set_cluster(cluster.name());
85     }
86     SetListenerAndRouteConfiguration(balancer, listener, route_config);
87     balancer->ads_service()->SetCdsResource(cluster);
88     balancer->ads_service()->SetEdsResource(BuildEdsResource(
89         EdsResourceArgs(
90             {{"locality0", CreateEndpointsForBackends(backend, backend + 1)}}),
91         cluster.eds_cluster_config().service_name()));
92   }
93 
ExpectBackendCall(EchoTestService::Stub * stub,int backend,grpc_core::DebugLocation location)94   void ExpectBackendCall(EchoTestService::Stub* stub, int backend,
95                          grpc_core::DebugLocation location) {
96     ClientContext context;
97     EchoRequest request;
98     EchoResponse response;
99     RpcOptions().SetupRpc(&context, &request);
100     Status status = stub->Echo(&context, request, &response);
101     EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
102                              << " message=" << status.error_message() << "\n"
103                              << location.file() << ':' << location.line();
104     EXPECT_EQ(1U, backends_[backend]->backend_service()->request_count())
105         << "\n"
106         << location.file() << ':' << location.line();
107   }
108 
109  protected:
110   std::unique_ptr<BalancerServerThread> fallback_balancer_;
111 };
112 
TEST_P(XdsFallbackTest,FallbackAndRecover)113 TEST_P(XdsFallbackTest, FallbackAndRecover) {
114   auto broken_balancer = CreateAndStartBalancer("Broken balancer");
115   broken_balancer->ads_service()->ForceADSFailure(
116       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
117   InitClient(XdsBootstrapBuilder().SetServers({
118       balancer_->target(),
119       broken_balancer->target(),
120       fallback_balancer_->target(),
121   }));
122   // Primary xDS server has backends_[0] configured and fallback server has
123   // backends_[1]
124   CreateAndStartBackends(2);
125   SetXdsResourcesForServer(balancer_.get(), 0);
126   SetXdsResourcesForServer(fallback_balancer_.get(), 1);
127   balancer_->ads_service()->ForceADSFailure(
128       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
129   // Primary server down, fallback server data is used (backends_[1])
130   CheckRpcSendOk(DEBUG_LOCATION);
131   EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0);
132   EXPECT_EQ(backends_[1]->backend_service()->request_count(), 1);
133   // Primary server is back. backends_[0] will be used when the data makes it
134   // all way to the client
135   balancer_->ads_service()->ClearADSFailure();
136   WaitForBackend(DEBUG_LOCATION, 0);
137   broken_balancer->Shutdown();
138 }
139 
TEST_P(XdsFallbackTest,PrimarySecondaryNotAvailable)140 TEST_P(XdsFallbackTest, PrimarySecondaryNotAvailable) {
141   InitClient(XdsBootstrapBuilder().SetServers(
142       {balancer_->target(), fallback_balancer_->target()}));
143   balancer_->ads_service()->ForceADSFailure(
144       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
145   fallback_balancer_->ads_service()->ForceADSFailure(
146       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
147   CheckRpcSendFailure(
148       DEBUG_LOCATION, StatusCode::UNAVAILABLE,
149       absl::StrFormat(
150           "empty address list \\(LDS resource server.example.com: "
151           "xDS channel for server localhost:%d: "
152           "xDS call failed with no responses received; "
153           "status: RESOURCE_EXHAUSTED: test forced ADS stream failure "
154           "\\(node ID:xds_end2end_test\\)\\)",
155           fallback_balancer_->port()));
156 }
157 
TEST_P(XdsFallbackTest,UsesCachedResourcesAfterFailure)158 TEST_P(XdsFallbackTest, UsesCachedResourcesAfterFailure) {
159   constexpr absl::string_view kServerName2 = "server2.example.com";
160   InitClient(XdsBootstrapBuilder().SetServers(
161       {balancer_->target(), fallback_balancer_->target()}));
162   // 4 backends - cross product of two data plane targets and two balancers
163   CreateAndStartBackends(4);
164   SetXdsResourcesForServer(balancer_.get(), 0);
165   SetXdsResourcesForServer(fallback_balancer_.get(), 1);
166   SetXdsResourcesForServer(balancer_.get(), 2, kServerName2);
167   SetXdsResourcesForServer(fallback_balancer_.get(), 3, kServerName2);
168   CheckRpcSendOk(DEBUG_LOCATION);
169   EXPECT_EQ(backends_[0]->backend_service()->request_count(), 1);
170   balancer_->ads_service()->ForceADSFailure(
171       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
172   auto channel = CreateChannel(0, std::string(kServerName2).c_str());
173   auto stub = grpc::testing::EchoTestService::NewStub(channel);
174   // server2.example.com is configured from the fallback server
175   ExpectBackendCall(stub.get(), 3, DEBUG_LOCATION);
176   // Calling server.example.com still uses cached value
177   CheckRpcSendOk(DEBUG_LOCATION);
178   EXPECT_EQ(backends_[0]->backend_service()->request_count(), 2);
179   EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
180 }
181 
TEST_P(XdsFallbackTest,PerAuthorityFallback)182 TEST_P(XdsFallbackTest, PerAuthorityFallback) {
183   auto fallback_balancer2 = CreateAndStartBalancer("Fallback for Authority2");
184   // Use cleanup in case test assertion fails
185   auto balancer2_cleanup =
186       absl::MakeCleanup([&]() { fallback_balancer2->Shutdown(); });
187   grpc_core::testing::ScopedExperimentalEnvVar env_var(
188       "GRPC_EXPERIMENTAL_XDS_FEDERATION");
189   const char* kAuthority1 = "xds1.example.com";
190   const char* kAuthority2 = "xds2.example.com";
191   constexpr absl::string_view kServer1Name = "server1.example.com";
192   constexpr absl::string_view kServer2Name = "server2.example.com";
193   // Authority1 uses balancer_ and fallback_balancer_
194   // Authority2 uses balancer_ and fallback_balancer2
195   XdsBootstrapBuilder builder;
196   builder.SetServers({balancer_->target()});
197   builder.AddAuthority(kAuthority1,
198                        {balancer_->target(), fallback_balancer_->target()});
199   builder.AddAuthority(kAuthority2,
200                        {balancer_->target(), fallback_balancer2->target()});
201   InitClient(builder);
202   CreateAndStartBackends(4);
203   SetXdsResourcesForServer(fallback_balancer_.get(), 0, kServer1Name,
204                            kAuthority1);
205   SetXdsResourcesForServer(fallback_balancer2.get(), 1, kServer2Name,
206                            kAuthority2);
207   SetXdsResourcesForServer(balancer_.get(), 2, kServer1Name, kAuthority1);
208   SetXdsResourcesForServer(balancer_.get(), 3, kServer2Name, kAuthority2);
209   // Primary balancer is down, using the fallback servers
210   balancer_->ads_service()->ForceADSFailure(
211       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
212   // Create second channel to new target URI and send 1 RPC.
213   auto authority1_stub = grpc::testing::EchoTestService::NewStub(CreateChannel(
214       /*failover_timeout_ms=*/0, std::string(kServer1Name).c_str(),
215       kAuthority1));
216   auto authority2_stub = grpc::testing::EchoTestService::NewStub(CreateChannel(
217       /*failover_timeout_ms=*/0, std::string(kServer2Name).c_str(),
218       kAuthority2));
219   ExpectBackendCall(authority1_stub.get(), 0, DEBUG_LOCATION);
220   ExpectBackendCall(authority2_stub.get(), 1, DEBUG_LOCATION);
221   // Primary balancer is up, its data will be used now.
222   balancer_->ads_service()->ClearADSFailure();
223   auto deadline =
224       absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
225   while (absl::Now() < deadline &&
226          (backends_[2]->backend_service()->request_count() == 0 ||
227           backends_[3]->backend_service()->request_count() == 0)) {
228     ClientContext context;
229     EchoRequest request;
230     EchoResponse response;
231     RpcOptions().SetupRpc(&context, &request);
232     Status status = authority1_stub->Echo(&context, request, &response);
233     EXPECT_TRUE(status.ok()) << status.error_message();
234     ClientContext context2;
235     EchoRequest request2;
236     EchoResponse response2;
237     RpcOptions().SetupRpc(&context2, &request2);
238     status = authority2_stub->Echo(&context2, request2, &response2);
239     EXPECT_TRUE(status.ok()) << status.error_message();
240   }
241   ASSERT_LE(1U, backends_[2]->backend_service()->request_count());
242   ASSERT_LE(1U, backends_[3]->backend_service()->request_count());
243 }
244 
245 INSTANTIATE_TEST_SUITE_P(XdsTest, XdsFallbackTest,
246                          ::testing::Values(XdsTestType().set_bootstrap_source(
247                              XdsTestType::kBootstrapFromEnvVar)),
248                          &XdsTestType::Name);
249 
250 }  // namespace
251 }  // namespace testing
252 }  // namespace grpc
253 
main(int argc,char ** argv)254 int main(int argc, char** argv) {
255   grpc::testing::TestEnvironment env(&argc, argv);
256   ::testing::InitGoogleTest(&argc, argv);
257   // Make the backup poller poll very frequently in order to pick up
258   // updates from all the subchannels's FDs.
259   grpc_core::ConfigVars::Overrides overrides;
260   overrides.client_channel_backup_poll_interval_ms = 1;
261   grpc_core::ConfigVars::SetOverrides(overrides);
262 #if TARGET_OS_IPHONE
263   // Workaround Apple CFStream bug
264   grpc_core::SetEnv("grpc_cfstream", "0");
265 #endif
266   grpc_init();
267   const auto result = RUN_ALL_TESTS();
268   grpc_shutdown();
269   return result;
270 }
271