1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
17
18 #include <gmock/gmock.h>
19 #include <grpcpp/security/tls_certificate_provider.h>
20 #include <gtest/gtest.h>
21
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <thread>
28 #include <vector>
29
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "absl/memory/memory.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_format.h"
35 #include "absl/strings/str_join.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "envoy/extensions/filters/http/router/v3/router.pb.h"
39 #include "src/core/ext/filters/http/server/http_server_filter.h"
40 #include "src/core/server/server.h"
41 #include "src/core/util/env.h"
42 #include "src/core/util/tmpfile.h"
43 #include "src/core/xds/grpc/xds_client_grpc.h"
44 #include "src/core/xds/xds_client/xds_channel_args.h"
45 #include "test/core/test_util/resolve_localhost_ip46.h"
46 #include "test/core/test_util/tls_utils.h"
47 #include "test/cpp/util/credentials.h"
48 #include "test/cpp/util/tls_test_utils.h"
49
50 namespace grpc {
51 namespace testing {
52
53 using ::envoy::config::core::v3::HealthStatus;
54 using ::envoy::service::discovery::v3::DiscoveryRequest;
55 using ::envoy::service::load_stats::v3::LoadStatsRequest;
56
57 using ::grpc::experimental::ExternalCertificateVerifier;
58 using ::grpc::experimental::IdentityKeyCertPair;
59 using ::grpc::experimental::ServerMetricRecorder;
60 using ::grpc::experimental::StaticDataCertificateProvider;
61
62 //
63 // XdsEnd2endTest::ServerThread::XdsServingStatusNotifier
64 //
65
66 void XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
OnServingStatusUpdate(std::string uri,ServingStatusUpdate update)67 OnServingStatusUpdate(std::string uri, ServingStatusUpdate update) {
68 grpc_core::MutexLock lock(&mu_);
69 status_map[uri] = update.status;
70 cond_.Signal();
71 }
72
73 bool XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
WaitOnServingStatusChange(const std::string & uri,grpc::StatusCode expected_status,absl::Duration timeout)74 WaitOnServingStatusChange(const std::string& uri,
75 grpc::StatusCode expected_status,
76 absl::Duration timeout) {
77 grpc_core::MutexLock lock(&mu_);
78 absl::Time deadline = absl::Now() + timeout * grpc_test_slowdown_factor();
79 std::map<std::string, grpc::Status>::iterator it;
80 while ((it = status_map.find(uri)) == status_map.end() ||
81 it->second.error_code() != expected_status) {
82 if (cond_.WaitWithDeadline(&mu_, deadline)) {
83 LOG(ERROR) << "\nTimeout Elapsed waiting on serving status "
84 "change\nExpected status: "
85 << expected_status << "\nActual:"
86 << (it == status_map.end()
87 ? "Entry not found in map"
88 : absl::StrCat(it->second.error_code()));
89 return false;
90 }
91 }
92 return true;
93 }
94
95 //
96 // XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
97 //
98
99 namespace {
100
101 // Channel arg pointer vtable for storing xDS channel args in the parent
102 // channel's channel args.
ChannelArgsArgCopy(void * p)103 void* ChannelArgsArgCopy(void* p) {
104 auto* args = static_cast<grpc_channel_args*>(p);
105 return grpc_channel_args_copy(args);
106 }
ChannelArgsArgDestroy(void * p)107 void ChannelArgsArgDestroy(void* p) {
108 auto* args = static_cast<grpc_channel_args*>(p);
109 grpc_channel_args_destroy(args);
110 }
ChannelArgsArgCmp(void * a,void * b)111 int ChannelArgsArgCmp(void* a, void* b) {
112 auto* args_a = static_cast<grpc_channel_args*>(a);
113 auto* args_b = static_cast<grpc_channel_args*>(b);
114 return grpc_channel_args_compare(args_a, args_b);
115 }
116 const grpc_arg_pointer_vtable kChannelArgsArgVtable = {
117 ChannelArgsArgCopy, ChannelArgsArgDestroy, ChannelArgsArgCmp};
118
119 } // namespace
120
121 class XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
122 : public grpc::ServerBuilderOption {
123 public:
XdsChannelArgsServerBuilderOption(XdsEnd2endTest * test_obj)124 explicit XdsChannelArgsServerBuilderOption(XdsEnd2endTest* test_obj)
125 : test_obj_(test_obj) {}
126
UpdateArguments(grpc::ChannelArguments * args)127 void UpdateArguments(grpc::ChannelArguments* args) override {
128 args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
129 test_obj_->bootstrap_);
130 args->SetPointerWithVtable(
131 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
132 &test_obj_->xds_channel_args_, &kChannelArgsArgVtable);
133 }
134
UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>> *)135 void UpdatePlugins(
136 std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/)
137 override {}
138
139 private:
140 XdsEnd2endTest* test_obj_;
141 };
142
143 //
144 // XdsEnd2endTest::ServerThread
145 //
146
ServerThread(XdsEnd2endTest * test_obj,bool use_xds_enabled_server,std::shared_ptr<ServerCredentials> credentials)147 XdsEnd2endTest::ServerThread::ServerThread(
148 XdsEnd2endTest* test_obj, bool use_xds_enabled_server,
149 std::shared_ptr<ServerCredentials> credentials)
150 : test_obj_(test_obj),
151 use_xds_enabled_server_(use_xds_enabled_server),
152 credentials_(credentials == nullptr ? CreateFakeServerCredentials()
153 : std::move(credentials)),
154 port_(grpc_pick_unused_port_or_die()) {}
155
Start()156 void XdsEnd2endTest::ServerThread::Start() {
157 LOG(INFO) << "starting " << Type() << " server on port " << port_;
158 CHECK(!running_);
159 running_ = true;
160 StartAllServices();
161 grpc_core::Mutex mu;
162 // We need to acquire the lock here in order to prevent the notify_one
163 // by ServerThread::Serve from firing before the wait below is hit.
164 grpc_core::MutexLock lock(&mu);
165 grpc_core::CondVar cond;
166 thread_ = std::make_unique<std::thread>(
167 std::bind(&ServerThread::Serve, this, &mu, &cond));
168 cond.Wait(&mu);
169 LOG(INFO) << Type() << " server startup complete";
170 }
171
Shutdown()172 void XdsEnd2endTest::ServerThread::Shutdown() {
173 if (!running_) return;
174 LOG(INFO) << Type() << " about to shutdown";
175 ShutdownAllServices();
176 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
177 thread_->join();
178 LOG(INFO) << Type() << " shutdown completed";
179 running_ = false;
180 }
181
StopListeningAndSendGoaways()182 void XdsEnd2endTest::ServerThread::StopListeningAndSendGoaways() {
183 LOG(INFO) << Type() << " sending GOAWAYs";
184 {
185 grpc_core::ExecCtx exec_ctx;
186 auto* server = grpc_core::Server::FromC(server_->c_server());
187 server->StopListening();
188 server->SendGoaways();
189 }
190 LOG(INFO) << Type() << " done sending GOAWAYs";
191 }
192
StopListening()193 void XdsEnd2endTest::ServerThread::StopListening() {
194 LOG(INFO) << Type() << " about to stop listening";
195 {
196 grpc_core::ExecCtx exec_ctx;
197 auto* server = grpc_core::Server::FromC(server_->c_server());
198 server->StopListening();
199 }
200 LOG(INFO) << Type() << " stopped listening";
201 }
202
Serve(grpc_core::Mutex * mu,grpc_core::CondVar * cond)203 void XdsEnd2endTest::ServerThread::Serve(grpc_core::Mutex* mu,
204 grpc_core::CondVar* cond) {
205 // We need to acquire the lock here in order to prevent the notify_one
206 // below from firing before its corresponding wait is executed.
207 grpc_core::MutexLock lock(mu);
208 std::string server_address = absl::StrCat("localhost:", port_);
209 if (use_xds_enabled_server_) {
210 XdsServerBuilder builder;
211 if (GetParam().bootstrap_source() ==
212 XdsTestType::kBootstrapFromChannelArg) {
213 builder.SetOption(
214 std::make_unique<XdsChannelArgsServerBuilderOption>(test_obj_));
215 }
216 builder.set_status_notifier(¬ifier_);
217 builder.experimental().set_drain_grace_time(
218 test_obj_->xds_drain_grace_time_ms_);
219 builder.AddListeningPort(server_address, credentials_);
220 // Allow gRPC Core's HTTP server to accept PUT requests for testing
221 // purposes.
222 if (allow_put_requests_) {
223 builder.AddChannelArgument(
224 GRPC_ARG_DO_NOT_USE_UNLESS_YOU_HAVE_PERMISSION_FROM_GRPC_TEAM_ALLOW_BROKEN_PUT_REQUESTS,
225 true);
226 }
227 RegisterAllServices(&builder);
228 server_ = builder.BuildAndStart();
229 } else {
230 ServerBuilder builder;
231 builder.AddListeningPort(server_address, credentials_);
232 RegisterAllServices(&builder);
233 server_ = builder.BuildAndStart();
234 }
235 cond->Signal();
236 }
237
238 //
239 // XdsEnd2endTest::BackendServerThread
240 //
241
BackendServerThread(XdsEnd2endTest * test_obj,bool use_xds_enabled_server,std::shared_ptr<ServerCredentials> credentials)242 XdsEnd2endTest::BackendServerThread::BackendServerThread(
243 XdsEnd2endTest* test_obj, bool use_xds_enabled_server,
244 std::shared_ptr<ServerCredentials> credentials)
245 : ServerThread(test_obj, use_xds_enabled_server, std::move(credentials)) {
246 if (use_xds_enabled_server) {
247 test_obj->SetServerListenerNameAndRouteConfiguration(
248 test_obj->balancer_.get(), test_obj->default_server_listener_, port(),
249 test_obj->default_server_route_config_);
250 }
251 }
252
RegisterAllServices(ServerBuilder * builder)253 void XdsEnd2endTest::BackendServerThread::RegisterAllServices(
254 ServerBuilder* builder) {
255 server_metric_recorder_ = ServerMetricRecorder::Create();
256 ServerBuilder::experimental_type(builder).EnableCallMetricRecording(
257 server_metric_recorder_.get());
258 builder->RegisterService(&backend_service_);
259 builder->RegisterService(&backend_service1_);
260 builder->RegisterService(&backend_service2_);
261 }
262
StartAllServices()263 void XdsEnd2endTest::BackendServerThread::StartAllServices() {
264 backend_service_.Start();
265 backend_service1_.Start();
266 backend_service2_.Start();
267 }
268
ShutdownAllServices()269 void XdsEnd2endTest::BackendServerThread::ShutdownAllServices() {
270 backend_service_.Shutdown();
271 backend_service1_.Shutdown();
272 backend_service2_.Shutdown();
273 }
274
275 //
276 // XdsEnd2endTest::BalancerServerThread
277 //
278
BalancerServerThread(XdsEnd2endTest * test_obj,absl::string_view debug_label,std::shared_ptr<ServerCredentials> credentials)279 XdsEnd2endTest::BalancerServerThread::BalancerServerThread(
280 XdsEnd2endTest* test_obj, absl::string_view debug_label,
281 std::shared_ptr<ServerCredentials> credentials)
282 : ServerThread(test_obj, /*use_xds_enabled_server=*/false,
283 std::move(credentials)),
284 ads_service_(new AdsServiceImpl(
285 // First request must have node set with the right client
286 // features.
287 [&](const DiscoveryRequest& request) {
288 EXPECT_TRUE(request.has_node());
289 EXPECT_THAT(request.node().client_features(),
290 ::testing::UnorderedElementsAre(
291 "envoy.lb.does_not_support_overprovisioning",
292 "xds.config.resource-in-sotw"));
293 },
294 // NACKs must use the right status code.
__anon98fd15a40302(absl::StatusCode code) 295 [&](absl::StatusCode code) {
296 EXPECT_EQ(code, absl::StatusCode::kInvalidArgument);
297 },
298 debug_label)),
299 lrs_service_(new LrsServiceImpl(
300 (GetParam().enable_load_reporting() ? 20 : 0), {kDefaultClusterName},
301 // Fail if load reporting is used when not enabled.
__anon98fd15a40402() 302 [&]() { EXPECT_TRUE(GetParam().enable_load_reporting()); },
303 // Make sure we send the client feature saying that we support
304 // send_all_clusters.
__anon98fd15a40502(const LoadStatsRequest& request) 305 [&](const LoadStatsRequest& request) {
306 EXPECT_THAT(
307 request.node().client_features(),
308 ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
309 },
310 debug_label)) {}
311
RegisterAllServices(ServerBuilder * builder)312 void XdsEnd2endTest::BalancerServerThread::RegisterAllServices(
313 ServerBuilder* builder) {
314 builder->RegisterService(ads_service_.get());
315 builder->RegisterService(lrs_service_.get());
316 }
317
StartAllServices()318 void XdsEnd2endTest::BalancerServerThread::StartAllServices() {
319 ads_service_->Start();
320 lrs_service_->Start();
321 }
322
ShutdownAllServices()323 void XdsEnd2endTest::BalancerServerThread::ShutdownAllServices() {
324 ads_service_->Shutdown();
325 lrs_service_->Shutdown();
326 }
327
328 //
329 // XdsEnd2endTest::RpcOptions
330 //
331
SetupRpc(ClientContext * context,EchoRequest * request) const332 void XdsEnd2endTest::RpcOptions::SetupRpc(ClientContext* context,
333 EchoRequest* request) const {
334 for (const auto& item : metadata) {
335 context->AddMetadata(item.first, item.second);
336 }
337 if (timeout_ms != 0) {
338 context->set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
339 }
340 if (wait_for_ready) context->set_wait_for_ready(true);
341 request->set_message(kRequestMessage);
342 if (server_fail) {
343 request->mutable_param()->mutable_expected_error()->set_code(
344 GRPC_STATUS_FAILED_PRECONDITION);
345 }
346 if (server_sleep_us != 0) {
347 request->mutable_param()->set_server_sleep_us(server_sleep_us);
348 }
349 if (client_cancel_after_us != 0) {
350 request->mutable_param()->set_client_cancel_after_us(
351 client_cancel_after_us);
352 }
353 if (skip_cancelled_check) {
354 request->mutable_param()->set_skip_cancelled_check(true);
355 }
356 if (backend_metrics.has_value()) {
357 *request->mutable_param()->mutable_backend_metrics() = *backend_metrics;
358 }
359 if (server_notify_client_when_started) {
360 request->mutable_param()->set_server_notify_client_when_started(true);
361 }
362 if (echo_host_from_authority_header) {
363 request->mutable_param()->set_echo_host_from_authority_header(true);
364 }
365 if (echo_metadata_initially) {
366 request->mutable_param()->set_echo_metadata_initially(true);
367 }
368 }
369
370 //
371 // XdsEnd2endTest
372 //
373
374 const char XdsEnd2endTest::kCaCertPath[] = "src/core/tsi/test_creds/ca.pem";
375 const char XdsEnd2endTest::kServerCertPath[] =
376 "src/core/tsi/test_creds/server1.pem";
377 const char XdsEnd2endTest::kServerKeyPath[] =
378 "src/core/tsi/test_creds/server1.key";
379
380 const char XdsEnd2endTest::kRequestMessage[] = "Live long and prosper.";
381
XdsEnd2endTest(std::shared_ptr<ServerCredentials> balancer_credentials)382 XdsEnd2endTest::XdsEnd2endTest(
383 std::shared_ptr<ServerCredentials> balancer_credentials)
384 : balancer_(CreateAndStartBalancer("Default Balancer",
385 std::move(balancer_credentials))) {
386 // Initialize default client-side xDS resources.
387 default_listener_ = XdsResourceUtils::DefaultListener();
388 default_route_config_ = XdsResourceUtils::DefaultRouteConfig();
389 default_cluster_ = XdsResourceUtils::DefaultCluster();
390 if (GetParam().enable_load_reporting()) {
391 default_cluster_.mutable_lrs_server()->mutable_self();
392 }
393 // Initialize client-side resources on balancer.
394 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
395 default_route_config_);
396 balancer_->ads_service()->SetCdsResource(default_cluster_);
397 // Initialize default server-side xDS resources.
398 default_server_route_config_ = XdsResourceUtils::DefaultServerRouteConfig();
399 default_server_listener_ = XdsResourceUtils::DefaultServerListener();
400 }
401
TearDown()402 void XdsEnd2endTest::TearDown() {
403 ShutdownAllBackends();
404 balancer_->Shutdown();
405 // Clear global xDS channel args, since they will go out of scope
406 // when this test object is destroyed.
407 grpc_core::internal::SetXdsChannelArgsForTest(nullptr);
408 grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP");
409 grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
410 if (bootstrap_file_ != nullptr) {
411 remove(bootstrap_file_);
412 gpr_free(bootstrap_file_);
413 }
414 }
415
416 std::unique_ptr<XdsEnd2endTest::BalancerServerThread>
CreateAndStartBalancer(absl::string_view debug_label,std::shared_ptr<ServerCredentials> credentials)417 XdsEnd2endTest::CreateAndStartBalancer(
418 absl::string_view debug_label,
419 std::shared_ptr<ServerCredentials> credentials) {
420 std::unique_ptr<BalancerServerThread> balancer =
421 std::make_unique<BalancerServerThread>(this, debug_label,
422 std::move(credentials));
423 balancer->Start();
424 return balancer;
425 }
426
427 std::vector<XdsEnd2endTest::EdsResourceArgs::Endpoint>
CreateEndpointsForBackends(size_t start_index,size_t stop_index,HealthStatus health_status,int lb_weight)428 XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index,
429 size_t stop_index,
430 HealthStatus health_status,
431 int lb_weight) {
432 if (stop_index == 0) stop_index = backends_.size();
433 std::vector<EdsResourceArgs::Endpoint> endpoints;
434 for (size_t i = start_index; i < stop_index; ++i) {
435 endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight));
436 }
437 return endpoints;
438 }
439
ResetBackendCounters(size_t start_index,size_t stop_index)440 void XdsEnd2endTest::ResetBackendCounters(size_t start_index,
441 size_t stop_index) {
442 if (stop_index == 0) stop_index = backends_.size();
443 for (size_t i = start_index; i < stop_index; ++i) {
444 backends_[i]->backend_service()->ResetCounters();
445 backends_[i]->backend_service1()->ResetCounters();
446 backends_[i]->backend_service2()->ResetCounters();
447 }
448 }
449
SeenBackend(size_t backend_idx,const RpcService rpc_service)450 bool XdsEnd2endTest::SeenBackend(size_t backend_idx,
451 const RpcService rpc_service) {
452 switch (rpc_service) {
453 case SERVICE_ECHO:
454 if (backends_[backend_idx]->backend_service()->request_count() == 0) {
455 return false;
456 }
457 break;
458 case SERVICE_ECHO1:
459 if (backends_[backend_idx]->backend_service1()->request_count() == 0) {
460 return false;
461 }
462 break;
463 case SERVICE_ECHO2:
464 if (backends_[backend_idx]->backend_service2()->request_count() == 0) {
465 return false;
466 }
467 break;
468 }
469 return true;
470 }
471
SeenAllBackends(size_t start_index,size_t stop_index,const RpcService rpc_service)472 bool XdsEnd2endTest::SeenAllBackends(size_t start_index, size_t stop_index,
473 const RpcService rpc_service) {
474 if (stop_index == 0) stop_index = backends_.size();
475 for (size_t i = start_index; i < stop_index; ++i) {
476 if (!SeenBackend(i, rpc_service)) {
477 return false;
478 }
479 }
480 return true;
481 }
482
GetBackendPorts(size_t start_index,size_t stop_index) const483 std::vector<int> XdsEnd2endTest::GetBackendPorts(size_t start_index,
484 size_t stop_index) const {
485 if (stop_index == 0) stop_index = backends_.size();
486 std::vector<int> backend_ports;
487 for (size_t i = start_index; i < stop_index; ++i) {
488 backend_ports.push_back(backends_[i]->port());
489 }
490 return backend_ports;
491 }
492
InitClient(absl::optional<XdsBootstrapBuilder> builder,std::string lb_expected_authority,int xds_resource_does_not_exist_timeout_ms,std::string balancer_authority_override,ChannelArguments * args,std::shared_ptr<ChannelCredentials> credentials)493 void XdsEnd2endTest::InitClient(
494 absl::optional<XdsBootstrapBuilder> builder,
495 std::string lb_expected_authority,
496 int xds_resource_does_not_exist_timeout_ms,
497 std::string balancer_authority_override, ChannelArguments* args,
498 std::shared_ptr<ChannelCredentials> credentials) {
499 if (!builder.has_value()) {
500 builder = MakeBootstrapBuilder();
501 }
502 if (xds_resource_does_not_exist_timeout_ms > 0) {
503 xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
504 const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
505 xds_resource_does_not_exist_timeout_ms * grpc_test_slowdown_factor()));
506 }
507 if (!lb_expected_authority.empty()) {
508 constexpr char authority_const[] = "localhost:%d";
509 if (lb_expected_authority == authority_const) {
510 lb_expected_authority =
511 absl::StrFormat(authority_const, balancer_->port());
512 }
513 xds_channel_args_to_add_.emplace_back(grpc_channel_arg_string_create(
514 const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
515 const_cast<char*>(lb_expected_authority.c_str())));
516 }
517 if (!balancer_authority_override.empty()) {
518 xds_channel_args_to_add_.emplace_back(grpc_channel_arg_string_create(
519 const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
520 const_cast<char*>(balancer_authority_override.c_str())));
521 }
522 xds_channel_args_.num_args = xds_channel_args_to_add_.size();
523 xds_channel_args_.args = xds_channel_args_to_add_.data();
524 bootstrap_ = builder->Build();
525 if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromEnvVar) {
526 grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap_.c_str());
527 } else if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromFile) {
528 FILE* out = gpr_tmpfile("xds_bootstrap_v3", &bootstrap_file_);
529 fputs(bootstrap_.c_str(), out);
530 fclose(out);
531 grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP", bootstrap_file_);
532 }
533 if (GetParam().bootstrap_source() != XdsTestType::kBootstrapFromChannelArg) {
534 // If getting bootstrap from channel arg, we'll pass these args in
535 // via the parent channel args in CreateChannel() instead.
536 grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
537 // Make sure each test creates a new XdsClient instance rather than
538 // reusing the one from the previous test. This avoids spurious failures
539 // caused when a load reporting test runs after a non-load reporting test
540 // and the XdsClient is still talking to the old LRS server, which fails
541 // because it's not expecting the client to connect. It also
542 // ensures that each test can independently set the global channel
543 // args for the xDS channel.
544 grpc_core::internal::UnsetGlobalXdsClientsForTest();
545 }
546 // Create channel and stub.
547 ResetStub(/*failover_timeout_ms=*/0, args, std::move(credentials));
548 }
549
ResetStub(int failover_timeout_ms,ChannelArguments * args,std::shared_ptr<ChannelCredentials> credentials)550 void XdsEnd2endTest::ResetStub(
551 int failover_timeout_ms, ChannelArguments* args,
552 std::shared_ptr<ChannelCredentials> credentials) {
553 channel_ = CreateChannel(failover_timeout_ms, kServerName, "", args,
554 std::move(credentials));
555 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
556 stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
557 stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
558 }
559
CreateChannel(int failover_timeout_ms,const char * server_name,const char * xds_authority,ChannelArguments * args,std::shared_ptr<ChannelCredentials> credentials)560 std::shared_ptr<Channel> XdsEnd2endTest::CreateChannel(
561 int failover_timeout_ms, const char* server_name, const char* xds_authority,
562 ChannelArguments* args, std::shared_ptr<ChannelCredentials> credentials) {
563 ChannelArguments local_args;
564 if (args == nullptr) args = &local_args;
565 // TODO(roth): Remove this once we enable retries by default internally.
566 args->SetInt(GRPC_ARG_ENABLE_RETRIES, 1);
567 if (failover_timeout_ms > 0) {
568 args->SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
569 failover_timeout_ms * grpc_test_slowdown_factor());
570 }
571 if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromChannelArg) {
572 // We're getting the bootstrap from a channel arg, so we do the
573 // same thing for the response generator to use for the xDS
574 // channel and the xDS resource-does-not-exist timeout value.
575 args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
576 bootstrap_);
577 args->SetPointerWithVtable(
578 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
579 &xds_channel_args_, &kChannelArgsArgVtable);
580 }
581 // Construct target URI.
582 std::vector<absl::string_view> parts = {"xds:"};
583 if (xds_authority != nullptr && xds_authority[0] != '\0') {
584 parts.emplace_back("//");
585 parts.emplace_back(xds_authority);
586 parts.emplace_back("/");
587 }
588 parts.emplace_back(server_name);
589 std::string uri = absl::StrJoin(parts, "");
590 // Credentials defaults to fake credentials.
591 if (credentials == nullptr) {
592 credentials = std::make_shared<FakeTransportSecurityChannelCredentials>();
593 }
594 return grpc::CreateCustomChannel(uri, credentials, *args);
595 }
596
SendRpc(const RpcOptions & rpc_options,EchoResponse * response,std::multimap<std::string,std::string> * server_initial_metadata)597 Status XdsEnd2endTest::SendRpc(
598 const RpcOptions& rpc_options, EchoResponse* response,
599 std::multimap<std::string, std::string>* server_initial_metadata) {
600 EchoResponse local_response;
601 if (response == nullptr) response = &local_response;
602 ClientContext context;
603 EchoRequest request;
604 if (rpc_options.server_expected_error != StatusCode::OK) {
605 auto* error = request.mutable_param()->mutable_expected_error();
606 error->set_code(rpc_options.server_expected_error);
607 }
608 rpc_options.SetupRpc(&context, &request);
609 Status status;
610 switch (rpc_options.service) {
611 case SERVICE_ECHO:
612 status =
613 SendRpcMethod(stub_.get(), rpc_options, &context, request, response);
614 break;
615 case SERVICE_ECHO1:
616 status =
617 SendRpcMethod(stub1_.get(), rpc_options, &context, request, response);
618 break;
619 case SERVICE_ECHO2:
620 status =
621 SendRpcMethod(stub2_.get(), rpc_options, &context, request, response);
622 break;
623 }
624 if (server_initial_metadata != nullptr) {
625 for (const auto& it : context.GetServerInitialMetadata()) {
626 std::string header(it.first.data(), it.first.size());
627 // Guard against implementation-specific header case - RFC 2616
628 absl::AsciiStrToLower(&header);
629 server_initial_metadata->emplace(
630 header, std::string(it.second.data(), it.second.size()));
631 }
632 }
633 return status;
634 }
635
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,std::function<bool (const RpcResult &)> continue_predicate,int timeout_ms,const RpcOptions & rpc_options)636 void XdsEnd2endTest::SendRpcsUntil(
637 const grpc_core::DebugLocation& debug_location,
638 std::function<bool(const RpcResult&)> continue_predicate, int timeout_ms,
639 const RpcOptions& rpc_options) {
640 absl::Time deadline = absl::InfiniteFuture();
641 if (timeout_ms != 0) {
642 deadline = absl::Now() +
643 (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
644 }
645 while (true) {
646 RpcResult result;
647 result.status = SendRpc(rpc_options, &result.response);
648 if (!continue_predicate(result)) return;
649 EXPECT_LE(absl::Now(), deadline)
650 << debug_location.file() << ":" << debug_location.line();
651 if (absl::Now() >= deadline) break;
652 }
653 }
654
CheckRpcSendOk(const grpc_core::DebugLocation & debug_location,const size_t times,const RpcOptions & rpc_options)655 void XdsEnd2endTest::CheckRpcSendOk(
656 const grpc_core::DebugLocation& debug_location, const size_t times,
657 const RpcOptions& rpc_options) {
658 SendRpcsUntil(
659 debug_location,
660 [debug_location, times, n = size_t{0}](const RpcResult& result) mutable {
661 EXPECT_TRUE(result.status.ok())
662 << "code=" << result.status.error_code()
663 << " message=" << result.status.error_message() << " at "
664 << debug_location.file() << ":" << debug_location.line();
665 EXPECT_EQ(result.response.message(), kRequestMessage);
666 return ++n < times;
667 },
668 /*timeout_ms=*/0, rpc_options);
669 }
670
CheckRpcSendFailure(const grpc_core::DebugLocation & debug_location,StatusCode expected_status,absl::string_view expected_message_regex,const RpcOptions & rpc_options)671 void XdsEnd2endTest::CheckRpcSendFailure(
672 const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
673 absl::string_view expected_message_regex, const RpcOptions& rpc_options) {
674 const Status status = SendRpc(rpc_options);
675 EXPECT_FALSE(status.ok())
676 << debug_location.file() << ":" << debug_location.line();
677 EXPECT_EQ(expected_status, status.error_code())
678 << debug_location.file() << ":" << debug_location.line();
679 EXPECT_THAT(status.error_message(),
680 ::testing::MatchesRegex(expected_message_regex))
681 << debug_location.file() << ":" << debug_location.line();
682 }
683
SendRpcsUntilFailure(const grpc_core::DebugLocation & debug_location,StatusCode expected_status,absl::string_view expected_message_regex,int timeout_ms,const RpcOptions & rpc_options)684 void XdsEnd2endTest::SendRpcsUntilFailure(
685 const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
686 absl::string_view expected_message_regex, int timeout_ms,
687 const RpcOptions& rpc_options) {
688 SendRpcsUntil(
689 debug_location,
690 [&](const RpcResult& result) {
691 // Might still succeed if channel hasn't yet seen the server go down.
692 if (result.status.ok()) return true;
693 // RPC failed. Make sure the failure status is as expected and stop.
694 EXPECT_EQ(result.status.error_code(), expected_status)
695 << debug_location.file() << ":" << debug_location.line();
696 EXPECT_THAT(result.status.error_message(),
697 ::testing::MatchesRegex(expected_message_regex))
698 << debug_location.file() << ":" << debug_location.line();
699 return false;
700 },
701 timeout_ms, rpc_options);
702 }
703
SendRpcsAndCountFailuresWithMessage(const grpc_core::DebugLocation & debug_location,size_t num_rpcs,StatusCode expected_status,absl::string_view expected_message_prefix,const RpcOptions & rpc_options)704 size_t XdsEnd2endTest::SendRpcsAndCountFailuresWithMessage(
705 const grpc_core::DebugLocation& debug_location, size_t num_rpcs,
706 StatusCode expected_status, absl::string_view expected_message_prefix,
707 const RpcOptions& rpc_options) {
708 size_t num_failed = 0;
709 SendRpcsUntil(
710 debug_location,
711 [&, n = size_t{0}](const RpcResult& result) mutable {
712 if (!result.status.ok()) {
713 EXPECT_EQ(result.status.error_code(), expected_status)
714 << debug_location.file() << ":" << debug_location.line();
715 EXPECT_THAT(result.status.error_message(),
716 ::testing::StartsWith(expected_message_prefix))
717 << debug_location.file() << ":" << debug_location.line();
718 ++num_failed;
719 }
720 return ++n < num_rpcs;
721 },
722 /*timeout_ms=*/0, rpc_options);
723 return num_failed;
724 }
725
StartRpc(grpc::testing::EchoTestService::Stub * stub,const RpcOptions & rpc_options)726 void XdsEnd2endTest::LongRunningRpc::StartRpc(
727 grpc::testing::EchoTestService::Stub* stub, const RpcOptions& rpc_options) {
728 sender_thread_ = std::thread([this, stub, rpc_options]() {
729 EchoRequest request;
730 EchoResponse response;
731 rpc_options.SetupRpc(&context_, &request);
732 status_ = stub->Echo(&context_, request, &response);
733 });
734 }
735
CancelRpc()736 void XdsEnd2endTest::LongRunningRpc::CancelRpc() {
737 context_.TryCancel();
738 if (sender_thread_.joinable()) sender_thread_.join();
739 }
740
GetStatus()741 Status XdsEnd2endTest::LongRunningRpc::GetStatus() {
742 if (sender_thread_.joinable()) sender_thread_.join();
743 return status_;
744 }
745
746 std::vector<std::unique_ptr<XdsEnd2endTest::ConcurrentRpc>>
SendConcurrentRpcs(const grpc_core::DebugLocation & debug_location,grpc::testing::EchoTestService::Stub * stub,size_t num_rpcs,const RpcOptions & rpc_options)747 XdsEnd2endTest::SendConcurrentRpcs(
748 const grpc_core::DebugLocation& debug_location,
749 grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
750 const RpcOptions& rpc_options) {
751 // Variables for RPCs.
752 std::vector<std::unique_ptr<ConcurrentRpc>> rpcs;
753 rpcs.reserve(num_rpcs);
754 EchoRequest request;
755 // Variables for synchronization
756 grpc_core::Mutex mu;
757 grpc_core::CondVar cv;
758 size_t completed = 0;
759 // Set-off callback RPCs
760 for (size_t i = 0; i < num_rpcs; ++i) {
761 auto rpc = std::make_unique<ConcurrentRpc>();
762 rpc_options.SetupRpc(&rpc->context, &request);
763 grpc_core::Timestamp t0 = NowFromCycleCounter();
764 stub->async()->Echo(
765 &rpc->context, &request, &rpc->response,
766 [rpc = rpc.get(), &mu, &completed, &cv, num_rpcs, t0](Status s) {
767 rpc->status = s;
768 rpc->elapsed_time = NowFromCycleCounter() - t0;
769 bool done;
770 {
771 grpc_core::MutexLock lock(&mu);
772 done = (++completed) == num_rpcs;
773 }
774 if (done) cv.Signal();
775 });
776 rpcs.push_back(std::move(rpc));
777 }
778 {
779 grpc_core::MutexLock lock(&mu);
780 cv.Wait(&mu);
781 }
782 EXPECT_EQ(completed, num_rpcs)
783 << " at " << debug_location.file() << ":" << debug_location.line();
784 return rpcs;
785 }
786
WaitForAllBackends(const grpc_core::DebugLocation & debug_location,size_t start_index,size_t stop_index,std::function<void (const RpcResult &)> check_status,const WaitForBackendOptions & wait_options,const RpcOptions & rpc_options)787 size_t XdsEnd2endTest::WaitForAllBackends(
788 const grpc_core::DebugLocation& debug_location, size_t start_index,
789 size_t stop_index, std::function<void(const RpcResult&)> check_status,
790 const WaitForBackendOptions& wait_options, const RpcOptions& rpc_options) {
791 if (check_status == nullptr) {
792 check_status = [&](const RpcResult& result) {
793 EXPECT_TRUE(result.status.ok())
794 << "code=" << result.status.error_code()
795 << " message=" << result.status.error_message() << " at "
796 << debug_location.file() << ":" << debug_location.line();
797 };
798 }
799 LOG(INFO) << "========= WAITING FOR BACKENDS [" << start_index << ", "
800 << stop_index << ") ==========";
801 size_t num_rpcs = 0;
802 SendRpcsUntil(
803 debug_location,
804 [&](const RpcResult& result) {
805 ++num_rpcs;
806 check_status(result);
807 return !SeenAllBackends(start_index, stop_index, rpc_options.service);
808 },
809 wait_options.timeout_ms, rpc_options);
810 if (wait_options.reset_counters) ResetBackendCounters();
811 LOG(INFO) << "Backends up; sent " << num_rpcs << " warm up requests";
812 return num_rpcs;
813 }
814
WaitForNack(const grpc_core::DebugLocation & debug_location,std::function<absl::optional<AdsServiceImpl::ResponseState> ()> get_state,const RpcOptions & rpc_options,StatusCode expected_status)815 absl::optional<AdsServiceImpl::ResponseState> XdsEnd2endTest::WaitForNack(
816 const grpc_core::DebugLocation& debug_location,
817 std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state,
818 const RpcOptions& rpc_options, StatusCode expected_status) {
819 absl::optional<AdsServiceImpl::ResponseState> response_state;
820 auto deadline =
821 absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
822 auto continue_predicate = [&]() {
823 if (absl::Now() >= deadline) {
824 return false;
825 }
826 response_state = get_state();
827 return !response_state.has_value() ||
828 response_state->state != AdsServiceImpl::ResponseState::NACKED;
829 };
830 do {
831 const Status status = SendRpc(rpc_options);
832 EXPECT_EQ(expected_status, status.error_code())
833 << "code=" << status.error_code()
834 << " message=" << status.error_message() << " at "
835 << debug_location.file() << ":" << debug_location.line();
836 } while (continue_predicate());
837 return response_state;
838 }
839
SetProtoDuration(grpc_core::Duration duration,google::protobuf::Duration * duration_proto)840 void XdsEnd2endTest::SetProtoDuration(
841 grpc_core::Duration duration, google::protobuf::Duration* duration_proto) {
842 duration *= grpc_test_slowdown_factor();
843 gpr_timespec ts = duration.as_timespec();
844 duration_proto->set_seconds(ts.tv_sec);
845 duration_proto->set_nanos(ts.tv_nsec);
846 }
847
MakeConnectionFailureRegex(absl::string_view prefix,bool has_resolution_note)848 std::string XdsEnd2endTest::MakeConnectionFailureRegex(
849 absl::string_view prefix, bool has_resolution_note) {
850 return absl::StrCat(
851 prefix,
852 "(UNKNOWN|UNAVAILABLE): "
853 // IP address
854 "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
855 // Prefixes added for context
856 "(Failed to connect to remote host: )?"
857 "(Timeout occurred: )?"
858 // Syscall
859 "((connect|sendmsg|recvmsg|getsockopt\\(SO\\_ERROR\\)): ?)?"
860 // strerror() output or other message
861 "(Connection refused"
862 "|Connection reset by peer"
863 "|Socket closed"
864 "|Broken pipe"
865 "|FD shutdown)"
866 // errno value
867 "( \\([0-9]+\\))?",
868 // xDS node ID
869 has_resolution_note ? " \\(xDS node ID:xds_end2end_test\\)" : "");
870 }
871
MakeTlsHandshakeFailureRegex(absl::string_view prefix)872 std::string XdsEnd2endTest::MakeTlsHandshakeFailureRegex(
873 absl::string_view prefix) {
874 return absl::StrCat(
875 prefix,
876 "(UNKNOWN|UNAVAILABLE): "
877 // IP address
878 "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
879 // Prefixes added for context
880 "(Failed to connect to remote host: )?"
881 // Tls handshake failure
882 "Tls handshake failed \\(TSI_PROTOCOL_FAILURE\\): SSL_ERROR_SSL: "
883 "error:1000007d:SSL routines:OPENSSL_internal:CERTIFICATE_VERIFY_FAILED"
884 // Detailed reason for certificate verify failure
885 "(: .*)?");
886 }
887
ReadTlsIdentityPair(const char * key_path,const char * cert_path)888 grpc_core::PemKeyCertPairList XdsEnd2endTest::ReadTlsIdentityPair(
889 const char* key_path, const char* cert_path) {
890 return grpc_core::PemKeyCertPairList{grpc_core::PemKeyCertPair(
891 grpc_core::testing::GetFileContents(key_path),
892 grpc_core::testing::GetFileContents(cert_path))};
893 }
894
895 std::vector<experimental::IdentityKeyCertPair>
MakeIdentityKeyCertPairForTlsCreds()896 XdsEnd2endTest::MakeIdentityKeyCertPairForTlsCreds() {
897 std::string identity_cert =
898 grpc_core::testing::GetFileContents(kServerCertPath);
899 std::string private_key = grpc_core::testing::GetFileContents(kServerKeyPath);
900 return {{std::move(private_key), std::move(identity_cert)}};
901 }
902
903 std::shared_ptr<ChannelCredentials>
CreateXdsChannelCredentials()904 XdsEnd2endTest::CreateXdsChannelCredentials() {
905 return XdsCredentials(CreateTlsChannelCredentials());
906 }
907
908 std::shared_ptr<ChannelCredentials>
CreateTlsChannelCredentials()909 XdsEnd2endTest::CreateTlsChannelCredentials() {
910 auto certificate_provider = std::make_shared<StaticDataCertificateProvider>(
911 grpc_core::testing::GetFileContents(kCaCertPath),
912 MakeIdentityKeyCertPairForTlsCreds());
913 grpc::experimental::TlsChannelCredentialsOptions options;
914 options.set_certificate_provider(std::move(certificate_provider));
915 options.watch_root_certs();
916 options.watch_identity_key_cert_pairs();
917 auto verifier =
918 ExternalCertificateVerifier::Create<SyncCertificateVerifier>(true);
919 options.set_certificate_verifier(std::move(verifier));
920 options.set_verify_server_certs(true);
921 options.set_check_call_host(false);
922 return grpc::experimental::TlsCredentials(options);
923 }
924
925 std::shared_ptr<ServerCredentials>
CreateFakeServerCredentials()926 XdsEnd2endTest::CreateFakeServerCredentials() {
927 return std::make_shared<SecureServerCredentials>(
928 grpc_fake_transport_security_server_credentials_create());
929 }
930
931 std::shared_ptr<ServerCredentials>
CreateMtlsServerCredentials()932 XdsEnd2endTest::CreateMtlsServerCredentials() {
933 std::string root_cert = grpc_core::testing::GetFileContents(kCaCertPath);
934 auto certificate_provider =
935 std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
936 std::move(root_cert), MakeIdentityKeyCertPairForTlsCreds());
937 grpc::experimental::TlsServerCredentialsOptions options(
938 std::move(certificate_provider));
939 options.watch_root_certs();
940 options.watch_identity_key_cert_pairs();
941 options.set_cert_request_type(GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY);
942 return grpc::experimental::TlsServerCredentials(options);
943 }
944
945 std::shared_ptr<ServerCredentials>
CreateTlsServerCredentials()946 XdsEnd2endTest::CreateTlsServerCredentials() {
947 auto certificate_provider =
948 std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
949 MakeIdentityKeyCertPairForTlsCreds());
950 grpc::experimental::TlsServerCredentialsOptions options(
951 std::move(certificate_provider));
952 options.watch_identity_key_cert_pairs();
953 return grpc::experimental::TlsServerCredentials(options);
954 }
955
956 } // namespace testing
957 } // namespace grpc
958