• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
17 
18 #include <gmock/gmock.h>
19 #include <grpcpp/security/tls_certificate_provider.h>
20 #include <gtest/gtest.h>
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <thread>
28 #include <vector>
29 
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "absl/memory/memory.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_format.h"
35 #include "absl/strings/str_join.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "envoy/extensions/filters/http/router/v3/router.pb.h"
39 #include "src/core/ext/filters/http/server/http_server_filter.h"
40 #include "src/core/server/server.h"
41 #include "src/core/util/env.h"
42 #include "src/core/util/tmpfile.h"
43 #include "src/core/xds/grpc/xds_client_grpc.h"
44 #include "src/core/xds/xds_client/xds_channel_args.h"
45 #include "test/core/test_util/resolve_localhost_ip46.h"
46 #include "test/core/test_util/tls_utils.h"
47 #include "test/cpp/util/credentials.h"
48 #include "test/cpp/util/tls_test_utils.h"
49 
50 namespace grpc {
51 namespace testing {
52 
53 using ::envoy::config::core::v3::HealthStatus;
54 using ::envoy::service::discovery::v3::DiscoveryRequest;
55 using ::envoy::service::load_stats::v3::LoadStatsRequest;
56 
57 using ::grpc::experimental::ExternalCertificateVerifier;
58 using ::grpc::experimental::IdentityKeyCertPair;
59 using ::grpc::experimental::ServerMetricRecorder;
60 using ::grpc::experimental::StaticDataCertificateProvider;
61 
62 //
63 // XdsEnd2endTest::ServerThread::XdsServingStatusNotifier
64 //
65 
66 void XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
OnServingStatusUpdate(std::string uri,ServingStatusUpdate update)67     OnServingStatusUpdate(std::string uri, ServingStatusUpdate update) {
68   grpc_core::MutexLock lock(&mu_);
69   status_map[uri] = update.status;
70   cond_.Signal();
71 }
72 
73 bool XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
WaitOnServingStatusChange(const std::string & uri,grpc::StatusCode expected_status,absl::Duration timeout)74     WaitOnServingStatusChange(const std::string& uri,
75                               grpc::StatusCode expected_status,
76                               absl::Duration timeout) {
77   grpc_core::MutexLock lock(&mu_);
78   absl::Time deadline = absl::Now() + timeout * grpc_test_slowdown_factor();
79   std::map<std::string, grpc::Status>::iterator it;
80   while ((it = status_map.find(uri)) == status_map.end() ||
81          it->second.error_code() != expected_status) {
82     if (cond_.WaitWithDeadline(&mu_, deadline)) {
83       LOG(ERROR) << "\nTimeout Elapsed waiting on serving status "
84                     "change\nExpected status: "
85                  << expected_status << "\nActual:"
86                  << (it == status_map.end()
87                          ? "Entry not found in map"
88                          : absl::StrCat(it->second.error_code()));
89       return false;
90     }
91   }
92   return true;
93 }
94 
95 //
96 // XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
97 //
98 
99 namespace {
100 
101 // Channel arg pointer vtable for storing xDS channel args in the parent
102 // channel's channel args.
ChannelArgsArgCopy(void * p)103 void* ChannelArgsArgCopy(void* p) {
104   auto* args = static_cast<grpc_channel_args*>(p);
105   return grpc_channel_args_copy(args);
106 }
ChannelArgsArgDestroy(void * p)107 void ChannelArgsArgDestroy(void* p) {
108   auto* args = static_cast<grpc_channel_args*>(p);
109   grpc_channel_args_destroy(args);
110 }
ChannelArgsArgCmp(void * a,void * b)111 int ChannelArgsArgCmp(void* a, void* b) {
112   auto* args_a = static_cast<grpc_channel_args*>(a);
113   auto* args_b = static_cast<grpc_channel_args*>(b);
114   return grpc_channel_args_compare(args_a, args_b);
115 }
116 const grpc_arg_pointer_vtable kChannelArgsArgVtable = {
117     ChannelArgsArgCopy, ChannelArgsArgDestroy, ChannelArgsArgCmp};
118 
119 }  // namespace
120 
121 class XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
122     : public grpc::ServerBuilderOption {
123  public:
XdsChannelArgsServerBuilderOption(XdsEnd2endTest * test_obj)124   explicit XdsChannelArgsServerBuilderOption(XdsEnd2endTest* test_obj)
125       : test_obj_(test_obj) {}
126 
UpdateArguments(grpc::ChannelArguments * args)127   void UpdateArguments(grpc::ChannelArguments* args) override {
128     args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
129                     test_obj_->bootstrap_);
130     args->SetPointerWithVtable(
131         GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
132         &test_obj_->xds_channel_args_, &kChannelArgsArgVtable);
133   }
134 
UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>> *)135   void UpdatePlugins(
136       std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/)
137       override {}
138 
139  private:
140   XdsEnd2endTest* test_obj_;
141 };
142 
143 //
144 // XdsEnd2endTest::ServerThread
145 //
146 
ServerThread(XdsEnd2endTest * test_obj,bool use_xds_enabled_server,std::shared_ptr<ServerCredentials> credentials)147 XdsEnd2endTest::ServerThread::ServerThread(
148     XdsEnd2endTest* test_obj, bool use_xds_enabled_server,
149     std::shared_ptr<ServerCredentials> credentials)
150     : test_obj_(test_obj),
151       use_xds_enabled_server_(use_xds_enabled_server),
152       credentials_(credentials == nullptr ? CreateFakeServerCredentials()
153                                           : std::move(credentials)),
154       port_(grpc_pick_unused_port_or_die()) {}
155 
Start()156 void XdsEnd2endTest::ServerThread::Start() {
157   LOG(INFO) << "starting " << Type() << " server on port " << port_;
158   CHECK(!running_);
159   running_ = true;
160   StartAllServices();
161   grpc_core::Mutex mu;
162   // We need to acquire the lock here in order to prevent the notify_one
163   // by ServerThread::Serve from firing before the wait below is hit.
164   grpc_core::MutexLock lock(&mu);
165   grpc_core::CondVar cond;
166   thread_ = std::make_unique<std::thread>(
167       std::bind(&ServerThread::Serve, this, &mu, &cond));
168   cond.Wait(&mu);
169   LOG(INFO) << Type() << " server startup complete";
170 }
171 
Shutdown()172 void XdsEnd2endTest::ServerThread::Shutdown() {
173   if (!running_) return;
174   LOG(INFO) << Type() << " about to shutdown";
175   ShutdownAllServices();
176   server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
177   thread_->join();
178   LOG(INFO) << Type() << " shutdown completed";
179   running_ = false;
180 }
181 
StopListeningAndSendGoaways()182 void XdsEnd2endTest::ServerThread::StopListeningAndSendGoaways() {
183   LOG(INFO) << Type() << " sending GOAWAYs";
184   {
185     grpc_core::ExecCtx exec_ctx;
186     auto* server = grpc_core::Server::FromC(server_->c_server());
187     server->StopListening();
188     server->SendGoaways();
189   }
190   LOG(INFO) << Type() << " done sending GOAWAYs";
191 }
192 
StopListening()193 void XdsEnd2endTest::ServerThread::StopListening() {
194   LOG(INFO) << Type() << " about to stop listening";
195   {
196     grpc_core::ExecCtx exec_ctx;
197     auto* server = grpc_core::Server::FromC(server_->c_server());
198     server->StopListening();
199   }
200   LOG(INFO) << Type() << " stopped listening";
201 }
202 
Serve(grpc_core::Mutex * mu,grpc_core::CondVar * cond)203 void XdsEnd2endTest::ServerThread::Serve(grpc_core::Mutex* mu,
204                                          grpc_core::CondVar* cond) {
205   // We need to acquire the lock here in order to prevent the notify_one
206   // below from firing before its corresponding wait is executed.
207   grpc_core::MutexLock lock(mu);
208   std::string server_address = absl::StrCat("localhost:", port_);
209   if (use_xds_enabled_server_) {
210     XdsServerBuilder builder;
211     if (GetParam().bootstrap_source() ==
212         XdsTestType::kBootstrapFromChannelArg) {
213       builder.SetOption(
214           std::make_unique<XdsChannelArgsServerBuilderOption>(test_obj_));
215     }
216     builder.set_status_notifier(&notifier_);
217     builder.experimental().set_drain_grace_time(
218         test_obj_->xds_drain_grace_time_ms_);
219     builder.AddListeningPort(server_address, credentials_);
220     // Allow gRPC Core's HTTP server to accept PUT requests for testing
221     // purposes.
222     if (allow_put_requests_) {
223       builder.AddChannelArgument(
224           GRPC_ARG_DO_NOT_USE_UNLESS_YOU_HAVE_PERMISSION_FROM_GRPC_TEAM_ALLOW_BROKEN_PUT_REQUESTS,
225           true);
226     }
227     RegisterAllServices(&builder);
228     server_ = builder.BuildAndStart();
229   } else {
230     ServerBuilder builder;
231     builder.AddListeningPort(server_address, credentials_);
232     RegisterAllServices(&builder);
233     server_ = builder.BuildAndStart();
234   }
235   cond->Signal();
236 }
237 
238 //
239 // XdsEnd2endTest::BackendServerThread
240 //
241 
BackendServerThread(XdsEnd2endTest * test_obj,bool use_xds_enabled_server,std::shared_ptr<ServerCredentials> credentials)242 XdsEnd2endTest::BackendServerThread::BackendServerThread(
243     XdsEnd2endTest* test_obj, bool use_xds_enabled_server,
244     std::shared_ptr<ServerCredentials> credentials)
245     : ServerThread(test_obj, use_xds_enabled_server, std::move(credentials)) {
246   if (use_xds_enabled_server) {
247     test_obj->SetServerListenerNameAndRouteConfiguration(
248         test_obj->balancer_.get(), test_obj->default_server_listener_, port(),
249         test_obj->default_server_route_config_);
250   }
251 }
252 
RegisterAllServices(ServerBuilder * builder)253 void XdsEnd2endTest::BackendServerThread::RegisterAllServices(
254     ServerBuilder* builder) {
255   server_metric_recorder_ = ServerMetricRecorder::Create();
256   ServerBuilder::experimental_type(builder).EnableCallMetricRecording(
257       server_metric_recorder_.get());
258   builder->RegisterService(&backend_service_);
259   builder->RegisterService(&backend_service1_);
260   builder->RegisterService(&backend_service2_);
261 }
262 
StartAllServices()263 void XdsEnd2endTest::BackendServerThread::StartAllServices() {
264   backend_service_.Start();
265   backend_service1_.Start();
266   backend_service2_.Start();
267 }
268 
ShutdownAllServices()269 void XdsEnd2endTest::BackendServerThread::ShutdownAllServices() {
270   backend_service_.Shutdown();
271   backend_service1_.Shutdown();
272   backend_service2_.Shutdown();
273 }
274 
275 //
276 // XdsEnd2endTest::BalancerServerThread
277 //
278 
BalancerServerThread(XdsEnd2endTest * test_obj,absl::string_view debug_label,std::shared_ptr<ServerCredentials> credentials)279 XdsEnd2endTest::BalancerServerThread::BalancerServerThread(
280     XdsEnd2endTest* test_obj, absl::string_view debug_label,
281     std::shared_ptr<ServerCredentials> credentials)
282     : ServerThread(test_obj, /*use_xds_enabled_server=*/false,
283                    std::move(credentials)),
284       ads_service_(new AdsServiceImpl(
285           // First request must have node set with the right client
286           // features.
287           [&](const DiscoveryRequest& request) {
288             EXPECT_TRUE(request.has_node());
289             EXPECT_THAT(request.node().client_features(),
290                         ::testing::UnorderedElementsAre(
291                             "envoy.lb.does_not_support_overprovisioning",
292                             "xds.config.resource-in-sotw"));
293           },
294           // NACKs must use the right status code.
__anon98fd15a40302(absl::StatusCode code) 295           [&](absl::StatusCode code) {
296             EXPECT_EQ(code, absl::StatusCode::kInvalidArgument);
297           },
298           debug_label)),
299       lrs_service_(new LrsServiceImpl(
300           (GetParam().enable_load_reporting() ? 20 : 0), {kDefaultClusterName},
301           // Fail if load reporting is used when not enabled.
__anon98fd15a40402() 302           [&]() { EXPECT_TRUE(GetParam().enable_load_reporting()); },
303           // Make sure we send the client feature saying that we support
304           // send_all_clusters.
__anon98fd15a40502(const LoadStatsRequest& request) 305           [&](const LoadStatsRequest& request) {
306             EXPECT_THAT(
307                 request.node().client_features(),
308                 ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
309           },
310           debug_label)) {}
311 
RegisterAllServices(ServerBuilder * builder)312 void XdsEnd2endTest::BalancerServerThread::RegisterAllServices(
313     ServerBuilder* builder) {
314   builder->RegisterService(ads_service_.get());
315   builder->RegisterService(lrs_service_.get());
316 }
317 
StartAllServices()318 void XdsEnd2endTest::BalancerServerThread::StartAllServices() {
319   ads_service_->Start();
320   lrs_service_->Start();
321 }
322 
ShutdownAllServices()323 void XdsEnd2endTest::BalancerServerThread::ShutdownAllServices() {
324   ads_service_->Shutdown();
325   lrs_service_->Shutdown();
326 }
327 
328 //
329 // XdsEnd2endTest::RpcOptions
330 //
331 
SetupRpc(ClientContext * context,EchoRequest * request) const332 void XdsEnd2endTest::RpcOptions::SetupRpc(ClientContext* context,
333                                           EchoRequest* request) const {
334   for (const auto& item : metadata) {
335     context->AddMetadata(item.first, item.second);
336   }
337   if (timeout_ms != 0) {
338     context->set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
339   }
340   if (wait_for_ready) context->set_wait_for_ready(true);
341   request->set_message(kRequestMessage);
342   if (server_fail) {
343     request->mutable_param()->mutable_expected_error()->set_code(
344         GRPC_STATUS_FAILED_PRECONDITION);
345   }
346   if (server_sleep_us != 0) {
347     request->mutable_param()->set_server_sleep_us(server_sleep_us);
348   }
349   if (client_cancel_after_us != 0) {
350     request->mutable_param()->set_client_cancel_after_us(
351         client_cancel_after_us);
352   }
353   if (skip_cancelled_check) {
354     request->mutable_param()->set_skip_cancelled_check(true);
355   }
356   if (backend_metrics.has_value()) {
357     *request->mutable_param()->mutable_backend_metrics() = *backend_metrics;
358   }
359   if (server_notify_client_when_started) {
360     request->mutable_param()->set_server_notify_client_when_started(true);
361   }
362   if (echo_host_from_authority_header) {
363     request->mutable_param()->set_echo_host_from_authority_header(true);
364   }
365   if (echo_metadata_initially) {
366     request->mutable_param()->set_echo_metadata_initially(true);
367   }
368 }
369 
370 //
371 // XdsEnd2endTest
372 //
373 
374 const char XdsEnd2endTest::kCaCertPath[] = "src/core/tsi/test_creds/ca.pem";
375 const char XdsEnd2endTest::kServerCertPath[] =
376     "src/core/tsi/test_creds/server1.pem";
377 const char XdsEnd2endTest::kServerKeyPath[] =
378     "src/core/tsi/test_creds/server1.key";
379 
380 const char XdsEnd2endTest::kRequestMessage[] = "Live long and prosper.";
381 
XdsEnd2endTest(std::shared_ptr<ServerCredentials> balancer_credentials)382 XdsEnd2endTest::XdsEnd2endTest(
383     std::shared_ptr<ServerCredentials> balancer_credentials)
384     : balancer_(CreateAndStartBalancer("Default Balancer",
385                                        std::move(balancer_credentials))) {
386   // Initialize default client-side xDS resources.
387   default_listener_ = XdsResourceUtils::DefaultListener();
388   default_route_config_ = XdsResourceUtils::DefaultRouteConfig();
389   default_cluster_ = XdsResourceUtils::DefaultCluster();
390   if (GetParam().enable_load_reporting()) {
391     default_cluster_.mutable_lrs_server()->mutable_self();
392   }
393   // Initialize client-side resources on balancer.
394   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
395                                    default_route_config_);
396   balancer_->ads_service()->SetCdsResource(default_cluster_);
397   // Initialize default server-side xDS resources.
398   default_server_route_config_ = XdsResourceUtils::DefaultServerRouteConfig();
399   default_server_listener_ = XdsResourceUtils::DefaultServerListener();
400 }
401 
TearDown()402 void XdsEnd2endTest::TearDown() {
403   ShutdownAllBackends();
404   balancer_->Shutdown();
405   // Clear global xDS channel args, since they will go out of scope
406   // when this test object is destroyed.
407   grpc_core::internal::SetXdsChannelArgsForTest(nullptr);
408   grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP");
409   grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
410   if (bootstrap_file_ != nullptr) {
411     remove(bootstrap_file_);
412     gpr_free(bootstrap_file_);
413   }
414 }
415 
416 std::unique_ptr<XdsEnd2endTest::BalancerServerThread>
CreateAndStartBalancer(absl::string_view debug_label,std::shared_ptr<ServerCredentials> credentials)417 XdsEnd2endTest::CreateAndStartBalancer(
418     absl::string_view debug_label,
419     std::shared_ptr<ServerCredentials> credentials) {
420   std::unique_ptr<BalancerServerThread> balancer =
421       std::make_unique<BalancerServerThread>(this, debug_label,
422                                              std::move(credentials));
423   balancer->Start();
424   return balancer;
425 }
426 
427 std::vector<XdsEnd2endTest::EdsResourceArgs::Endpoint>
CreateEndpointsForBackends(size_t start_index,size_t stop_index,HealthStatus health_status,int lb_weight)428 XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index,
429                                            size_t stop_index,
430                                            HealthStatus health_status,
431                                            int lb_weight) {
432   if (stop_index == 0) stop_index = backends_.size();
433   std::vector<EdsResourceArgs::Endpoint> endpoints;
434   for (size_t i = start_index; i < stop_index; ++i) {
435     endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight));
436   }
437   return endpoints;
438 }
439 
ResetBackendCounters(size_t start_index,size_t stop_index)440 void XdsEnd2endTest::ResetBackendCounters(size_t start_index,
441                                           size_t stop_index) {
442   if (stop_index == 0) stop_index = backends_.size();
443   for (size_t i = start_index; i < stop_index; ++i) {
444     backends_[i]->backend_service()->ResetCounters();
445     backends_[i]->backend_service1()->ResetCounters();
446     backends_[i]->backend_service2()->ResetCounters();
447   }
448 }
449 
SeenBackend(size_t backend_idx,const RpcService rpc_service)450 bool XdsEnd2endTest::SeenBackend(size_t backend_idx,
451                                  const RpcService rpc_service) {
452   switch (rpc_service) {
453     case SERVICE_ECHO:
454       if (backends_[backend_idx]->backend_service()->request_count() == 0) {
455         return false;
456       }
457       break;
458     case SERVICE_ECHO1:
459       if (backends_[backend_idx]->backend_service1()->request_count() == 0) {
460         return false;
461       }
462       break;
463     case SERVICE_ECHO2:
464       if (backends_[backend_idx]->backend_service2()->request_count() == 0) {
465         return false;
466       }
467       break;
468   }
469   return true;
470 }
471 
SeenAllBackends(size_t start_index,size_t stop_index,const RpcService rpc_service)472 bool XdsEnd2endTest::SeenAllBackends(size_t start_index, size_t stop_index,
473                                      const RpcService rpc_service) {
474   if (stop_index == 0) stop_index = backends_.size();
475   for (size_t i = start_index; i < stop_index; ++i) {
476     if (!SeenBackend(i, rpc_service)) {
477       return false;
478     }
479   }
480   return true;
481 }
482 
GetBackendPorts(size_t start_index,size_t stop_index) const483 std::vector<int> XdsEnd2endTest::GetBackendPorts(size_t start_index,
484                                                  size_t stop_index) const {
485   if (stop_index == 0) stop_index = backends_.size();
486   std::vector<int> backend_ports;
487   for (size_t i = start_index; i < stop_index; ++i) {
488     backend_ports.push_back(backends_[i]->port());
489   }
490   return backend_ports;
491 }
492 
InitClient(absl::optional<XdsBootstrapBuilder> builder,std::string lb_expected_authority,int xds_resource_does_not_exist_timeout_ms,std::string balancer_authority_override,ChannelArguments * args,std::shared_ptr<ChannelCredentials> credentials)493 void XdsEnd2endTest::InitClient(
494     absl::optional<XdsBootstrapBuilder> builder,
495     std::string lb_expected_authority,
496     int xds_resource_does_not_exist_timeout_ms,
497     std::string balancer_authority_override, ChannelArguments* args,
498     std::shared_ptr<ChannelCredentials> credentials) {
499   if (!builder.has_value()) {
500     builder = MakeBootstrapBuilder();
501   }
502   if (xds_resource_does_not_exist_timeout_ms > 0) {
503     xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
504         const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
505         xds_resource_does_not_exist_timeout_ms * grpc_test_slowdown_factor()));
506   }
507   if (!lb_expected_authority.empty()) {
508     constexpr char authority_const[] = "localhost:%d";
509     if (lb_expected_authority == authority_const) {
510       lb_expected_authority =
511           absl::StrFormat(authority_const, balancer_->port());
512     }
513     xds_channel_args_to_add_.emplace_back(grpc_channel_arg_string_create(
514         const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
515         const_cast<char*>(lb_expected_authority.c_str())));
516   }
517   if (!balancer_authority_override.empty()) {
518     xds_channel_args_to_add_.emplace_back(grpc_channel_arg_string_create(
519         const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
520         const_cast<char*>(balancer_authority_override.c_str())));
521   }
522   xds_channel_args_.num_args = xds_channel_args_to_add_.size();
523   xds_channel_args_.args = xds_channel_args_to_add_.data();
524   bootstrap_ = builder->Build();
525   if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromEnvVar) {
526     grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap_.c_str());
527   } else if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromFile) {
528     FILE* out = gpr_tmpfile("xds_bootstrap_v3", &bootstrap_file_);
529     fputs(bootstrap_.c_str(), out);
530     fclose(out);
531     grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP", bootstrap_file_);
532   }
533   if (GetParam().bootstrap_source() != XdsTestType::kBootstrapFromChannelArg) {
534     // If getting bootstrap from channel arg, we'll pass these args in
535     // via the parent channel args in CreateChannel() instead.
536     grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
537     // Make sure each test creates a new XdsClient instance rather than
538     // reusing the one from the previous test.  This avoids spurious failures
539     // caused when a load reporting test runs after a non-load reporting test
540     // and the XdsClient is still talking to the old LRS server, which fails
541     // because it's not expecting the client to connect.  It also
542     // ensures that each test can independently set the global channel
543     // args for the xDS channel.
544     grpc_core::internal::UnsetGlobalXdsClientsForTest();
545   }
546   // Create channel and stub.
547   ResetStub(/*failover_timeout_ms=*/0, args, std::move(credentials));
548 }
549 
ResetStub(int failover_timeout_ms,ChannelArguments * args,std::shared_ptr<ChannelCredentials> credentials)550 void XdsEnd2endTest::ResetStub(
551     int failover_timeout_ms, ChannelArguments* args,
552     std::shared_ptr<ChannelCredentials> credentials) {
553   channel_ = CreateChannel(failover_timeout_ms, kServerName, "", args,
554                            std::move(credentials));
555   stub_ = grpc::testing::EchoTestService::NewStub(channel_);
556   stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
557   stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
558 }
559 
CreateChannel(int failover_timeout_ms,const char * server_name,const char * xds_authority,ChannelArguments * args,std::shared_ptr<ChannelCredentials> credentials)560 std::shared_ptr<Channel> XdsEnd2endTest::CreateChannel(
561     int failover_timeout_ms, const char* server_name, const char* xds_authority,
562     ChannelArguments* args, std::shared_ptr<ChannelCredentials> credentials) {
563   ChannelArguments local_args;
564   if (args == nullptr) args = &local_args;
565   // TODO(roth): Remove this once we enable retries by default internally.
566   args->SetInt(GRPC_ARG_ENABLE_RETRIES, 1);
567   if (failover_timeout_ms > 0) {
568     args->SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
569                  failover_timeout_ms * grpc_test_slowdown_factor());
570   }
571   if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromChannelArg) {
572     // We're getting the bootstrap from a channel arg, so we do the
573     // same thing for the response generator to use for the xDS
574     // channel and the xDS resource-does-not-exist timeout value.
575     args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
576                     bootstrap_);
577     args->SetPointerWithVtable(
578         GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
579         &xds_channel_args_, &kChannelArgsArgVtable);
580   }
581   // Construct target URI.
582   std::vector<absl::string_view> parts = {"xds:"};
583   if (xds_authority != nullptr && xds_authority[0] != '\0') {
584     parts.emplace_back("//");
585     parts.emplace_back(xds_authority);
586     parts.emplace_back("/");
587   }
588   parts.emplace_back(server_name);
589   std::string uri = absl::StrJoin(parts, "");
590   // Credentials defaults to fake credentials.
591   if (credentials == nullptr) {
592     credentials = std::make_shared<FakeTransportSecurityChannelCredentials>();
593   }
594   return grpc::CreateCustomChannel(uri, credentials, *args);
595 }
596 
SendRpc(const RpcOptions & rpc_options,EchoResponse * response,std::multimap<std::string,std::string> * server_initial_metadata)597 Status XdsEnd2endTest::SendRpc(
598     const RpcOptions& rpc_options, EchoResponse* response,
599     std::multimap<std::string, std::string>* server_initial_metadata) {
600   EchoResponse local_response;
601   if (response == nullptr) response = &local_response;
602   ClientContext context;
603   EchoRequest request;
604   if (rpc_options.server_expected_error != StatusCode::OK) {
605     auto* error = request.mutable_param()->mutable_expected_error();
606     error->set_code(rpc_options.server_expected_error);
607   }
608   rpc_options.SetupRpc(&context, &request);
609   Status status;
610   switch (rpc_options.service) {
611     case SERVICE_ECHO:
612       status =
613           SendRpcMethod(stub_.get(), rpc_options, &context, request, response);
614       break;
615     case SERVICE_ECHO1:
616       status =
617           SendRpcMethod(stub1_.get(), rpc_options, &context, request, response);
618       break;
619     case SERVICE_ECHO2:
620       status =
621           SendRpcMethod(stub2_.get(), rpc_options, &context, request, response);
622       break;
623   }
624   if (server_initial_metadata != nullptr) {
625     for (const auto& it : context.GetServerInitialMetadata()) {
626       std::string header(it.first.data(), it.first.size());
627       // Guard against implementation-specific header case - RFC 2616
628       absl::AsciiStrToLower(&header);
629       server_initial_metadata->emplace(
630           header, std::string(it.second.data(), it.second.size()));
631     }
632   }
633   return status;
634 }
635 
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,std::function<bool (const RpcResult &)> continue_predicate,int timeout_ms,const RpcOptions & rpc_options)636 void XdsEnd2endTest::SendRpcsUntil(
637     const grpc_core::DebugLocation& debug_location,
638     std::function<bool(const RpcResult&)> continue_predicate, int timeout_ms,
639     const RpcOptions& rpc_options) {
640   absl::Time deadline = absl::InfiniteFuture();
641   if (timeout_ms != 0) {
642     deadline = absl::Now() +
643                (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
644   }
645   while (true) {
646     RpcResult result;
647     result.status = SendRpc(rpc_options, &result.response);
648     if (!continue_predicate(result)) return;
649     EXPECT_LE(absl::Now(), deadline)
650         << debug_location.file() << ":" << debug_location.line();
651     if (absl::Now() >= deadline) break;
652   }
653 }
654 
CheckRpcSendOk(const grpc_core::DebugLocation & debug_location,const size_t times,const RpcOptions & rpc_options)655 void XdsEnd2endTest::CheckRpcSendOk(
656     const grpc_core::DebugLocation& debug_location, const size_t times,
657     const RpcOptions& rpc_options) {
658   SendRpcsUntil(
659       debug_location,
660       [debug_location, times, n = size_t{0}](const RpcResult& result) mutable {
661         EXPECT_TRUE(result.status.ok())
662             << "code=" << result.status.error_code()
663             << " message=" << result.status.error_message() << " at "
664             << debug_location.file() << ":" << debug_location.line();
665         EXPECT_EQ(result.response.message(), kRequestMessage);
666         return ++n < times;
667       },
668       /*timeout_ms=*/0, rpc_options);
669 }
670 
CheckRpcSendFailure(const grpc_core::DebugLocation & debug_location,StatusCode expected_status,absl::string_view expected_message_regex,const RpcOptions & rpc_options)671 void XdsEnd2endTest::CheckRpcSendFailure(
672     const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
673     absl::string_view expected_message_regex, const RpcOptions& rpc_options) {
674   const Status status = SendRpc(rpc_options);
675   EXPECT_FALSE(status.ok())
676       << debug_location.file() << ":" << debug_location.line();
677   EXPECT_EQ(expected_status, status.error_code())
678       << debug_location.file() << ":" << debug_location.line();
679   EXPECT_THAT(status.error_message(),
680               ::testing::MatchesRegex(expected_message_regex))
681       << debug_location.file() << ":" << debug_location.line();
682 }
683 
SendRpcsUntilFailure(const grpc_core::DebugLocation & debug_location,StatusCode expected_status,absl::string_view expected_message_regex,int timeout_ms,const RpcOptions & rpc_options)684 void XdsEnd2endTest::SendRpcsUntilFailure(
685     const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
686     absl::string_view expected_message_regex, int timeout_ms,
687     const RpcOptions& rpc_options) {
688   SendRpcsUntil(
689       debug_location,
690       [&](const RpcResult& result) {
691         // Might still succeed if channel hasn't yet seen the server go down.
692         if (result.status.ok()) return true;
693         // RPC failed.  Make sure the failure status is as expected and stop.
694         EXPECT_EQ(result.status.error_code(), expected_status)
695             << debug_location.file() << ":" << debug_location.line();
696         EXPECT_THAT(result.status.error_message(),
697                     ::testing::MatchesRegex(expected_message_regex))
698             << debug_location.file() << ":" << debug_location.line();
699         return false;
700       },
701       timeout_ms, rpc_options);
702 }
703 
SendRpcsAndCountFailuresWithMessage(const grpc_core::DebugLocation & debug_location,size_t num_rpcs,StatusCode expected_status,absl::string_view expected_message_prefix,const RpcOptions & rpc_options)704 size_t XdsEnd2endTest::SendRpcsAndCountFailuresWithMessage(
705     const grpc_core::DebugLocation& debug_location, size_t num_rpcs,
706     StatusCode expected_status, absl::string_view expected_message_prefix,
707     const RpcOptions& rpc_options) {
708   size_t num_failed = 0;
709   SendRpcsUntil(
710       debug_location,
711       [&, n = size_t{0}](const RpcResult& result) mutable {
712         if (!result.status.ok()) {
713           EXPECT_EQ(result.status.error_code(), expected_status)
714               << debug_location.file() << ":" << debug_location.line();
715           EXPECT_THAT(result.status.error_message(),
716                       ::testing::StartsWith(expected_message_prefix))
717               << debug_location.file() << ":" << debug_location.line();
718           ++num_failed;
719         }
720         return ++n < num_rpcs;
721       },
722       /*timeout_ms=*/0, rpc_options);
723   return num_failed;
724 }
725 
StartRpc(grpc::testing::EchoTestService::Stub * stub,const RpcOptions & rpc_options)726 void XdsEnd2endTest::LongRunningRpc::StartRpc(
727     grpc::testing::EchoTestService::Stub* stub, const RpcOptions& rpc_options) {
728   sender_thread_ = std::thread([this, stub, rpc_options]() {
729     EchoRequest request;
730     EchoResponse response;
731     rpc_options.SetupRpc(&context_, &request);
732     status_ = stub->Echo(&context_, request, &response);
733   });
734 }
735 
CancelRpc()736 void XdsEnd2endTest::LongRunningRpc::CancelRpc() {
737   context_.TryCancel();
738   if (sender_thread_.joinable()) sender_thread_.join();
739 }
740 
GetStatus()741 Status XdsEnd2endTest::LongRunningRpc::GetStatus() {
742   if (sender_thread_.joinable()) sender_thread_.join();
743   return status_;
744 }
745 
746 std::vector<std::unique_ptr<XdsEnd2endTest::ConcurrentRpc>>
SendConcurrentRpcs(const grpc_core::DebugLocation & debug_location,grpc::testing::EchoTestService::Stub * stub,size_t num_rpcs,const RpcOptions & rpc_options)747 XdsEnd2endTest::SendConcurrentRpcs(
748     const grpc_core::DebugLocation& debug_location,
749     grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
750     const RpcOptions& rpc_options) {
751   // Variables for RPCs.
752   std::vector<std::unique_ptr<ConcurrentRpc>> rpcs;
753   rpcs.reserve(num_rpcs);
754   EchoRequest request;
755   // Variables for synchronization
756   grpc_core::Mutex mu;
757   grpc_core::CondVar cv;
758   size_t completed = 0;
759   // Set-off callback RPCs
760   for (size_t i = 0; i < num_rpcs; ++i) {
761     auto rpc = std::make_unique<ConcurrentRpc>();
762     rpc_options.SetupRpc(&rpc->context, &request);
763     grpc_core::Timestamp t0 = NowFromCycleCounter();
764     stub->async()->Echo(
765         &rpc->context, &request, &rpc->response,
766         [rpc = rpc.get(), &mu, &completed, &cv, num_rpcs, t0](Status s) {
767           rpc->status = s;
768           rpc->elapsed_time = NowFromCycleCounter() - t0;
769           bool done;
770           {
771             grpc_core::MutexLock lock(&mu);
772             done = (++completed) == num_rpcs;
773           }
774           if (done) cv.Signal();
775         });
776     rpcs.push_back(std::move(rpc));
777   }
778   {
779     grpc_core::MutexLock lock(&mu);
780     cv.Wait(&mu);
781   }
782   EXPECT_EQ(completed, num_rpcs)
783       << " at " << debug_location.file() << ":" << debug_location.line();
784   return rpcs;
785 }
786 
WaitForAllBackends(const grpc_core::DebugLocation & debug_location,size_t start_index,size_t stop_index,std::function<void (const RpcResult &)> check_status,const WaitForBackendOptions & wait_options,const RpcOptions & rpc_options)787 size_t XdsEnd2endTest::WaitForAllBackends(
788     const grpc_core::DebugLocation& debug_location, size_t start_index,
789     size_t stop_index, std::function<void(const RpcResult&)> check_status,
790     const WaitForBackendOptions& wait_options, const RpcOptions& rpc_options) {
791   if (check_status == nullptr) {
792     check_status = [&](const RpcResult& result) {
793       EXPECT_TRUE(result.status.ok())
794           << "code=" << result.status.error_code()
795           << " message=" << result.status.error_message() << " at "
796           << debug_location.file() << ":" << debug_location.line();
797     };
798   }
799   LOG(INFO) << "========= WAITING FOR BACKENDS [" << start_index << ", "
800             << stop_index << ") ==========";
801   size_t num_rpcs = 0;
802   SendRpcsUntil(
803       debug_location,
804       [&](const RpcResult& result) {
805         ++num_rpcs;
806         check_status(result);
807         return !SeenAllBackends(start_index, stop_index, rpc_options.service);
808       },
809       wait_options.timeout_ms, rpc_options);
810   if (wait_options.reset_counters) ResetBackendCounters();
811   LOG(INFO) << "Backends up; sent " << num_rpcs << " warm up requests";
812   return num_rpcs;
813 }
814 
WaitForNack(const grpc_core::DebugLocation & debug_location,std::function<absl::optional<AdsServiceImpl::ResponseState> ()> get_state,const RpcOptions & rpc_options,StatusCode expected_status)815 absl::optional<AdsServiceImpl::ResponseState> XdsEnd2endTest::WaitForNack(
816     const grpc_core::DebugLocation& debug_location,
817     std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state,
818     const RpcOptions& rpc_options, StatusCode expected_status) {
819   absl::optional<AdsServiceImpl::ResponseState> response_state;
820   auto deadline =
821       absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
822   auto continue_predicate = [&]() {
823     if (absl::Now() >= deadline) {
824       return false;
825     }
826     response_state = get_state();
827     return !response_state.has_value() ||
828            response_state->state != AdsServiceImpl::ResponseState::NACKED;
829   };
830   do {
831     const Status status = SendRpc(rpc_options);
832     EXPECT_EQ(expected_status, status.error_code())
833         << "code=" << status.error_code()
834         << " message=" << status.error_message() << " at "
835         << debug_location.file() << ":" << debug_location.line();
836   } while (continue_predicate());
837   return response_state;
838 }
839 
SetProtoDuration(grpc_core::Duration duration,google::protobuf::Duration * duration_proto)840 void XdsEnd2endTest::SetProtoDuration(
841     grpc_core::Duration duration, google::protobuf::Duration* duration_proto) {
842   duration *= grpc_test_slowdown_factor();
843   gpr_timespec ts = duration.as_timespec();
844   duration_proto->set_seconds(ts.tv_sec);
845   duration_proto->set_nanos(ts.tv_nsec);
846 }
847 
MakeConnectionFailureRegex(absl::string_view prefix,bool has_resolution_note)848 std::string XdsEnd2endTest::MakeConnectionFailureRegex(
849     absl::string_view prefix, bool has_resolution_note) {
850   return absl::StrCat(
851       prefix,
852       "(UNKNOWN|UNAVAILABLE): "
853       // IP address
854       "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
855       // Prefixes added for context
856       "(Failed to connect to remote host: )?"
857       "(Timeout occurred: )?"
858       // Syscall
859       "((connect|sendmsg|recvmsg|getsockopt\\(SO\\_ERROR\\)): ?)?"
860       // strerror() output or other message
861       "(Connection refused"
862       "|Connection reset by peer"
863       "|Socket closed"
864       "|Broken pipe"
865       "|FD shutdown)"
866       // errno value
867       "( \\([0-9]+\\))?",
868       // xDS node ID
869       has_resolution_note ? " \\(xDS node ID:xds_end2end_test\\)" : "");
870 }
871 
MakeTlsHandshakeFailureRegex(absl::string_view prefix)872 std::string XdsEnd2endTest::MakeTlsHandshakeFailureRegex(
873     absl::string_view prefix) {
874   return absl::StrCat(
875       prefix,
876       "(UNKNOWN|UNAVAILABLE): "
877       // IP address
878       "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
879       // Prefixes added for context
880       "(Failed to connect to remote host: )?"
881       // Tls handshake failure
882       "Tls handshake failed \\(TSI_PROTOCOL_FAILURE\\): SSL_ERROR_SSL: "
883       "error:1000007d:SSL routines:OPENSSL_internal:CERTIFICATE_VERIFY_FAILED"
884       // Detailed reason for certificate verify failure
885       "(: .*)?");
886 }
887 
ReadTlsIdentityPair(const char * key_path,const char * cert_path)888 grpc_core::PemKeyCertPairList XdsEnd2endTest::ReadTlsIdentityPair(
889     const char* key_path, const char* cert_path) {
890   return grpc_core::PemKeyCertPairList{grpc_core::PemKeyCertPair(
891       grpc_core::testing::GetFileContents(key_path),
892       grpc_core::testing::GetFileContents(cert_path))};
893 }
894 
895 std::vector<experimental::IdentityKeyCertPair>
MakeIdentityKeyCertPairForTlsCreds()896 XdsEnd2endTest::MakeIdentityKeyCertPairForTlsCreds() {
897   std::string identity_cert =
898       grpc_core::testing::GetFileContents(kServerCertPath);
899   std::string private_key = grpc_core::testing::GetFileContents(kServerKeyPath);
900   return {{std::move(private_key), std::move(identity_cert)}};
901 }
902 
903 std::shared_ptr<ChannelCredentials>
CreateXdsChannelCredentials()904 XdsEnd2endTest::CreateXdsChannelCredentials() {
905   return XdsCredentials(CreateTlsChannelCredentials());
906 }
907 
908 std::shared_ptr<ChannelCredentials>
CreateTlsChannelCredentials()909 XdsEnd2endTest::CreateTlsChannelCredentials() {
910   auto certificate_provider = std::make_shared<StaticDataCertificateProvider>(
911       grpc_core::testing::GetFileContents(kCaCertPath),
912       MakeIdentityKeyCertPairForTlsCreds());
913   grpc::experimental::TlsChannelCredentialsOptions options;
914   options.set_certificate_provider(std::move(certificate_provider));
915   options.watch_root_certs();
916   options.watch_identity_key_cert_pairs();
917   auto verifier =
918       ExternalCertificateVerifier::Create<SyncCertificateVerifier>(true);
919   options.set_certificate_verifier(std::move(verifier));
920   options.set_verify_server_certs(true);
921   options.set_check_call_host(false);
922   return grpc::experimental::TlsCredentials(options);
923 }
924 
925 std::shared_ptr<ServerCredentials>
CreateFakeServerCredentials()926 XdsEnd2endTest::CreateFakeServerCredentials() {
927   return std::make_shared<SecureServerCredentials>(
928       grpc_fake_transport_security_server_credentials_create());
929 }
930 
931 std::shared_ptr<ServerCredentials>
CreateMtlsServerCredentials()932 XdsEnd2endTest::CreateMtlsServerCredentials() {
933   std::string root_cert = grpc_core::testing::GetFileContents(kCaCertPath);
934   auto certificate_provider =
935       std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
936           std::move(root_cert), MakeIdentityKeyCertPairForTlsCreds());
937   grpc::experimental::TlsServerCredentialsOptions options(
938       std::move(certificate_provider));
939   options.watch_root_certs();
940   options.watch_identity_key_cert_pairs();
941   options.set_cert_request_type(GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY);
942   return grpc::experimental::TlsServerCredentials(options);
943 }
944 
945 std::shared_ptr<ServerCredentials>
CreateTlsServerCredentials()946 XdsEnd2endTest::CreateTlsServerCredentials() {
947   auto certificate_provider =
948       std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
949           MakeIdentityKeyCertPairForTlsCreds());
950   grpc::experimental::TlsServerCredentialsOptions options(
951       std::move(certificate_provider));
952   options.watch_identity_key_cert_pairs();
953   return grpc::experimental::TlsServerCredentials(options);
954 }
955 
956 }  // namespace testing
957 }  // namespace grpc
958