1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <gmock/gmock.h>
16 #include <grpc/event_engine/endpoint_config.h>
17 #include <grpc/grpc.h>
18 #include <grpc/support/alloc.h>
19 #include <grpc/support/atm.h>
20 #include <grpc/support/time.h>
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/create_channel.h>
24 #include <grpcpp/ext/call_metric_recorder.h>
25 #include <grpcpp/ext/orca_service.h>
26 #include <grpcpp/ext/server_metric_recorder.h>
27 #include <grpcpp/health_check_service_interface.h>
28 #include <grpcpp/impl/sync.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <gtest/gtest.h>
32
33 #include <algorithm>
34 #include <chrono>
35 #include <deque>
36 #include <memory>
37 #include <mutex>
38 #include <random>
39 #include <set>
40 #include <string>
41 #include <thread>
42
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/memory/memory.h"
46 #include "absl/strings/str_cat.h"
47 #include "absl/strings/str_format.h"
48 #include "absl/strings/str_join.h"
49 #include "absl/strings/string_view.h"
50 #include "src/core/client_channel/backup_poller.h"
51 #include "src/core/client_channel/config_selector.h"
52 #include "src/core/client_channel/global_subchannel_pool.h"
53 #include "src/core/config/config_vars.h"
54 #include "src/core/lib/address_utils/parse_address.h"
55 #include "src/core/lib/address_utils/sockaddr_utils.h"
56 #include "src/core/lib/channel/channel_args.h"
57 #include "src/core/lib/iomgr/tcp_client.h"
58 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/resolver/endpoint_addresses.h"
61 #include "src/core/resolver/fake/fake_resolver.h"
62 #include "src/core/server/server.h"
63 #include "src/core/service_config/service_config.h"
64 #include "src/core/service_config/service_config_impl.h"
65 #include "src/core/util/backoff.h"
66 #include "src/core/util/crash.h"
67 #include "src/core/util/debug_location.h"
68 #include "src/core/util/env.h"
69 #include "src/core/util/notification.h"
70 #include "src/core/util/ref_counted_ptr.h"
71 #include "src/core/util/time.h"
72 #include "src/cpp/server/secure_server_credentials.h"
73 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
74 #include "src/proto/grpc/testing/echo.grpc.pb.h"
75 #include "test/core/test_util/port.h"
76 #include "test/core/test_util/resolve_localhost_ip46.h"
77 #include "test/core/test_util/test_config.h"
78 #include "test/core/test_util/test_lb_policies.h"
79 #include "test/cpp/end2end/connection_attempt_injector.h"
80 #include "test/cpp/end2end/test_service_impl.h"
81 #include "test/cpp/util/credentials.h"
82 #include "xds/data/orca/v3/orca_load_report.pb.h"
83
84 namespace grpc {
85 namespace testing {
86 namespace {
87
88 using xds::data::orca::v3::OrcaLoadReport;
89 constexpr char kRequestMessage[] = "Live long and prosper.";
90
91 // A noop health check service that just terminates the call and returns OK
92 // status in its methods. This is used to test the retry mechanism in
93 // SubchannelStreamClient.
94 class NoopHealthCheckServiceImpl : public health::v1::Health::Service {
95 public:
96 ~NoopHealthCheckServiceImpl() override = default;
Check(ServerContext *,const health::v1::HealthCheckRequest *,health::v1::HealthCheckResponse *)97 Status Check(ServerContext*, const health::v1::HealthCheckRequest*,
98 health::v1::HealthCheckResponse*) override {
99 return Status::OK;
100 }
Watch(ServerContext *,const health::v1::HealthCheckRequest *,ServerWriter<health::v1::HealthCheckResponse> *)101 Status Watch(ServerContext*, const health::v1::HealthCheckRequest*,
102 ServerWriter<health::v1::HealthCheckResponse>*) override {
103 grpc_core::MutexLock lock(&mu_);
104 request_count_++;
105 return Status::OK;
106 }
request_count()107 int request_count() {
108 grpc_core::MutexLock lock(&mu_);
109 return request_count_;
110 }
111
112 private:
113 grpc_core::Mutex mu_;
114 int request_count_ ABSL_GUARDED_BY(&mu_) = 0;
115 };
116
117 // Subclass of TestServiceImpl that increments a request counter for
118 // every call to the Echo RPC.
119 class MyTestServiceImpl : public TestServiceImpl {
120 public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)121 Status Echo(ServerContext* context, const EchoRequest* request,
122 EchoResponse* response) override {
123 {
124 grpc_core::MutexLock lock(&mu_);
125 ++request_count_;
126 }
127 AddClient(context->peer());
128 if (request->has_param() && request->param().has_backend_metrics()) {
129 const auto& request_metrics = request->param().backend_metrics();
130 auto* recorder = context->ExperimentalGetCallMetricRecorder();
131 EXPECT_NE(recorder, nullptr);
132 // Do not record when zero since it indicates no test per-call report.
133 if (request_metrics.application_utilization() > 0) {
134 recorder->RecordApplicationUtilizationMetric(
135 request_metrics.application_utilization());
136 }
137 if (request_metrics.cpu_utilization() > 0) {
138 recorder->RecordCpuUtilizationMetric(request_metrics.cpu_utilization());
139 }
140 if (request_metrics.mem_utilization() > 0) {
141 recorder->RecordMemoryUtilizationMetric(
142 request_metrics.mem_utilization());
143 }
144 if (request_metrics.rps_fractional() > 0) {
145 recorder->RecordQpsMetric(request_metrics.rps_fractional());
146 }
147 if (request_metrics.eps() > 0) {
148 recorder->RecordEpsMetric(request_metrics.eps());
149 }
150 for (const auto& p : request_metrics.request_cost()) {
151 char* key = static_cast<char*>(
152 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
153 strncpy(key, p.first.data(), p.first.size());
154 key[p.first.size()] = '\0';
155 recorder->RecordRequestCostMetric(key, p.second);
156 }
157 for (const auto& p : request_metrics.utilization()) {
158 char* key = static_cast<char*>(
159 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
160 strncpy(key, p.first.data(), p.first.size());
161 key[p.first.size()] = '\0';
162 recorder->RecordUtilizationMetric(key, p.second);
163 }
164 for (const auto& p : request_metrics.named_metrics()) {
165 char* key = static_cast<char*>(
166 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
167 strncpy(key, p.first.data(), p.first.size());
168 key[p.first.size()] = '\0';
169 recorder->RecordNamedMetric(key, p.second);
170 }
171 }
172 return TestServiceImpl::Echo(context, request, response);
173 }
174
request_count()175 size_t request_count() {
176 grpc_core::MutexLock lock(&mu_);
177 return request_count_;
178 }
179
ResetCounters()180 void ResetCounters() {
181 grpc_core::MutexLock lock(&mu_);
182 request_count_ = 0;
183 }
184
clients()185 std::set<std::string> clients() {
186 grpc_core::MutexLock lock(&clients_mu_);
187 return clients_;
188 }
189
190 private:
AddClient(const std::string & client)191 void AddClient(const std::string& client) {
192 grpc_core::MutexLock lock(&clients_mu_);
193 clients_.insert(client);
194 }
195
196 grpc_core::Mutex mu_;
197 size_t request_count_ ABSL_GUARDED_BY(&mu_) = 0;
198
199 grpc_core::Mutex clients_mu_;
200 std::set<std::string> clients_ ABSL_GUARDED_BY(&clients_mu_);
201 };
202
203 class FakeResolverResponseGeneratorWrapper {
204 public:
FakeResolverResponseGeneratorWrapper()205 FakeResolverResponseGeneratorWrapper()
206 : response_generator_(grpc_core::MakeRefCounted<
207 grpc_core::FakeResolverResponseGenerator>()) {}
208
FakeResolverResponseGeneratorWrapper(FakeResolverResponseGeneratorWrapper && other)209 FakeResolverResponseGeneratorWrapper(
210 FakeResolverResponseGeneratorWrapper&& other) noexcept {
211 response_generator_ = std::move(other.response_generator_);
212 }
213
SetResponse(grpc_core::Resolver::Result result)214 void SetResponse(grpc_core::Resolver::Result result) {
215 grpc_core::ExecCtx exec_ctx;
216 response_generator_->SetResponseSynchronously(std::move(result));
217 }
218
SetNextResolution(const std::vector<int> & ports,const char * service_config_json=nullptr,const grpc_core::ChannelArgs & per_address_args=grpc_core::ChannelArgs ())219 void SetNextResolution(const std::vector<int>& ports,
220 const char* service_config_json = nullptr,
221 const grpc_core::ChannelArgs& per_address_args =
222 grpc_core::ChannelArgs()) {
223 SetResponse(BuildFakeResults(ports, service_config_json, per_address_args));
224 }
225
Get() const226 grpc_core::FakeResolverResponseGenerator* Get() const {
227 return response_generator_.get();
228 }
229
230 private:
BuildFakeResults(const std::vector<int> & ports,const char * service_config_json=nullptr,const grpc_core::ChannelArgs & per_address_args=grpc_core::ChannelArgs ())231 static grpc_core::Resolver::Result BuildFakeResults(
232 const std::vector<int>& ports, const char* service_config_json = nullptr,
233 const grpc_core::ChannelArgs& per_address_args =
234 grpc_core::ChannelArgs()) {
235 grpc_core::Resolver::Result result;
236 result.addresses = grpc_core::EndpointAddressesList();
237 for (const int& port : ports) {
238 absl::StatusOr<grpc_core::URI> lb_uri =
239 grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
240 CHECK_OK(lb_uri);
241 grpc_resolved_address address;
242 CHECK(grpc_parse_uri(*lb_uri, &address));
243 result.addresses->emplace_back(address, per_address_args);
244 }
245 if (result.addresses->empty()) {
246 result.resolution_note = "fake resolver empty address list";
247 }
248 if (service_config_json != nullptr) {
249 result.service_config = grpc_core::ServiceConfigImpl::Create(
250 grpc_core::ChannelArgs(), service_config_json);
251 EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
252 }
253 return result;
254 }
255
256 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
257 response_generator_;
258 };
259
260 constexpr absl::string_view kDefaultAuthority = "default.example.com";
261
262 class ClientLbEnd2endTest : public ::testing::Test {
263 protected:
SetUp()264 void SetUp() override { grpc_init(); }
265
TearDown()266 void TearDown() override {
267 for (size_t i = 0; i < servers_.size(); ++i) {
268 servers_[i]->Shutdown();
269 }
270 servers_.clear();
271 grpc_shutdown();
272 }
273
CreateServers(size_t num_servers,std::vector<int> ports={},std::shared_ptr<ServerCredentials> server_creds=nullptr)274 void CreateServers(
275 size_t num_servers, std::vector<int> ports = {},
276 std::shared_ptr<ServerCredentials> server_creds = nullptr) {
277 servers_.clear();
278 for (size_t i = 0; i < num_servers; ++i) {
279 int port = 0;
280 if (ports.size() == num_servers) port = ports[i];
281 servers_.emplace_back(new ServerData(port, server_creds));
282 }
283 }
284
StartServer(size_t index)285 void StartServer(size_t index) { servers_[index]->Start(); }
286
StartServers(size_t num_servers,std::vector<int> ports={},std::shared_ptr<ServerCredentials> server_creds=nullptr)287 void StartServers(size_t num_servers, std::vector<int> ports = {},
288 std::shared_ptr<ServerCredentials> server_creds = nullptr) {
289 CreateServers(num_servers, std::move(ports), std::move(server_creds));
290 for (size_t i = 0; i < num_servers; ++i) {
291 StartServer(i);
292 }
293 }
294
GetServersPorts(size_t start_index=0,size_t stop_index=0)295 std::vector<int> GetServersPorts(size_t start_index = 0,
296 size_t stop_index = 0) {
297 if (stop_index == 0) stop_index = servers_.size();
298 std::vector<int> ports;
299 for (size_t i = start_index; i < stop_index; ++i) {
300 ports.push_back(servers_[i]->port_);
301 }
302 return ports;
303 }
304
BuildStub(const std::shared_ptr<Channel> & channel)305 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
306 const std::shared_ptr<Channel>& channel) {
307 return grpc::testing::EchoTestService::NewStub(channel);
308 }
309
BuildChannel(const std::string & lb_policy_name,const FakeResolverResponseGeneratorWrapper & response_generator,ChannelArguments args=ChannelArguments (),std::shared_ptr<ChannelCredentials> channel_creds=nullptr)310 std::shared_ptr<Channel> BuildChannel(
311 const std::string& lb_policy_name,
312 const FakeResolverResponseGeneratorWrapper& response_generator,
313 ChannelArguments args = ChannelArguments(),
314 std::shared_ptr<ChannelCredentials> channel_creds = nullptr) {
315 if (!lb_policy_name.empty()) {
316 args.SetLoadBalancingPolicyName(lb_policy_name);
317 } // else, default to pick first
318 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
319 response_generator.Get());
320 if (channel_creds == nullptr) {
321 channel_creds =
322 std::make_shared<FakeTransportSecurityChannelCredentials>();
323 }
324 return grpc::CreateCustomChannel(absl::StrCat("fake:", kDefaultAuthority),
325 channel_creds, args);
326 }
327
SendRpc(const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,EchoResponse * response=nullptr,int timeout_ms=1000,bool wait_for_ready=false,EchoRequest * request=nullptr,std::string authority_override="")328 Status SendRpc(
329 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
330 EchoResponse* response = nullptr, int timeout_ms = 1000,
331 bool wait_for_ready = false, EchoRequest* request = nullptr,
332 std::string authority_override = "") {
333 EchoResponse local_response;
334 if (response == nullptr) response = &local_response;
335 EchoRequest local_request;
336 if (request == nullptr) request = &local_request;
337 request->set_message(kRequestMessage);
338 request->mutable_param()->set_echo_metadata(true);
339 ClientContext context;
340 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
341 if (!authority_override.empty()) context.set_authority(authority_override);
342 if (wait_for_ready) context.set_wait_for_ready(true);
343 context.AddMetadata("foo", "1");
344 context.AddMetadata("bar", "2");
345 context.AddMetadata("baz", "3");
346 return stub->Echo(&context, *request, response);
347 }
348
CheckRpcSendOk(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,bool wait_for_ready=false,const OrcaLoadReport * load_report=nullptr,int timeout_ms=2000)349 void CheckRpcSendOk(
350 const grpc_core::DebugLocation& location,
351 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
352 bool wait_for_ready = false, const OrcaLoadReport* load_report = nullptr,
353 int timeout_ms = 2000) {
354 EchoResponse response;
355 EchoRequest request;
356 EchoRequest* request_ptr = nullptr;
357 if (load_report != nullptr) {
358 request_ptr = &request;
359 auto params = request.mutable_param();
360 auto backend_metrics = params->mutable_backend_metrics();
361 *backend_metrics = *load_report;
362 }
363 Status status =
364 SendRpc(stub, &response, timeout_ms, wait_for_ready, request_ptr);
365 ASSERT_TRUE(status.ok())
366 << "From " << location.file() << ":" << location.line()
367 << "\nError: " << status.error_message() << " "
368 << status.error_details();
369 ASSERT_EQ(response.message(), kRequestMessage)
370 << "From " << location.file() << ":" << location.line();
371 }
372
CheckRpcSendFailure(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,StatusCode expected_status,absl::string_view expected_message_regex)373 void CheckRpcSendFailure(
374 const grpc_core::DebugLocation& location,
375 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
376 StatusCode expected_status, absl::string_view expected_message_regex) {
377 Status status = SendRpc(stub);
378 EXPECT_FALSE(status.ok());
379 EXPECT_EQ(expected_status, status.error_code())
380 << location.file() << ":" << location.line();
381 EXPECT_THAT(status.error_message(),
382 ::testing::MatchesRegex(expected_message_regex))
383 << location.file() << ":" << location.line();
384 }
385
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,absl::AnyInvocable<bool (const Status &)> continue_predicate,EchoRequest * request_ptr=nullptr,int timeout_ms=15000)386 void SendRpcsUntil(
387 const grpc_core::DebugLocation& debug_location,
388 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
389 absl::AnyInvocable<bool(const Status&)> continue_predicate,
390 EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
391 absl::Time deadline = absl::InfiniteFuture();
392 if (timeout_ms != 0) {
393 deadline = absl::Now() +
394 (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
395 }
396 while (true) {
397 Status status =
398 SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
399 /*wait_for_ready=*/false, /*request=*/request_ptr);
400 if (!continue_predicate(status)) return;
401 EXPECT_LE(absl::Now(), deadline)
402 << debug_location.file() << ":" << debug_location.line();
403 if (absl::Now() >= deadline) break;
404 }
405 }
406
407 struct ServerData {
408 const int port_;
409 const std::shared_ptr<ServerCredentials> server_creds_;
410 std::unique_ptr<Server> server_;
411 MyTestServiceImpl service_;
412 std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_;
413 experimental::OrcaService orca_service_;
414 std::unique_ptr<std::thread> thread_;
415 bool enable_noop_health_check_service_ = false;
416 NoopHealthCheckServiceImpl noop_health_check_service_impl_;
417
418 grpc_core::Mutex mu_;
419 grpc_core::CondVar cond_;
420 bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
421 bool started_ ABSL_GUARDED_BY(mu_) = false;
422
ServerDatagrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData423 explicit ServerData(
424 int port = 0, std::shared_ptr<ServerCredentials> server_creds = nullptr)
425 : port_(port > 0 ? port : grpc_pick_unused_port_or_die()),
426 server_creds_(
427 server_creds == nullptr
428 ? std::shared_ptr<
429 ServerCredentials>(new SecureServerCredentials(
430 grpc_fake_transport_security_server_credentials_create()))
431 : std::move(server_creds)),
432 server_metric_recorder_(experimental::ServerMetricRecorder::Create()),
433 orca_service_(
434 server_metric_recorder_.get(),
435 experimental::OrcaService::Options().set_min_report_duration(
436 absl::Seconds(0.1))) {}
437
Startgrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData438 void Start() {
439 LOG(INFO) << "starting server on port " << port_;
440 grpc_core::MutexLock lock(&mu_);
441 started_ = true;
442 thread_ =
443 std::make_unique<std::thread>(std::bind(&ServerData::Serve, this));
444 while (!server_ready_) {
445 cond_.Wait(&mu_);
446 }
447 server_ready_ = false;
448 LOG(INFO) << "server startup complete";
449 }
450
Servegrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData451 void Serve() {
452 ServerBuilder builder;
453 builder.AddListeningPort(absl::StrCat("localhost:", port_),
454 server_creds_);
455 builder.RegisterService(&service_);
456 builder.RegisterService(&orca_service_);
457 if (enable_noop_health_check_service_) {
458 builder.RegisterService(&noop_health_check_service_impl_);
459 }
460 grpc::ServerBuilder::experimental_type(&builder)
461 .EnableCallMetricRecording(server_metric_recorder_.get());
462 server_ = builder.BuildAndStart();
463 grpc_core::MutexLock lock(&mu_);
464 server_ready_ = true;
465 cond_.Signal();
466 }
467
Shutdowngrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData468 void Shutdown() {
469 grpc_core::MutexLock lock(&mu_);
470 if (!started_) return;
471 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
472 thread_->join();
473 started_ = false;
474 }
475
StopListeningAndSendGoawaysgrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData476 void StopListeningAndSendGoaways() {
477 grpc_core::ExecCtx exec_ctx;
478 auto* server = grpc_core::Server::FromC(server_->c_server());
479 server->StopListening();
480 server->SendGoaways();
481 }
482
SetServingStatusgrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData483 void SetServingStatus(const std::string& service, bool serving) {
484 server_->GetHealthCheckService()->SetServingStatus(service, serving);
485 }
486 };
487
ResetCounters()488 void ResetCounters() {
489 for (const auto& server : servers_) server->service_.ResetCounters();
490 }
491
SeenAllServers(size_t start_index=0,size_t stop_index=0)492 bool SeenAllServers(size_t start_index = 0, size_t stop_index = 0) {
493 if (stop_index == 0) stop_index = servers_.size();
494 for (size_t i = start_index; i < stop_index; ++i) {
495 if (servers_[i]->service_.request_count() == 0) return false;
496 }
497 return true;
498 }
499
500 // If status_check is null, all RPCs must succeed.
501 // If status_check is non-null, it will be called for all non-OK RPCs.
WaitForServers(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,size_t start_index=0,size_t stop_index=0,absl::AnyInvocable<void (const Status &)> status_check=nullptr,absl::Duration timeout=absl::Seconds (30))502 void WaitForServers(
503 const grpc_core::DebugLocation& location,
504 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
505 size_t start_index = 0, size_t stop_index = 0,
506 absl::AnyInvocable<void(const Status&)> status_check = nullptr,
507 absl::Duration timeout = absl::Seconds(30)) {
508 if (stop_index == 0) stop_index = servers_.size();
509 auto deadline = absl::Now() + (timeout * grpc_test_slowdown_factor());
510 LOG(INFO) << "========= WAITING FOR BACKENDS [" << start_index << ", "
511 << stop_index << ") ==========";
512 while (!SeenAllServers(start_index, stop_index)) {
513 Status status = SendRpc(stub);
514 if (status_check != nullptr) {
515 if (!status.ok()) status_check(status);
516 } else {
517 EXPECT_TRUE(status.ok())
518 << " code=" << status.error_code() << " message=\""
519 << status.error_message() << "\" at " << location.file() << ":"
520 << location.line();
521 }
522 EXPECT_LE(absl::Now(), deadline)
523 << " at " << location.file() << ":" << location.line();
524 if (absl::Now() >= deadline) break;
525 }
526 ResetCounters();
527 }
528
WaitForServer(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,size_t server_index,absl::AnyInvocable<void (const Status &)> status_check=nullptr)529 void WaitForServer(
530 const grpc_core::DebugLocation& location,
531 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
532 size_t server_index,
533 absl::AnyInvocable<void(const Status&)> status_check = nullptr) {
534 WaitForServers(location, stub, server_index, server_index + 1,
535 std::move(status_check));
536 }
537
WaitForChannelState(Channel * channel,absl::AnyInvocable<bool (grpc_connectivity_state)> predicate,bool try_to_connect=false,int timeout_seconds=5)538 bool WaitForChannelState(
539 Channel* channel,
540 absl::AnyInvocable<bool(grpc_connectivity_state)> predicate,
541 bool try_to_connect = false, int timeout_seconds = 5) {
542 const gpr_timespec deadline =
543 grpc_timeout_seconds_to_deadline(timeout_seconds);
544 while (true) {
545 grpc_connectivity_state state = channel->GetState(try_to_connect);
546 if (predicate(state)) break;
547 if (!channel->WaitForStateChange(state, deadline)) return false;
548 }
549 return true;
550 }
551
WaitForChannelNotReady(Channel * channel,int timeout_seconds=5)552 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
553 auto predicate = [](grpc_connectivity_state state) {
554 return state != GRPC_CHANNEL_READY;
555 };
556 return WaitForChannelState(channel, predicate, false, timeout_seconds);
557 }
558
WaitForChannelReady(Channel * channel,int timeout_seconds=5)559 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
560 auto predicate = [](grpc_connectivity_state state) {
561 return state == GRPC_CHANNEL_READY;
562 };
563 return WaitForChannelState(channel, predicate, true, timeout_seconds);
564 }
565
566 // Updates \a connection_order by appending to it the index of the newly
567 // connected server. Must be called after every single RPC.
UpdateConnectionOrder(const std::vector<std::unique_ptr<ServerData>> & servers,std::vector<int> * connection_order)568 void UpdateConnectionOrder(
569 const std::vector<std::unique_ptr<ServerData>>& servers,
570 std::vector<int>* connection_order) {
571 for (size_t i = 0; i < servers.size(); ++i) {
572 if (servers[i]->service_.request_count() == 1) {
573 // Was the server index known? If not, update connection_order.
574 const auto it =
575 std::find(connection_order->begin(), connection_order->end(), i);
576 if (it == connection_order->end()) {
577 connection_order->push_back(i);
578 return;
579 }
580 }
581 }
582 }
583
EnableNoopHealthCheckService()584 void EnableNoopHealthCheckService() {
585 for (auto& server : servers_) {
586 server->enable_noop_health_check_service_ = true;
587 }
588 }
589
MakeConnectionFailureRegex(absl::string_view prefix)590 static std::string MakeConnectionFailureRegex(absl::string_view prefix) {
591 return absl::StrCat(
592 prefix,
593 "; last error: (UNKNOWN|UNAVAILABLE): "
594 // IP address
595 "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
596 // Prefixes added for context
597 "(Failed to connect to remote host: )?"
598 "(Timeout occurred: )?"
599 // Syscall
600 "((connect|sendmsg|recvmsg|getsockopt\\(SO\\_ERROR\\)): ?)?"
601 // strerror() output or other message
602 "(Connection refused"
603 "|Connection reset by peer"
604 "|Socket closed"
605 "|FD shutdown)"
606 // errno value
607 "( \\([0-9]+\\))?");
608 }
609
610 std::vector<std::unique_ptr<ServerData>> servers_;
611 };
612
TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  auto chan = BuildChannel("", resolver);
  auto stub = BuildStub(chan);
  // A freshly built channel reports IDLE.
  EXPECT_EQ(chan->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  // Requesting a connection still reports IDLE: this call only kicks off the
  // transition, which has not happened yet.
  EXPECT_EQ(chan->GetState(/*try_to_connect=*/true), GRPC_CHANNEL_IDLE);
  // While waiting on the resolver, the first non-IDLE state must be
  // CONNECTING.
  auto left_idle_into_connecting = [](grpc_connectivity_state observed) {
    if (observed == GRPC_CHANNEL_IDLE) return false;
    EXPECT_EQ(observed, GRPC_CHANNEL_CONNECTING);
    return true;
  };
  ASSERT_TRUE(WaitForChannelState(chan.get(), left_idle_into_connecting));
  // Supplying addresses lets the pending connection attempt go ahead, so the
  // channel should end up READY.
  resolver.SetNextResolution(GetServersPorts());
  EXPECT_TRUE(WaitForChannelReady(chan.get()));
}
638
TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
  // A single backend is all this test needs.
  StartServers(1);
  // Build a channel whose client idle timeout is 1s, scaled for slow builds.
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS,
                      1000 * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper resolver;
  auto chan = BuildChannel("", resolver, channel_args);
  auto stub = BuildStub(chan);
  // Nothing has asked the channel to connect yet.
  EXPECT_EQ(chan->GetState(false), GRPC_CHANNEL_IDLE);
  // First RPC: the channel connects and ends up READY.
  LOG(INFO) << "*** SENDING RPC, CHANNEL SHOULD CONNECT ***";
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(chan->GetState(false), GRPC_CHANNEL_READY);
  // Stay quiet for longer than the idle timeout; the channel drops to IDLE.
  LOG(INFO) << "*** WAITING FOR CHANNEL TO GO IDLE ***";
  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
  EXPECT_EQ(chan->GetState(false), GRPC_CHANNEL_IDLE);
  // A further RPC wakes the idle channel back up.
  LOG(INFO) << "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***";
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(chan->GetState(false), GRPC_CHANNEL_READY);
}
668
669 //
670 // authority override tests
671 //
672
673 class AuthorityOverrideTest : public ClientLbEnd2endTest {
674 protected:
SetUpTestSuite()675 static void SetUpTestSuite() {
676 grpc_core::CoreConfiguration::Reset();
677 grpc_core::CoreConfiguration::RegisterBuilder(
678 [](grpc_core::CoreConfiguration::Builder* builder) {
679 grpc_core::RegisterAuthorityOverrideLoadBalancingPolicy(builder);
680 });
681 grpc_init();
682 }
683
TearDownTestSuite()684 static void TearDownTestSuite() {
685 grpc_shutdown();
686 grpc_core::CoreConfiguration::Reset();
687 }
688 };
689
TEST_F(AuthorityOverrideTest, NoOverride) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto chan = BuildChannel("", resolver);
  auto stub = BuildStub(chan);
  resolver.SetNextResolution(GetServersPorts());
  // Ask the backend to report the :authority it received.
  EchoRequest req;
  req.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse resp;
  const Status rpc_status = SendRpc(stub, &resp, /*timeout_ms=*/1000,
                                    /*wait_for_ready=*/false, &req);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // With no override anywhere, the channel's default authority is used.
  EXPECT_EQ(kDefaultAuthority, resp.param().host());
}
707
TEST_F(AuthorityOverrideTest, OverrideFromResolver) {
  constexpr char kResolverAuthority[] = "from-resolver.example.com";
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto chan = BuildChannel("", resolver);
  auto stub = BuildStub(chan);
  // The resolver result attaches a per-address authority that differs from
  // the channel default.
  const grpc_core::ChannelArgs per_address_args =
      grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
                                   kResolverAuthority);
  resolver.SetNextResolution(GetServersPorts(),
                             /*service_config_json=*/nullptr,
                             per_address_args);
  // Ask the backend to report the :authority it received.
  EchoRequest req;
  req.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse resp;
  const Status rpc_status = SendRpc(stub, &resp, /*timeout_ms=*/1000,
                                    /*wait_for_ready=*/false, &req);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The per-address authority from the resolver wins.
  EXPECT_EQ(kResolverAuthority, resp.param().host());
}
730
TEST_F(AuthorityOverrideTest, OverrideOnChannel) {
  constexpr char kChannelAuthority[] = "from-channel.example.com";
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  // The authority is overridden through a channel argument.
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, kChannelAuthority);
  auto chan = BuildChannel("", resolver, channel_args);
  auto stub = BuildStub(chan);
  resolver.SetNextResolution(GetServersPorts());
  // Ask the backend to report the :authority it received.
  EchoRequest req;
  req.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse resp;
  const Status rpc_status = SendRpc(stub, &resp, /*timeout_ms=*/1000,
                                    /*wait_for_ready=*/false, &req);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The channel-level authority is what the server sees.
  EXPECT_EQ(kChannelAuthority, resp.param().host());
}
751
TEST_F(AuthorityOverrideTest, OverrideFromLbPolicy) {
  // The "authority_override_lb" test policy receives its override value via
  // GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, and the server must see that value.
  // Insecure credentials on both ends keep the fake security connector's
  // authority check out of the way.
  constexpr char kLbAuthority[] = "from-lb.example.com";
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, kLbAuthority);
  auto test_channel = BuildChannel("authority_override_lb", generator,
                                   channel_args, InsecureChannelCredentials());
  auto test_stub = BuildStub(test_channel);
  generator.SetNextResolution(GetServersPorts());
  // Ask the server to echo back the authority header it received.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(test_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  EXPECT_EQ(kLbAuthority, echo_response.param().host());
}
774
TEST_F(AuthorityOverrideTest, PerRpcOverride) {
  // An authority given for a single RPC is what the server sees for it.
  // Insecure credentials on both ends keep the fake security connector's
  // authority check out of the way.
  constexpr char kPerRpcAuthority[] = "per-rpc.example.com";
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper generator;
  auto test_channel = BuildChannel("", generator, ChannelArguments(),
                                   InsecureChannelCredentials());
  auto test_stub = BuildStub(test_channel);
  generator.SetNextResolution(GetServersPorts());
  // Ask the server to echo back the authority header it received.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(test_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request,
              /*authority_override=*/kPerRpcAuthority);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  EXPECT_EQ(kPerRpcAuthority, echo_response.param().host());
}
796
TEST_F(AuthorityOverrideTest,
       ChannelOverrideTakesPrecedenceOverResolverOverride) {
  // Both the application (channel arg) and the resolver (per-result channel
  // arg) set a default authority. The application's value must win.
  constexpr char kChannelAuthority[] = "from-channel.example.com";
  constexpr char kResolverAuthority[] = "from-resolver.example.com";
  StartServers(1);
  FakeResolverResponseGeneratorWrapper generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, kChannelAuthority);
  auto test_channel = BuildChannel("", generator, channel_args);
  auto test_stub = BuildStub(test_channel);
  generator.SetNextResolution(
      GetServersPorts(), /*service_config_json=*/nullptr,
      grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
                                   kResolverAuthority));
  // Ask the server to echo back the authority header it received.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(test_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  EXPECT_EQ(kChannelAuthority, echo_response.param().host());
}
823
TEST_F(AuthorityOverrideTest,
       LbPolicyOverrideTakesPrecedenceOverChannelOverride) {
  // The channel sets a default authority and the test LB policy is given a
  // different override. The LB policy's value must win.
  // Insecure credentials on both ends keep the fake security connector's
  // authority check out of the way.
  constexpr char kChannelAuthority[] = "from-channel.example.com";
  constexpr char kLbAuthority[] = "from-lb.example.com";
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, kChannelAuthority);
  channel_args.SetString(GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, kLbAuthority);
  auto test_channel = BuildChannel("authority_override_lb", generator,
                                   channel_args, InsecureChannelCredentials());
  auto test_stub = BuildStub(test_channel);
  generator.SetNextResolution(GetServersPorts());
  // Ask the server to echo back the authority header it received.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(test_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  EXPECT_EQ(kLbAuthority, echo_response.param().host());
}
848
TEST_F(AuthorityOverrideTest,
       PerRpcOverrideTakesPrecedenceOverLbPolicyOverride) {
  // The test LB policy is given an override, but the RPC itself carries a
  // different authority. The per-RPC value must win.
  // Insecure credentials on both ends keep the fake security connector's
  // authority check out of the way.
  constexpr char kLbAuthority[] = "from-lb.example.com";
  constexpr char kPerRpcAuthority[] = "per-rpc.example.com";
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, kLbAuthority);
  auto test_channel = BuildChannel("authority_override_lb", generator,
                                   channel_args, InsecureChannelCredentials());
  auto test_stub = BuildStub(test_channel);
  generator.SetNextResolution(GetServersPorts());
  // Ask the server to echo back the authority header it received.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(test_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request,
              /*authority_override=*/kPerRpcAuthority);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  EXPECT_EQ(kPerRpcAuthority, echo_response.param().host());
}
873
//
// pick_first tests
//

// The pick_first tests reuse the ClientLbEnd2endTest fixture unchanged. The
// alias only gives them their own test-suite name in TEST_F().
using PickFirstTest = ClientLbEnd2endTest;
879
TEST_F(PickFirstTest, Basic) {
  // Creates a channel without naming an LB policy and sends one RPC per
  // server. All RPCs must land on a single backend, and the channel must
  // report pick_first as its policy (i.e. pick_first is the default).
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper generator;
  // Empty policy name: rely on the channel's default policy.
  auto pf_channel = BuildChannel("", generator);
  auto pf_stub = BuildStub(pf_channel);
  generator.SetNextResolution(GetServersPorts());
  for (size_t remaining = servers_.size(); remaining > 0; --remaining) {
    CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
  }
  // Exactly one backend should have received everything.
  bool one_server_got_everything = false;
  for (const auto& server : servers_) {
    const int count = server->service_.request_count();
    if (count != kNumServers) {
      // Every other backend must have been left untouched.
      EXPECT_EQ(0, count);
      continue;
    }
    one_server_got_everything = true;
  }
  EXPECT_TRUE(one_server_got_everything);
  EXPECT_EQ("pick_first", pf_channel->GetLoadBalancingPolicyName());
}
906
TEST_F(PickFirstTest, ProcessPending) {
  StartServers(1);
  // First channel: connect to the lone backend so that its subchannel
  // becomes READY.
  FakeResolverResponseGeneratorWrapper first_generator;
  auto first_channel = BuildChannel("", first_generator);
  auto first_stub = BuildStub(first_channel);
  first_generator.SetNextResolution({servers_[0]->port_});
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // Second channel to the same target. Subchannels are reused globally, so
  // this channel's pick_first policy should find the subchannel already
  // READY. The RPC must then complete without any further connectivity
  // transition.
  FakeResolverResponseGeneratorWrapper second_generator;
  auto second_channel = BuildChannel("", second_generator);
  auto second_stub = BuildStub(second_channel);
  second_generator.SetNextResolution({servers_[0]->port_});
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
926
TEST_F(PickFirstTest, SelectsReadyAtStartup) {
  // A large initial backoff makes any wait on reconnect backoff visible as a
  // timeout below.
  ChannelArguments backoff_args;
  constexpr int kInitialBackOffMs = 5000;
  backoff_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      kInitialBackOffMs * grpc_test_slowdown_factor());
  // Two backends are created but only the second one is started, so the
  // first address in the list is unreachable.
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die(),
                                    grpc_pick_unused_port_or_die()};
  CreateServers(2, backend_ports);
  StartServer(1);
  // Channel A connects and settles on backend 1.
  FakeResolverResponseGeneratorWrapper generator_a;
  auto channel_a = BuildChannel("pick_first", generator_a, backoff_args);
  auto stub_a = BuildStub(channel_a);
  generator_a.SetNextResolution(backend_ports);
  WaitForServer(DEBUG_LOCATION, stub_a, 1);
  // Channel B gets the same address list. Backend 1's subchannel is already
  // READY, so B should pick it right away and report READY without waiting
  // for the initial backoff.
  FakeResolverResponseGeneratorWrapper generator_b;
  auto channel_b = BuildChannel("pick_first", generator_b, backoff_args);
  generator_b.SetNextResolution(backend_ports);
  EXPECT_TRUE(WaitForChannelReady(channel_b.get(), /*timeout_seconds=*/1));
}
953
// Fails the first connection attempt and measures how long it takes before
// the second attempt starts. That interval must be close to
// GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, within the tolerance checked at the
// end of the test.
TEST_F(PickFirstTest, BackOffInitialReconnect) {
  StartServers(1);
  ChannelArguments args;
  // Backoff under test; scaled by the test slowdown factor wherever it is used.
  constexpr int kInitialBackOffMs = 100;
  args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
              kInitialBackOffMs * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({servers_[0]->port_});
  // Intercept the first two connection attempts. Both holds are registered
  // before the channel is asked to connect below.
  ConnectionAttemptInjector injector;
  auto hold1 = injector.AddHold(servers_[0]->port_);
  auto hold2 = injector.AddHold(servers_[0]->port_);
  // Start trying to connect. The call itself is expected to report IDLE.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/true), GRPC_CHANNEL_IDLE);
  // When the first connection attempt starts, record the time, then fail the
  // attempt.
  hold1->Wait();
  const grpc_core::Timestamp first_attempt_time = grpc_core::Timestamp::Now();
  hold1->Fail(absl::UnavailableError("nope"));
  // Wait for the second attempt and see how long it took. This interval is
  // the backoff being measured.
  hold2->Wait();
  const grpc_core::Duration waited =
      grpc_core::Timestamp::Now() - first_attempt_time;
  // The channel will transition to TRANSIENT_FAILURE. Until then, only IDLE
  // and CONNECTING are accepted as intermediate states.
  EXPECT_TRUE(
      WaitForChannelState(channel.get(), [&](grpc_connectivity_state state) {
        if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) return true;
        EXPECT_THAT(state, ::testing::AnyOf(GRPC_CHANNEL_IDLE,
                                            GRPC_CHANNEL_CONNECTING));
        return false;
      }));
  // Now let the second attempt complete. The server started at the top of
  // the test is listening, so this attempt can succeed.
  hold2->Resume();
  // The channel will transition to READY.
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  // Check how long it took.
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  // We should have waited at least kInitialBackOffMs, plus or minus
  // jitter. Jitter is 0.2, but we give extra leeway to account for
  // measurement skew, thread hops, etc.
  EXPECT_GE(waited.millis(),
            (kInitialBackOffMs * grpc_test_slowdown_factor()) * 0.7);
  EXPECT_LE(waited.millis(),
            (kInitialBackOffMs * grpc_test_slowdown_factor()) * 1.3);
}
1001
// Sets GRPC_ARG_MIN_RECONNECT_BACKOFF_MS and delays every connection attempt
// by 10% more than that value. Then checks that WaitForConnected() takes at
// least the min reconnect backoff to return.
TEST_F(PickFirstTest, BackOffMinReconnect) {
  ChannelArguments args;
  constexpr int kMinReconnectBackOffMs = 1000;
  args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
              kMinReconnectBackOffMs * grpc_test_slowdown_factor());
  // No server is ever started on this port in this test.
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, args);
  // NOTE(review): `stub` is not used by this test.
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  // Delay each connection attempt by 10% more than the min reconnect backoff,
  // so that the test exercises the code path that waits for the min
  // reconnect backoff.
  ConnectionAttemptInjector injector;
  injector.SetDelay(grpc_core::Duration::Milliseconds(
      kMinReconnectBackOffMs * grpc_test_slowdown_factor() * 1.10));
  const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
  // The result of WaitForConnected() is not checked; only the elapsed time
  // is asserted on below.
  // NOTE(review): since nothing is listening, this call presumably returns
  // only at its deadline, which makes the lower-bound check below weak.
  // Also, unlike the values above, the deadline is not explicitly multiplied
  // by grpc_test_slowdown_factor(); confirm whether
  // grpc_timeout_milliseconds_to_deadline() scales it internally.
  channel->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
  const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(t1, t0));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  // We should have waited at least kMinReconnectBackOffMs. We subtract one to
  // account for test and precision accuracy drift.
  EXPECT_GE(waited.millis(),
            (kMinReconnectBackOffMs * grpc_test_slowdown_factor()) - 1);
}
1029
// Checks that experimental::ChannelResetConnectionBackoff() lets the channel
// connect to a newly started server before the initial reconnect backoff
// would have expired.
TEST_F(PickFirstTest, ResetConnectionBackoff) {
  ChannelArguments args;
  constexpr int kInitialBackOffMs = 1000;
  args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
              kInitialBackOffMs * grpc_test_slowdown_factor());
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, args);
  // NOTE(review): `stub` is not used by this test.
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  // The channel won't become connected (there's no server).
  EXPECT_FALSE(
      channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
  // Bring up a server on the chosen port.
  StartServers(1, ports);
  const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
  // Wait for connect, but not long enough. This proves that we're
  // being throttled by initial backoff.
  EXPECT_FALSE(
      channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
  // Reset connection backoff.
  experimental::ChannelResetConnectionBackoff(channel.get());
  // Wait for connect. Should happen as soon as the client connects to
  // the newly started server, which should be before the initial
  // backoff timeout elapses.
  EXPECT_TRUE(channel->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs)));
  const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(t1, t0));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  // We should have waited less than kInitialBackOffMs.
  EXPECT_LT(waited.millis(), kInitialBackOffMs * grpc_test_slowdown_factor());
}
1064
// Calls ChannelResetConnectionBackoff() while the first connection attempt is
// still in flight. Then checks that the next attempt starts shortly after the
// first one finishes, rather than after the (long) initial reconnect backoff.
TEST_F(ClientLbEnd2endTest,
       ResetConnectionBackoffNextAttemptStartsImmediately) {
  // Start connection injector.
  ConnectionAttemptInjector injector;
  // Create client. No server is started on `port` anywhere in this test.
  const int port = grpc_pick_unused_port_or_die();
  ChannelArguments args;
  const int kInitialBackOffMs = 5000;
  args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
              kInitialBackOffMs * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({port});
  // Intercept initial connection attempt.
  auto hold1 = injector.AddHold(port);
  LOG(INFO) << "=== TRIGGERING INITIAL CONNECTION ATTEMPT";
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(/*try_to_connect=*/true));
  hold1->Wait();
  EXPECT_EQ(GRPC_CHANNEL_CONNECTING,
            channel->GetState(/*try_to_connect=*/false));
  // Reset backoff.
  LOG(INFO) << "=== RESETTING BACKOFF";
  experimental::ChannelResetConnectionBackoff(channel.get());
  // Intercept next attempt. Do this before resuming the first attempt,
  // just in case the client makes progress faster than this thread.
  auto hold2 = injector.AddHold(port);
  // Let the current attempt proceed. Nothing is listening on `port`, so it is
  // expected to fail; then wait for the next attempt to start.
  LOG(INFO) << "=== RESUMING INITIAL ATTEMPT";
  const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
  hold1->Resume();
  LOG(INFO) << "=== WAITING FOR SECOND ATTEMPT";
  // This WaitForStateChange() call just makes sure we're doing some polling.
  EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_CONNECTING,
                                          grpc_timeout_seconds_to_deadline(1)));
  hold2->Wait();
  const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
  LOG(INFO) << "=== RESUMING SECOND ATTEMPT";
  hold2->Resume();
  // Elapsed time should be very short, much less than kInitialBackOffMs.
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(t1, t0));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
}
1110
// Checks that pick_first follows resolver updates:
//  - each new address list replaces the previous one;
//  - an empty list takes the channel out of READY;
//  - a subsequent non-empty list lets the channel recover.
TEST_F(PickFirstTest, Updates) {
  // Start three servers; each resolver update below points at one of them.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  // Perform one RPC against the first server.
  response_generator.SetNextResolution(GetServersPorts(0, 1));
  LOG(INFO) << "****** SET [0] *******";
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  // An empty update will result in the channel going into TRANSIENT_FAILURE.
  response_generator.SetNextResolution({});
  LOG(INFO) << "****** SET none *******";
  // Check the wait results, as the other tests in this file do. A timeout is
  // then reported here instead of surfacing later as a confusing failure.
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // Next update introduces servers_[1], making the channel recover.
  response_generator.SetNextResolution(GetServersPorts(1, 2));
  LOG(INFO) << "****** SET [1] *******";
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  WaitForServer(DEBUG_LOCATION, stub, 1);
  // And again for servers_[2].
  response_generator.SetNextResolution(GetServersPorts(2, 3));
  LOG(INFO) << "****** SET [2] *******";
  WaitForServer(DEBUG_LOCATION, stub, 2);
  // Check LB policy name for the channel.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1139
TEST_F(PickFirstTest, UpdateSuperset) {
  // Once pick_first is connected, an update that adds an address ahead of
  // the connected one must not make it switch backends.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper generator;
  auto pf_channel = BuildChannel("pick_first", generator);
  auto pf_stub = BuildStub(pf_channel);

  // Initial address list: server 0 only.
  std::vector<int> addresses = {servers_[0]->port_};
  generator.SetNextResolution(addresses);
  LOG(INFO) << "****** SET [0] *******";
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  servers_[0]->service_.ResetCounters();

  // Send a superset update: server 1 is listed first, server 0 second.
  addresses = {servers_[1]->port_, servers_[0]->port_};
  generator.SetNextResolution(addresses);
  LOG(INFO) << "****** SET superset *******";
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
  // The previously connected server stays selected, so server 1 never
  // receives traffic.
  WaitForServer(DEBUG_LOCATION, pf_stub, 0);
  EXPECT_EQ(0, servers_[1]->service_.request_count());

  EXPECT_EQ("pick_first", pf_channel->GetLoadBalancingPolicyName());
}
1172
TEST_F(PickFirstTest, UpdateToUnconnected) {
  // Switching from a working address list to one where no server is
  // reachable must still take effect. The channel leaves READY, then becomes
  // READY again once a server from the new list comes up.
  const int kNumServers = 2;
  CreateServers(kNumServers);
  StartServer(0);
  FakeResolverResponseGeneratorWrapper generator;
  auto pf_channel = BuildChannel("pick_first", generator);
  auto pf_stub = BuildStub(pf_channel);

  // Phase 1: resolve to the running server and confirm RPCs succeed.
  std::vector<int> addresses = {servers_[0]->port_};
  generator.SetNextResolution(addresses);
  LOG(INFO) << "****** SET [0] *******";
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);

  // Phase 2: resolve to an unused port plus the not-yet-started server 1.
  // Eventually the working list is replaced by this unresponsive one.
  addresses = {grpc_pick_unused_port_or_die(), servers_[1]->port_};
  generator.SetNextResolution(addresses);
  LOG(INFO) << "****** SET [unavailable] *******";
  EXPECT_TRUE(WaitForChannelNotReady(pf_channel.get()));

  // Phase 3: confirm the phase-2 list was installed by checking that the
  // channel becomes ready once one of its endpoints becomes available.
  LOG(INFO) << "****** StartServer(1) *******";
  StartServer(1);
  EXPECT_TRUE(WaitForChannelReady(pf_channel.get()));
}
1205
TEST_F(PickFirstTest, GlobalSubchannelPool) {
  // Two channels using the default (global) subchannel pool should reach the
  // server over one shared connection.
  const int kNumServers = 1;
  StartServers(kNumServers);
  std::vector<int> backend_ports = GetServersPorts();
  // Both channels get the very same credentials object, so their subchannel
  // keys match.
  auto shared_creds =
      std::make_shared<FakeTransportSecurityChannelCredentials>();
  FakeResolverResponseGeneratorWrapper generator_a;
  auto channel_a = BuildChannel("pick_first", generator_a, ChannelArguments(),
                                shared_creds);
  auto stub_a = BuildStub(channel_a);
  generator_a.SetNextResolution(backend_ports);
  FakeResolverResponseGeneratorWrapper generator_b;
  auto channel_b = BuildChannel("pick_first", generator_b, ChannelArguments(),
                                shared_creds);
  auto stub_b = BuildStub(channel_b);
  generator_b.SetNextResolution(backend_ports);
  WaitForServer(DEBUG_LOCATION, stub_a, 0);
  // One RPC per channel.
  CheckRpcSendOk(DEBUG_LOCATION, stub_a);
  CheckRpcSendOk(DEBUG_LOCATION, stub_b);
  EXPECT_EQ(2, servers_[0]->service_.request_count());
  // A single client address on the server side means both channels shared
  // the same subchannel.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
}
1236
TEST_F(PickFirstTest, LocalSubchannelPool) {
  // With GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL set, each channel keeps its own
  // subchannels, so the two channels reach the server separately.
  const int kNumServers = 1;
  StartServers(kNumServers);
  std::vector<int> backend_ports = GetServersPorts();
  ChannelArguments local_pool_args;
  local_pool_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
  FakeResolverResponseGeneratorWrapper generator_a;
  auto channel_a = BuildChannel("pick_first", generator_a, local_pool_args);
  auto stub_a = BuildStub(channel_a);
  generator_a.SetNextResolution(backend_ports);
  FakeResolverResponseGeneratorWrapper generator_b;
  auto channel_b = BuildChannel("pick_first", generator_b, local_pool_args);
  auto stub_b = BuildStub(channel_b);
  generator_b.SetNextResolution(backend_ports);
  WaitForServer(DEBUG_LOCATION, stub_a, 0);
  // One RPC per channel.
  CheckRpcSendOk(DEBUG_LOCATION, stub_a);
  CheckRpcSendOk(DEBUG_LOCATION, stub_b);
  EXPECT_EQ(2, servers_[0]->service_.request_count());
  // Two distinct client addresses: the channels did not share subchannels.
  EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
}
1263
// Stress test: pushes many shuffled resolver updates for the same address set
// and sends an RPC every 10 updates to make sure the channel keeps working.
TEST_F(PickFirstTest, ManyUpdates) {
  const int kNumUpdates = 1000;
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  // Seed a single engine up front. Constructing a std::random_device and a
  // freshly seeded std::mt19937 on every one of the 1000 iterations is
  // needless work.
  std::mt19937 rng(std::random_device{}());
  for (int i = 0; i < kNumUpdates; ++i) {
    std::shuffle(ports.begin(), ports.end(), rng);
    response_generator.SetNextResolution(ports);
    // We should re-enter core periodically to give the resolution-setting
    // closure a chance to run.
    if ((i + 1) % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1283
TEST_F(PickFirstTest, ReresolutionNoSelected) {
  // When no address is reachable, pick_first never selects a subchannel.
  // It should then request re-resolution, and recover once the new result
  // contains a live backend.
  const int kNumServers = 3;
  const int kNumAliveServers = 1;
  StartServers(kNumAliveServers);
  std::vector<int> alive_ports;
  std::vector<int> dead_ports;
  for (int idx = 0; idx < kNumServers; ++idx) {
    if (idx >= kNumAliveServers) {
      dead_ports.emplace_back(grpc_pick_unused_port_or_die());
    } else {
      alive_ports.emplace_back(servers_[idx]->port_);
    }
  }
  FakeResolverResponseGeneratorWrapper generator;
  auto pf_channel = BuildChannel("pick_first", generator);
  auto pf_stub = BuildStub(pf_channel);
  // Start with nothing but dead ports: no subchannel can be selected, and a
  // re-resolution would return the same result, so every RPC fails.
  generator.SetNextResolution(dead_ports);
  LOG(INFO) << "****** INITIAL RESOLUTION SET *******";
  for (int attempt = 0; attempt < 10; ++attempt) {
    CheckRpcSendFailure(
        DEBUG_LOCATION, pf_stub, StatusCode::UNAVAILABLE,
        MakeConnectionFailureRegex("failed to connect to all addresses"));
  }
  // pick_first is expected to ask the resolver for a new result.
  LOG(INFO) << "****** WAITING FOR RE-RESOLUTION *******";
  EXPECT_TRUE(generator.Get()->WaitForReresolutionRequest(
      absl::Seconds(5 * grpc_test_slowdown_factor())));
  LOG(INFO) << "****** RE-RESOLUTION SEEN *******";
  // Answer with the reachable port so that pick_first can recover.
  generator.SetNextResolution(alive_ports);
  LOG(INFO) << "****** RE-RESOLUTION SENT *******";
  // Any failure handed to this callback while waiting must still be the
  // connection-failure error.
  WaitForServer(DEBUG_LOCATION, pf_stub, 0, [](const Status& failure) {
    EXPECT_EQ(StatusCode::UNAVAILABLE, failure.error_code());
    EXPECT_THAT(failure.error_message(),
                ::testing::ContainsRegex(MakeConnectionFailureRegex(
                    "failed to connect to all addresses")));
  });
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  EXPECT_EQ("pick_first", pf_channel->GetLoadBalancingPolicyName());
}
1329
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
  // The only backend goes away and comes back on the same port. The channel
  // should reconnect to it without being sent another resolver result.
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die()};
  StartServers(1, backend_ports);
  FakeResolverResponseGeneratorWrapper generator;
  auto pf_channel = BuildChannel("pick_first", generator);
  auto pf_stub = BuildStub(pf_channel);
  generator.SetNextResolution(backend_ports);
  LOG(INFO) << "****** INITIAL CONNECTION *******";
  WaitForServer(DEBUG_LOCATION, pf_stub, 0);
  LOG(INFO) << "****** STOPPING SERVER ******";
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(pf_channel.get()));
  LOG(INFO) << "****** RESTARTING SERVER ******";
  // Same port as before; note there is no further SetNextResolution() call.
  StartServers(1, backend_ports);
  WaitForServer(DEBUG_LOCATION, pf_stub, 0);
}
1346
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
  // At first only the second address is reachable, so pick_first connects
  // there. After that server goes away, both servers are brought up. The
  // channel should then reconnect starting from the first address in the
  // list, without a new resolver result.
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die(),
                                    grpc_pick_unused_port_or_die()};
  CreateServers(2, backend_ports);
  StartServer(1);
  FakeResolverResponseGeneratorWrapper generator;
  auto pf_channel = BuildChannel("pick_first", generator);
  auto pf_stub = BuildStub(pf_channel);
  generator.SetNextResolution(backend_ports);
  LOG(INFO) << "****** INITIAL CONNECTION *******";
  WaitForServer(DEBUG_LOCATION, pf_stub, 1);
  LOG(INFO) << "****** STOPPING SERVER ******";
  servers_[1]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(pf_channel.get()));
  LOG(INFO) << "****** STARTING BOTH SERVERS ******";
  StartServers(2, backend_ports);
  WaitForServer(DEBUG_LOCATION, pf_stub, 0);
}
1365
TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
  // pick_first must reject an empty address list:
  //  - the result's health callback is expected to see UNAVAILABLE;
  //  - the channel is expected to go to TRANSIENT_FAILURE.
  // A later non-empty result must let RPCs through again.
  FakeResolverResponseGeneratorWrapper generator;
  auto pf_channel = BuildChannel("pick_first", generator);
  auto pf_stub = BuildStub(pf_channel);
  LOG(INFO) << "****** SENDING INITIAL RESOLVER RESULT *******";
  // Result with an empty (but present) address list, plus a health callback
  // that checks the status it receives and then signals the test thread.
  grpc_core::Notification callback_ran;
  grpc_core::Resolver::Result empty_result;
  empty_result.addresses.emplace();
  empty_result.result_health_callback = [&](absl::Status health) {
    LOG(INFO) << "****** RESULT HEALTH CALLBACK *******";
    EXPECT_EQ(absl::StatusCode::kUnavailable, health.code());
    EXPECT_EQ("address list must not be empty", health.message()) << health;
    callback_ran.Notify();
  };
  generator.SetResponse(std::move(empty_result));
  LOG(INFO) << "****** TELLING CHANNEL TO CONNECT *******";
  EXPECT_TRUE(WaitForChannelState(
      pf_channel.get(),
      [](grpc_connectivity_state state) {
        return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
      },
      /*try_to_connect=*/true));
  // The health callback must have run as well.
  callback_ran.WaitForNotification();
  // Now hand the channel a usable address.
  LOG(INFO) << "****** SENDING NEXT RESOLVER RESULT *******";
  StartServers(1);
  generator.SetNextResolution(GetServersPorts());
  LOG(INFO) << "****** SENDING WAIT_FOR_READY RPC *******";
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub, /*wait_for_ready=*/true);
}
1399
TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
  // Two channels end up referencing the same subchannel, which fails and is
  // then restarted. An RPC on the second channel must still succeed.
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die()};
  StartServers(1, backend_ports);

  // --- First channel ---
  FakeResolverResponseGeneratorWrapper first_generator;
  auto first_channel = BuildChannel("pick_first", first_generator);
  auto first_stub = BuildStub(first_channel);
  first_generator.SetNextResolution(backend_ports);
  LOG(INFO) << "****** RESOLUTION SET FOR CHANNEL 1 *******";
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  LOG(INFO) << "****** CHANNEL 1 CONNECTED *******";
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(first_channel.get()));
  // Channel 1 will receive a re-resolution containing the same server. It
  // creates a new subchannel for it and holds a ref to it.
  StartServers(1, backend_ports);
  LOG(INFO) << "****** SERVER RESTARTED *******";

  // --- Second channel ---
  FakeResolverResponseGeneratorWrapper second_generator;
  auto second_channel = BuildChannel("pick_first", second_generator);
  auto second_stub = BuildStub(second_channel);
  second_generator.SetNextResolution(backend_ports);
  LOG(INFO) << "****** RESOLUTION SET FOR CHANNEL 2 *******";
  WaitForServer(DEBUG_LOCATION, second_stub, 0);
  LOG(INFO) << "****** CHANNEL 2 CONNECTED *******";
  servers_[0]->Shutdown();
  // Don't continue until the disconnect has triggered the connectivity
  // notification. Otherwise the subchannel could be picked for the next call
  // and then fail soon after.
  EXPECT_TRUE(WaitForChannelNotReady(second_channel.get()));
  // Channel 2 also receives a re-resolution containing the same server, so
  // both channels now ref the same subchannel that failed.
  StartServers(1, backend_ports);
  LOG(INFO) << "****** SERVER RESTARTED AGAIN *******";
  LOG(INFO) << "****** CHANNEL 2 STARTING A CALL *******";
  // The first call after the server restart is expected to succeed.
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  LOG(INFO) << "****** CHANNEL 2 FINISHED A CALL *******";
  EXPECT_EQ("pick_first", first_channel->GetLoadBalancingPolicyName());
  EXPECT_EQ("pick_first", second_channel->GetLoadBalancingPolicyName());
}
1440
// Once its connected backend goes away, a pick_first channel (the default
// policy) is expected to settle in IDLE.
TEST_F(PickFirstTest, IdleOnDisconnect) {
  constexpr int kNumServers = 1;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  // An empty policy name selects the default policy, pick_first.
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // One successful RPC gets the channel to READY.
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_READY);
  // Take the backend down; the channel should leave READY and land in IDLE.
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  servers_.clear();
}
1458
// After the backend goes away and the channel drops to IDLE, an empty resolver
// update must leave it in IDLE. A later non-empty update followed by an RPC
// must bring it back to READY.
TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
  constexpr int kNumServers = 1;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  // An empty policy name selects the default policy, pick_first.
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Phase 1: reach READY.
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_READY);
  // Phase 2: shutting the backend down sends the channel to IDLE.
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  // Phase 3: an update with no addresses must not move the channel out of
  // IDLE within the 3-second wait.
  response_generator.SetNextResolution({});
  const bool state_changed = channel->WaitForStateChange(
      GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3));
  EXPECT_FALSE(state_changed);
  // Phase 4: restart the backend, re-resolve, and expect READY after an RPC.
  StartServer(0);
  response_generator.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_READY);
}
1486
// With no servers listening, the first RPC fails and the channel enters
// TRANSIENT_FAILURE. Once a server comes up on one of the addresses, the
// channel's next state change (within the 4-second wait) must be to READY.
TEST_F(PickFirstTest,
       StaysTransientFailureOnFailedConnectionAttemptUntilReady) {
  // Reserve three ports but start nothing on them yet.
  std::vector<int> ports;
  ports.reserve(3);
  for (int i = 0; i < 3; ++i) ports.push_back(grpc_pick_unused_port_or_die());
  // 1-second initial reconnect backoff, scaled by the test slowdown factor.
  ChannelArguments args;
  args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
              1000 * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(/*try_to_connect=*/false));
  // Nothing is reachable, so this RPC must fail...
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("failed to connect to all addresses"));
  // ...which leaves the channel in TRANSIENT_FAILURE.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            channel->GetState(/*try_to_connect=*/false));
  // Bring up a server on the last port only.
  StartServers(1, {ports.back()});
  // The only transition allowed out of TRANSIENT_FAILURE is to READY.
  const bool left_transient_failure = channel->WaitForStateChange(
      GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_timeout_seconds_to_deadline(4));
  EXPECT_TRUE(left_transient_failure);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
  CheckRpcSendOk(DEBUG_LOCATION, stub);
}
1516
//
// round_robin tests
//

// Fixture for the round_robin tests below. It is a plain alias of
// ClientLbEnd2endTest, so these tests use the same helpers; the alias only
// gives them their own suite name in TEST_F.
using RoundRobinTest = ClientLbEnd2endTest;
1522
// round_robin must spread RPCs across all backends, visiting them in the order
// in which the resolver listed their addresses.
TEST_F(RoundRobinTest, Basic) {
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Keep sending until every backend has served at least one RPC.
  do {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  } while (!SeenAllServers());
  ResetCounters();
  // Sync to the last backend so that the next round of picks starts at
  // index 0.
  const size_t last_backend = servers_.size() - 1;
  WaitForServer(DEBUG_LOCATION, stub, last_backend);
  // One round of RPCs, tracking the order via UpdateConnectionOrder().
  std::vector<int> connection_order;
  for (size_t round = 0; round < servers_.size(); ++round) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
    UpdateConnectionOrder(servers_, &connection_order);
  }
  // Backends must be visited in address-list order.
  const std::vector<int> kExpectedOrder = {0, 1, 2};
  EXPECT_EQ(kExpectedOrder, connection_order);
  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
1551
// A second round_robin channel to the same target must make progress right
// away by picking up the subchannel that is already READY from the first
// channel's RPC (subchannels are reused globally across channels).
TEST_F(RoundRobinTest, ProcessPending) {
  StartServers(1);
  const auto port = servers_[0]->port_;
  // First channel: connect to the single backend.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({port});
  WaitForServer(DEBUG_LOCATION, stub, 0);
  // Second channel with its own RR policy: it should see the READY
  // subchannel and succeed without any further state transition.
  FakeResolverResponseGeneratorWrapper second_response_generator;
  auto second_channel = BuildChannel("round_robin", second_response_generator);
  auto second_stub = BuildStub(second_channel);
  second_response_generator.SetNextResolution({port});
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
1570
// Walks round_robin through a series of resolver updates (each single backend
// in turn, all backends, no backends, then one backend again) and checks where
// RPCs land after each one.
TEST_F(RoundRobinTest, Updates) {
  // Start servers.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  // Start with a single server.
  LOG(INFO) << "*** FIRST BACKEND ***";
  std::vector<int> ports = {servers_[0]->port_};
  response_generator.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, stub, 0);
  // Send RPCs. They should all go to servers_[0].
  for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(10, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  EXPECT_EQ(0, servers_[2]->service_.request_count());
  ResetCounters();
  // And now for the second server.
  LOG(INFO) << "*** SECOND BACKEND ***";
  ports.clear();
  ports.emplace_back(servers_[1]->port_);
  response_generator.SetNextResolution(ports);
  // Wait until update has been processed, as signaled by the second backend
  // receiving a request.
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  WaitForServer(DEBUG_LOCATION, stub, 1);
  for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(0, servers_[0]->service_.request_count());
  EXPECT_EQ(10, servers_[1]->service_.request_count());
  EXPECT_EQ(0, servers_[2]->service_.request_count());
  ResetCounters();
  // ... and for the last server.
  LOG(INFO) << "*** THIRD BACKEND ***";
  ports.clear();
  ports.emplace_back(servers_[2]->port_);
  response_generator.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, stub, 2);
  for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(0, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  EXPECT_EQ(10, servers_[2]->service_.request_count());
  ResetCounters();
  // Back to all servers.
  LOG(INFO) << "*** ALL BACKENDS ***";
  ports.clear();
  ports.emplace_back(servers_[0]->port_);
  ports.emplace_back(servers_[1]->port_);
  ports.emplace_back(servers_[2]->port_);
  response_generator.SetNextResolution(ports);
  WaitForServers(DEBUG_LOCATION, stub);
  // Send three RPCs, one per server.
  for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(1, servers_[0]->service_.request_count());
  EXPECT_EQ(1, servers_[1]->service_.request_count());
  EXPECT_EQ(1, servers_[2]->service_.request_count());
  ResetCounters();
  // An empty update will result in the channel going into TRANSIENT_FAILURE.
  LOG(INFO) << "*** NO BACKENDS ***";
  ports.clear();
  response_generator.SetNextResolution(ports);
  // Check the wait result, as the other tests do, so that a timeout is
  // reported here instead of surfacing as a confusing RPC failure below.
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
      "empty address list \\(fake resolver empty address list\\)");
  servers_[0]->service_.ResetCounters();
  // Next update introduces servers_[1], making the channel recover.
  LOG(INFO) << "*** BACK TO SECOND BACKEND ***";
  ports.clear();
  ports.emplace_back(servers_[1]->port_);
  response_generator.SetNextResolution(ports);
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  WaitForServer(DEBUG_LOCATION, stub, 1);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
  // Check LB policy name for the channel.
  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
1648
// round_robin must keep serving from reachable backends when an update adds an
// address that cannot be connected to.
TEST_F(RoundRobinTest, UpdateInError) {
  StartServers(2);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  // Sends ten RPCs, each of which must succeed within a 4000 ms timeout.
  const auto send_ten_rpcs = [&]() {
    for (int sent = 0; sent < 10; ++sent) {
      CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false,
                     /*load_report=*/nullptr, /*timeout_ms=*/4000);
    }
  };
  // Phase 1: resolve to server 0 only; all traffic goes there.
  response_generator.SetNextResolution(GetServersPorts(0, 1));
  send_ten_rpcs();
  EXPECT_EQ(10, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  servers_[0]->service_.ResetCounters();
  // Phase 2: server 0, an unused (unreachable) port, and server 1.
  std::vector<int> updated_ports = {servers_[0]->port_,
                                    grpc_pick_unused_port_or_die(),
                                    servers_[1]->port_};
  response_generator.SetNextResolution(updated_ports);
  WaitForServers(DEBUG_LOCATION, stub, 0, 2, /*status_check=*/nullptr,
                 /*timeout=*/absl::Seconds(60));
  // All RPCs must succeed and be split roughly evenly between servers 0 and
  // 1. The split can be slightly off if servers 0 and 1 report READY before
  // the unreachable address moves from CONNECTING to TRANSIENT_FAILURE,
  // which causes an extra picker update.
  send_ten_rpcs();
  const auto in_balance = ::testing::AllOf(::testing::Ge(4), ::testing::Le(6));
  EXPECT_THAT(servers_[0]->service_.request_count(), in_balance);
  EXPECT_THAT(servers_[1]->service_.request_count(), in_balance);
  EXPECT_EQ(10, servers_[0]->service_.request_count() +
                    servers_[1]->service_.request_count());
}
1687
// Stress test: pushes 1000 resolver updates, each a random permutation of the
// same three backends, and sends an RPC on every tenth iteration.
TEST_F(RoundRobinTest, ManyUpdates) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  // Seed a single engine once instead of constructing (and re-seeding from
  // std::random_device) a fresh std::mt19937 on every iteration.
  std::mt19937 rng(std::random_device{}());
  for (size_t i = 0; i < 1000; ++i) {
    std::shuffle(ports.begin(), ports.end(), rng);
    response_generator.SetNextResolution(ports);
    if (i % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
1705
// A GOAWAY from one backend must make the channel request re-resolution. The
// test answers that request with an update that adds a third backend, which
// the client must then reach.
TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
  // Start 3 servers.
  StartServers(3);
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  // Initially, tell the channel about only the first two servers.
  std::vector<int> ports = {servers_[0]->port_, servers_[1]->port_};
  response_generator.SetNextResolution(ports);
  // Wait for both servers to be seen.
  WaitForServers(DEBUG_LOCATION, stub, 0, 2);
  // Have server 0 send a GOAWAY. This should trigger a re-resolution.
  LOG(INFO) << "****** SENDING GOAWAY FROM SERVER 0 *******";
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
  }
  LOG(INFO) << "****** WAITING FOR RE-RESOLUTION REQUEST *******";
  EXPECT_TRUE(response_generator.Get()->WaitForReresolutionRequest(
      absl::Seconds(5 * grpc_test_slowdown_factor())));
  LOG(INFO) << "****** RE-RESOLUTION REQUEST SEEN *******";
  // The re-resolution request has been observed above; respond to it with an
  // update that adds the last server.
  ports.push_back(servers_[2]->port_);
  response_generator.SetNextResolution(ports);
  // Wait for the client to see server 2.
  WaitForServer(DEBUG_LOCATION, stub, 2);
}
1735
// An empty address list from the resolver must put a round_robin channel into
// TRANSIENT_FAILURE, hand UNAVAILABLE("empty address list") to the result
// health callback, and fail RPCs with the resolution note attached. A later
// non-empty update must let a wait_for_ready RPC succeed.
TEST_F(RoundRobinTest, FailsEmptyResolverUpdate) {
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  LOG(INFO) << "****** SENDING INITIAL RESOLVER RESULT *******";
  // Empty address list plus a callback that checks the status it is given
  // and signals that it ran.
  grpc_core::Notification health_callback_ran;
  grpc_core::Resolver::Result empty_result;
  empty_result.addresses.emplace();
  empty_result.resolution_note = "injected error";
  empty_result.result_health_callback = [&](absl::Status status) {
    EXPECT_EQ(status, absl::UnavailableError("empty address list"));
    health_callback_ran.Notify();
  };
  response_generator.SetResponse(std::move(empty_result));
  LOG(INFO) << "****** TELLING CHANNEL TO CONNECT *******";
  const auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // The health callback must have fired by now.
  health_callback_ran.WaitForNotification();
  // RPCs fail with the empty-list status plus the resolution note.
  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
                      "empty address list \\(injected error\\)");
  // Recover by resolving to a real backend.
  LOG(INFO) << "****** SENDING NEXT RESOLVER RESULT *******";
  StartServers(1);
  response_generator.SetNextResolution(GetServersPorts());
  LOG(INFO) << "****** SENDING WAIT_FOR_READY RPC *******";
  CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
}
1771
// Once every round_robin backend is shut down, the channel must move from
// READY to TRANSIENT_FAILURE, and RPCs must fail with UNAVAILABLE.
TEST_F(RoundRobinTest, TransientFailure) {
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  // Take every backend down.
  for (auto& server : servers_) server->Shutdown();
  const auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(channel.get(), is_transient_failure));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
}
1793
// Resolving a round_robin channel to ports with no servers behind them must
// quickly put it into TRANSIENT_FAILURE, with RPCs failing UNAVAILABLE.
TEST_F(RoundRobinTest, TransientFailureAtStartup) {
  // Create the channel and resolve it to addresses with nothing listening.
  // This test never starts any servers, so there is nothing to shut down.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({
      grpc_pick_unused_port_or_die(),
      grpc_pick_unused_port_or_die(),
      grpc_pick_unused_port_or_die(),
  });
  auto predicate = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(
      WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
}
1816
// Once round_robin reports TRANSIENT_FAILURE, a later reconnection attempt
// (subchannel CONNECTING) must not pull the channel out of TRANSIENT_FAILURE
// or change the status RPCs fail with.
TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
  ConnectionAttemptInjector injector;
  const int port = grpc_pick_unused_port_or_die();
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({port});
  // Let the first connection attempt fail on its own, then wait for TF.
  LOG(INFO) << "=== WAITING FOR CHANNEL TO REPORT TF ===";
  const auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // Intercept the next attempt and wait until it has started.
  auto next_attempt = injector.AddHold(port);
  next_attempt->Wait();
  // The subchannel should now be CONNECTING; the channel must still be in TF.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            channel->GetState(/*try_to_connect=*/false));
  // A few RPCs give the policy a chance to (wrongly) publish a new picker.
  LOG(INFO) << "=== EXPECTING RPCs TO FAIL ===";
  for (int attempt = 0; attempt < 5; ++attempt) {
    CheckRpcSendFailure(
        DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
        MakeConnectionFailureRegex("connections to all backends failing"));
  }
  // Release the held attempt.
  next_attempt->Resume();
}
1853
// While in TRANSIENT_FAILURE, round_robin must surface the most recent
// connection failure in RPC status messages; here that is a custom error
// injected into one backend's connection attempt.
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
  // Start connection injector.
  ConnectionAttemptInjector injector;
  // Get ports.
  const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
                                  grpc_pick_unused_port_or_die()};
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  // Allow first connection attempts to fail normally, and check that
  // the RPC fails with the right status message.
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
  // Now intercept the next connection attempt for each port.
  auto hold1 = injector.AddHold(ports[0]);
  auto hold2 = injector.AddHold(ports[1]);
  hold1->Wait();
  hold2->Wait();
  // Inject a custom failure message.
  hold1->Fail(GRPC_ERROR_CREATE("Survey says... Bzzzzt!"));
  // Wait until RPC fails with the right message.
  const absl::Time deadline =
      absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
  while (true) {
    Status status = SendRpc(stub);
    EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
    if (::testing::Matches(::testing::MatchesRegex(
            "connections to all backends failing; last error: "
            "UNKNOWN: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
            "Survey says... Bzzzzt!"))(status.error_message())) {
      break;
    }
    LOG(INFO) << "STATUS MESSAGE: " << status.error_message();
    EXPECT_THAT(status.error_message(),
                ::testing::MatchesRegex(MakeConnectionFailureRegex(
                    "connections to all backends failing")));
    // Read the clock once so the failure check and the loop exit agree. With
    // two separate absl::Now() calls, the deadline could pass between them
    // and the loop would exit without recording a failure.
    const absl::Time now = absl::Now();
    EXPECT_LT(now, deadline);
    if (now >= deadline) break;
  }
  // Clean up.
  hold2->Resume();
}
1899
// While a client thread sends RPCs in a loop, the only backend sends GOAWAY,
// shuts down, and is restarted while the reconnection attempt is held by the
// injector. None of the client thread's RPCs may fail.
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
  // Start connection injector.
  ConnectionAttemptInjector injector;
  // Start server.
  StartServers(1);
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Start a thread constantly sending RPCs in a loop.
  LOG(INFO) << "=== STARTING CLIENT THREAD ===";
  std::atomic<bool> shutdown{false};
  gpr_event ev;
  gpr_event_init(&ev);
  std::thread thd([&]() {
    LOG(INFO) << "sending first RPC";
    CheckRpcSendOk(DEBUG_LOCATION, stub);
    gpr_event_set(&ev, reinterpret_cast<void*>(1));
    while (!shutdown.load()) {
      LOG(INFO) << "sending RPC";
      CheckRpcSendOk(DEBUG_LOCATION, stub);
    }
  });
  // Stops and joins the client thread on every exit path. The ASSERT_* checks
  // below return from the test body early on failure; if `thd` were still
  // joinable at that point, its destructor would call std::terminate() and
  // abort the whole test binary instead of just failing this test. Declared
  // after `thd` so it is destroyed (and joins) before `thd` is.
  struct ClientThreadStopper {
    std::atomic<bool>& stop_flag;
    std::thread& client;
    ~ClientThreadStopper() {
      stop_flag.store(true);
      if (client.joinable()) client.join();
    }
  } client_thread_stopper{shutdown, thd};
  // Wait for first RPC to complete.
  LOG(INFO) << "=== WAITING FOR FIRST RPC TO COMPLETE ===";
  ASSERT_EQ(reinterpret_cast<void*>(1),
            gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(1)));
  // Channel should now be READY.
  ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
  // Tell injector to intercept the next connection attempt.
  auto hold1 =
      injector.AddHold(servers_[0]->port_, /*intercept_completion=*/true);
  // Now kill the server. The subchannel should report IDLE and be
  // immediately reconnected to, but this should not cause any test
  // failures.
  LOG(INFO) << "=== SHUTTING DOWN SERVER ===";
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
  }
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
  servers_[0]->Shutdown();
  // Wait for next attempt to start.
  LOG(INFO) << "=== WAITING FOR RECONNECTION ATTEMPT ===";
  hold1->Wait();
  // Start server and allow attempt to continue.
  LOG(INFO) << "=== RESTARTING SERVER ===";
  StartServer(0);
  hold1->Resume();
  // Wait for next attempt to complete.
  LOG(INFO) << "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===";
  hold1->WaitForCompletion();
  // Now shut down the thread.
  LOG(INFO) << "=== SHUTTING DOWN CLIENT THREAD ===";
  shutdown.store(true);
  thd.join();
}
1958
// Kills one of three round_robin backends, waits for the client to route
// around it, then restarts it and waits for traffic to reach it again.
TEST_F(RoundRobinTest, SingleReconnect) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  const auto ports = GetServersPorts();
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  WaitForServers(DEBUG_LOCATION, stub);
  // Sync to the last backend so the next round starts at backend 0.
  WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1);
  // One RPC per backend, in order: after the k-th RPC, backend k has one.
  for (size_t idx = 0; idx < servers_.size(); ++idx) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
    EXPECT_EQ(1, servers_[idx]->service_.request_count())
        << "for backend #" << idx;
  }
  // Each backend should have received exactly one of those RPCs.
  for (const auto& server : servers_) {
    EXPECT_EQ(1, server->service_.request_count());
  }
  // Take backend 0 down.
  servers_[0]->StopListeningAndSendGoaways();
  // The client has noticed once kNumServers consecutive RPCs have missed
  // backend 0.
  ResetCounters();
  SendRpcsUntil(
      DEBUG_LOCATION, stub,
      [&, misses_in_a_row = 0](const Status& status) mutable {
        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                                 << " message=" << status.error_message();
        const bool landed_on_backend_0 =
            servers_[0]->service_.request_count() == 1;
        misses_in_a_row = landed_on_backend_0 ? 0 : misses_in_a_row + 1;
        ResetCounters();
        return misses_in_a_row < kNumServers;
      });
  // None of a further batch of RPCs may reach the dead backend.
  for (int sent = 0; sent < 10 * kNumServers; ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  EXPECT_EQ(0UL, servers_[0]->service_.request_count());
  // Restart backend 0.
  servers_[0]->Shutdown();
  StartServer(0);
  // Traffic should reach backend 0 again, either right away (if the server
  // came up before RR retried the subchannel) or after the subchannel retry
  // delay.
  WaitForServer(DEBUG_LOCATION, stub, 0);
}
2011
2012 // If health checking is required by client but health checking service
2013 // is not running on the server, the channel should be treated as healthy.
TEST_F(RoundRobinTest,ServersHealthCheckingUnimplementedTreatedAsHealthy)2014 TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
2015 StartServers(1); // Single server
2016 ChannelArguments args;
2017 args.SetServiceConfigJSON(
2018 "{\"healthCheckConfig\": "
2019 "{\"serviceName\": \"health_check_service_name\"}}");
2020 FakeResolverResponseGeneratorWrapper response_generator;
2021 auto channel = BuildChannel("round_robin", response_generator, args);
2022 auto stub = BuildStub(channel);
2023 response_generator.SetNextResolution({servers_[0]->port_});
2024 EXPECT_TRUE(WaitForChannelReady(channel.get()));
2025 CheckRpcSendOk(DEBUG_LOCATION, stub);
2026 }
2027
// Exercises round_robin with client-side health checking. RPCs must only go
// to backends marked healthy, and once no backend is healthy the channel must
// leave READY and RPCs must fail.
TEST_F(RoundRobinTest, HealthChecking) {
  EnableDefaultHealthCheckService(true);
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  ChannelArguments args;
  args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Service name named in the health check config above.
  constexpr char kHealthService[] = "health_check_service_name";
  // Sends `count` RPCs that must all succeed.
  const auto send_ok = [&](int count) {
    for (int sent = 0; sent < count; ++sent) {
      CheckRpcSendOk(DEBUG_LOCATION, stub);
    }
  };
  // Number of requests backend `index` has received.
  const auto requests_at = [&](size_t index) {
    return servers_[index]->service_.request_count();
  };
  // No backend reports the service as healthy yet, so no READY.
  LOG(INFO)
      << "*** initial state: unknown health check service name for all servers";
  EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
  // Server 0 healthy: channel goes READY and all traffic goes to server 0.
  LOG(INFO) << "*** server 0 healthy";
  servers_[0]->SetServingStatus(kHealthService, true);
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  // The state change can be reported before the picker is updated, so wait
  // for server 0 before counting.
  WaitForServer(DEBUG_LOCATION, stub, 0);
  send_ok(10);
  EXPECT_EQ(10, requests_at(0));
  EXPECT_EQ(0, requests_at(1));
  EXPECT_EQ(0, requests_at(2));
  // Server 2 healthy as well: traffic splits between servers 0 and 2.
  LOG(INFO) << "*** server 2 healthy";
  servers_[2]->SetServingStatus(kHealthService, true);
  WaitForServer(DEBUG_LOCATION, stub, 2);
  send_ok(10);
  EXPECT_EQ(5, requests_at(0));
  EXPECT_EQ(0, requests_at(1));
  EXPECT_EQ(5, requests_at(2));
  // All three healthy: traffic splits evenly.
  LOG(INFO) << "*** server 1 healthy";
  servers_[1]->SetServingStatus(kHealthService, true);
  WaitForServer(DEBUG_LOCATION, stub, 1);
  send_ok(9);
  EXPECT_EQ(3, requests_at(0));
  EXPECT_EQ(3, requests_at(1));
  EXPECT_EQ(3, requests_at(2));
  // Server 0 unhealthy again. The client has caught up once a round of
  // kNumServers RPCs puts two requests on one of the remaining servers.
  LOG(INFO) << "*** server 0 unhealthy";
  servers_[0]->SetServingStatus(kHealthService, false);
  do {
    ResetCounters();
    send_ok(kNumServers);
  } while (requests_at(1) != 2 && requests_at(2) != 2);
  // Servers 1 and 2 unhealthy too: the channel must leave READY and RPCs
  // must start failing.
  LOG(INFO) << "*** all servers unhealthy";
  servers_[1]->SetServingStatus(kHealthService, false);
  servers_[2]->SetServingStatus(kHealthService, false);
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // The picker may lag the state change, so a couple of RPCs can still
  // succeed before the first failure.
  SendRpcsUntil(DEBUG_LOCATION, stub, [&](const Status& status) {
    if (status.ok()) return true;
    EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(
        status.error_message(),
        ::testing::MatchesRegex(
            "connections to all backends failing; last error: "
            "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy"));
    return false;
  });
  // Clean up: turn the default health check service back off.
  EnableDefaultHealthCheckService(false);
}
2114
TEST_F(RoundRobinTest,HealthCheckingHandlesSubchannelFailure)2115 TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
2116 EnableDefaultHealthCheckService(true);
2117 // Start servers.
2118 const int kNumServers = 3;
2119 StartServers(kNumServers);
2120 servers_[0]->SetServingStatus("health_check_service_name", true);
2121 servers_[1]->SetServingStatus("health_check_service_name", true);
2122 servers_[2]->SetServingStatus("health_check_service_name", true);
2123 ChannelArguments args;
2124 args.SetServiceConfigJSON(
2125 "{\"healthCheckConfig\": "
2126 "{\"serviceName\": \"health_check_service_name\"}}");
2127 FakeResolverResponseGeneratorWrapper response_generator;
2128 auto channel = BuildChannel("round_robin", response_generator, args);
2129 auto stub = BuildStub(channel);
2130 response_generator.SetNextResolution(GetServersPorts());
2131 WaitForServer(DEBUG_LOCATION, stub, 0);
2132 // Stop server 0 and send a new resolver result to ensure that RR
2133 // checks each subchannel's state.
2134 servers_[0]->StopListeningAndSendGoaways();
2135 response_generator.SetNextResolution(GetServersPorts());
2136 // Send a bunch more RPCs.
2137 for (size_t i = 0; i < 100; i++) {
2138 CheckRpcSendOk(DEBUG_LOCATION, stub);
2139 }
2140 }
2141
TEST_F(RoundRobinTest,WithHealthCheckingInhibitPerChannel)2142 TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
2143 EnableDefaultHealthCheckService(true);
2144 // Start server.
2145 const int kNumServers = 1;
2146 StartServers(kNumServers);
2147 // Use the same channel creds for both channels, so that they have the same
2148 // subchannel keys.
2149 auto channel_creds =
2150 std::make_shared<FakeTransportSecurityChannelCredentials>();
2151 // Create a channel with health-checking enabled.
2152 ChannelArguments args;
2153 args.SetServiceConfigJSON(
2154 "{\"healthCheckConfig\": "
2155 "{\"serviceName\": \"health_check_service_name\"}}");
2156 FakeResolverResponseGeneratorWrapper response_generator1;
2157 auto channel1 =
2158 BuildChannel("round_robin", response_generator1, args, channel_creds);
2159 auto stub1 = BuildStub(channel1);
2160 std::vector<int> ports = GetServersPorts();
2161 response_generator1.SetNextResolution(ports);
2162 // Create a channel with health checking enabled but inhibited.
2163 args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
2164 FakeResolverResponseGeneratorWrapper response_generator2;
2165 auto channel2 =
2166 BuildChannel("round_robin", response_generator2, args, channel_creds);
2167 auto stub2 = BuildStub(channel2);
2168 response_generator2.SetNextResolution(ports);
2169 // First channel should not become READY, because health checks should be
2170 // failing.
2171 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
2172 CheckRpcSendFailure(
2173 DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
2174 "connections to all backends failing; last error: "
2175 "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy");
2176 // Second channel should be READY.
2177 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
2178 CheckRpcSendOk(DEBUG_LOCATION, stub2);
2179 // Enable health checks on the backend and wait for channel 1 to succeed.
2180 servers_[0]->SetServingStatus("health_check_service_name", true);
2181 CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */);
2182 // Check that we created only one subchannel to the backend.
2183 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
2184 // Clean up.
2185 EnableDefaultHealthCheckService(false);
2186 }
2187
TEST_F(RoundRobinTest,HealthCheckingServiceNamePerChannel)2188 TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
2189 EnableDefaultHealthCheckService(true);
2190 // Start server.
2191 const int kNumServers = 1;
2192 StartServers(kNumServers);
2193 // Use the same channel creds for both channels, so that they have the same
2194 // subchannel keys.
2195 auto channel_creds =
2196 std::make_shared<FakeTransportSecurityChannelCredentials>();
2197 // Create a channel with health-checking enabled.
2198 ChannelArguments args;
2199 args.SetServiceConfigJSON(
2200 "{\"healthCheckConfig\": "
2201 "{\"serviceName\": \"health_check_service_name\"}}");
2202 FakeResolverResponseGeneratorWrapper response_generator1;
2203 auto channel1 =
2204 BuildChannel("round_robin", response_generator1, args, channel_creds);
2205 auto stub1 = BuildStub(channel1);
2206 std::vector<int> ports = GetServersPorts();
2207 response_generator1.SetNextResolution(ports);
2208 // Create a channel with health-checking enabled with a different
2209 // service name.
2210 ChannelArguments args2;
2211 args2.SetServiceConfigJSON(
2212 "{\"healthCheckConfig\": "
2213 "{\"serviceName\": \"health_check_service_name2\"}}");
2214 FakeResolverResponseGeneratorWrapper response_generator2;
2215 auto channel2 =
2216 BuildChannel("round_robin", response_generator2, args2, channel_creds);
2217 auto stub2 = BuildStub(channel2);
2218 response_generator2.SetNextResolution(ports);
2219 // Allow health checks from channel 2 to succeed.
2220 servers_[0]->SetServingStatus("health_check_service_name2", true);
2221 // First channel should not become READY, because health checks should be
2222 // failing.
2223 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
2224 CheckRpcSendFailure(
2225 DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
2226 "connections to all backends failing; last error: "
2227 "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy");
2228 // Second channel should be READY.
2229 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
2230 CheckRpcSendOk(DEBUG_LOCATION, stub2);
2231 // Enable health checks for channel 1 and wait for it to succeed.
2232 servers_[0]->SetServingStatus("health_check_service_name", true);
2233 CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */);
2234 // Check that we created only one subchannel to the backend.
2235 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
2236 // Clean up.
2237 EnableDefaultHealthCheckService(false);
2238 }
2239
// Verifies that changing the health-check service name through a new
// resolver result (service config) takes effect on subchannels that already
// exist. The channel first becomes READY with a healthy service name. It must
// then leave READY after switching to a name the server never reports as
// serving.
TEST_F(RoundRobinTest,
       HealthCheckingServiceNameChangesAfterSubchannelsCreated) {
  EnableDefaultHealthCheckService(true);
  // Start server.
  const int kNumServers = 1;
  StartServers(kNumServers);
  // Create a channel with health-checking enabled.
  // Unlike the per-channel tests, the health-check config here is delivered
  // in the resolver result rather than via channel args.
  const char* kServiceConfigJson =
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}";
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  // NOTE(review): `stub` is not used below; only channel state is checked.
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  response_generator.SetNextResolution(ports, kServiceConfigJson);
  servers_[0]->SetServingStatus("health_check_service_name", true);
  EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
  // Send an update on the channel to change it to use a health checking
  // service name that is not being reported as healthy.
  const char* kServiceConfigJson2 =
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name2\"}}";
  response_generator.SetNextResolution(ports, kServiceConfigJson2);
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // Clean up.
  EnableDefaultHealthCheckService(false);
}
2267
TEST_F(RoundRobinTest,HealthCheckingRetryOnStreamEnd)2268 TEST_F(RoundRobinTest, HealthCheckingRetryOnStreamEnd) {
2269 // Start servers.
2270 const int kNumServers = 2;
2271 CreateServers(kNumServers);
2272 EnableNoopHealthCheckService();
2273 StartServer(0);
2274 StartServer(1);
2275 ChannelArguments args;
2276 // Create a channel with health-checking enabled.
2277 args.SetServiceConfigJSON(
2278 "{\"healthCheckConfig\": "
2279 "{\"serviceName\": \"health_check_service_name\"}}");
2280 FakeResolverResponseGeneratorWrapper response_generator;
2281 auto channel = BuildChannel("round_robin", response_generator, args);
2282 response_generator.SetNextResolution(GetServersPorts());
2283 EXPECT_FALSE(WaitForChannelReady(channel.get()));
2284 EXPECT_GT(servers_[0]->noop_health_check_service_impl_.request_count(), 1);
2285 EXPECT_GT(servers_[1]->noop_health_check_service_impl_.request_count(), 1);
2286 }
2287
2288 //
2289 // LB policy pick args
2290 //
2291
2292 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
2293 protected:
SetUp()2294 void SetUp() override {
2295 ClientLbEnd2endTest::SetUp();
2296 current_test_instance_ = this;
2297 }
2298
SetUpTestSuite()2299 static void SetUpTestSuite() {
2300 grpc_core::CoreConfiguration::Reset();
2301 grpc_core::CoreConfiguration::RegisterBuilder(
2302 [](grpc_core::CoreConfiguration::Builder* builder) {
2303 grpc_core::RegisterTestPickArgsLoadBalancingPolicy(builder,
2304 SavePickArgs);
2305 });
2306 grpc_init();
2307 }
2308
TearDownTestSuite()2309 static void TearDownTestSuite() {
2310 grpc_shutdown();
2311 grpc_core::CoreConfiguration::Reset();
2312 }
2313
args_seen_list()2314 std::vector<grpc_core::PickArgsSeen> args_seen_list() {
2315 grpc_core::MutexLock lock(&mu_);
2316 return args_seen_list_;
2317 }
2318
ArgsSeenListString(const std::vector<grpc_core::PickArgsSeen> & args_seen_list)2319 static std::string ArgsSeenListString(
2320 const std::vector<grpc_core::PickArgsSeen>& args_seen_list) {
2321 std::vector<std::string> entries;
2322 for (const auto& args_seen : args_seen_list) {
2323 std::vector<std::string> metadata;
2324 for (const auto& p : args_seen.metadata) {
2325 metadata.push_back(absl::StrCat(p.first, "=", p.second));
2326 }
2327 entries.push_back(absl::StrFormat("{path=\"%s\", metadata=[%s]}",
2328 args_seen.path,
2329 absl::StrJoin(metadata, ", ")));
2330 }
2331 return absl::StrCat("[", absl::StrJoin(entries, ", "), "]");
2332 }
2333
2334 private:
SavePickArgs(const grpc_core::PickArgsSeen & args_seen)2335 static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
2336 ClientLbPickArgsTest* self = current_test_instance_;
2337 grpc_core::MutexLock lock(&self->mu_);
2338 self->args_seen_list_.emplace_back(args_seen);
2339 }
2340
2341 static ClientLbPickArgsTest* current_test_instance_;
2342 grpc_core::Mutex mu_;
2343 std::vector<grpc_core::PickArgsSeen> args_seen_list_;
2344 };
2345
2346 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
2347
TEST_F(ClientLbPickArgsTest,Basic)2348 TEST_F(ClientLbPickArgsTest, Basic) {
2349 const int kNumServers = 1;
2350 StartServers(kNumServers);
2351 FakeResolverResponseGeneratorWrapper response_generator;
2352 auto channel = BuildChannel("test_pick_args_lb", response_generator);
2353 auto stub = BuildStub(channel);
2354 response_generator.SetNextResolution(GetServersPorts());
2355 // Proactively connect the channel, so that the LB policy will always
2356 // be connected before it sees the pick. Otherwise, the test would be
2357 // flaky because sometimes the pick would be seen twice (once in
2358 // CONNECTING and again in READY) and other times only once (in READY).
2359 ASSERT_TRUE(channel->WaitForConnected(gpr_inf_future(GPR_CLOCK_MONOTONIC)));
2360 // Check LB policy name for the channel.
2361 EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
2362 // Now send an RPC and check that the picker sees the expected data.
2363 CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
2364 auto pick_args_seen_list = args_seen_list();
2365 EXPECT_THAT(pick_args_seen_list,
2366 ::testing::ElementsAre(::testing::AllOf(
2367 ::testing::Field(&grpc_core::PickArgsSeen::path,
2368 "/grpc.testing.EchoTestService/Echo"),
2369 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
2370 ::testing::UnorderedElementsAre(
2371 ::testing::Pair("foo", "1"),
2372 ::testing::Pair("bar", "2"),
2373 ::testing::Pair("baz", "3"))))))
2374 << ArgsSeenListString(pick_args_seen_list);
2375 }
2376
2377 //
2378 // tests that LB policies can get the call's trailing metadata
2379 //
2380
2381 class OrcaLoadReportBuilder {
2382 public:
2383 OrcaLoadReportBuilder() = default;
OrcaLoadReportBuilder(const OrcaLoadReport & report)2384 explicit OrcaLoadReportBuilder(const OrcaLoadReport& report)
2385 : report_(report) {}
SetApplicationUtilization(double v)2386 OrcaLoadReportBuilder& SetApplicationUtilization(double v) {
2387 report_.set_application_utilization(v);
2388 return *this;
2389 }
SetCpuUtilization(double v)2390 OrcaLoadReportBuilder& SetCpuUtilization(double v) {
2391 report_.set_cpu_utilization(v);
2392 return *this;
2393 }
SetMemUtilization(double v)2394 OrcaLoadReportBuilder& SetMemUtilization(double v) {
2395 report_.set_mem_utilization(v);
2396 return *this;
2397 }
SetQps(double v)2398 OrcaLoadReportBuilder& SetQps(double v) {
2399 report_.set_rps_fractional(v);
2400 return *this;
2401 }
SetEps(double v)2402 OrcaLoadReportBuilder& SetEps(double v) {
2403 report_.set_eps(v);
2404 return *this;
2405 }
SetRequestCost(absl::string_view n,double v)2406 OrcaLoadReportBuilder& SetRequestCost(absl::string_view n, double v) {
2407 (*report_.mutable_request_cost())[n] = v;
2408 return *this;
2409 }
SetUtilization(absl::string_view n,double v)2410 OrcaLoadReportBuilder& SetUtilization(absl::string_view n, double v) {
2411 (*report_.mutable_utilization())[n] = v;
2412 return *this;
2413 }
SetNamedMetrics(absl::string_view n,double v)2414 OrcaLoadReportBuilder& SetNamedMetrics(absl::string_view n, double v) {
2415 (*report_.mutable_named_metrics())[n] = v;
2416 return *this;
2417 }
Build()2418 OrcaLoadReport Build() { return std::move(report_); }
2419
2420 private:
2421 OrcaLoadReport report_;
2422 };
2423
BackendMetricDataToOrcaLoadReport(const grpc_core::BackendMetricData & backend_metric_data)2424 OrcaLoadReport BackendMetricDataToOrcaLoadReport(
2425 const grpc_core::BackendMetricData& backend_metric_data) {
2426 auto builder = OrcaLoadReportBuilder()
2427 .SetApplicationUtilization(
2428 backend_metric_data.application_utilization)
2429 .SetCpuUtilization(backend_metric_data.cpu_utilization)
2430 .SetMemUtilization(backend_metric_data.mem_utilization)
2431 .SetQps(backend_metric_data.qps)
2432 .SetEps(backend_metric_data.eps);
2433 for (const auto& p : backend_metric_data.request_cost) {
2434 builder.SetRequestCost(std::string(p.first), p.second);
2435 }
2436 for (const auto& p : backend_metric_data.utilization) {
2437 builder.SetUtilization(std::string(p.first), p.second);
2438 }
2439 for (const auto& p : backend_metric_data.named_metrics) {
2440 builder.SetNamedMetrics(std::string(p.first), p.second);
2441 }
2442 return builder.Build();
2443 }
2444
2445 // TODO(roth): Change this to use EqualsProto() once that becomes available in
2446 // OSS.
CheckLoadReportAsExpected(const OrcaLoadReport & actual,const OrcaLoadReport & expected)2447 void CheckLoadReportAsExpected(const OrcaLoadReport& actual,
2448 const OrcaLoadReport& expected) {
2449 EXPECT_EQ(actual.application_utilization(),
2450 expected.application_utilization());
2451 EXPECT_EQ(actual.cpu_utilization(), expected.cpu_utilization());
2452 EXPECT_EQ(actual.mem_utilization(), expected.mem_utilization());
2453 EXPECT_EQ(actual.rps_fractional(), expected.rps_fractional());
2454 EXPECT_EQ(actual.eps(), expected.eps());
2455 EXPECT_EQ(actual.request_cost().size(), expected.request_cost().size());
2456 for (const auto& p : actual.request_cost()) {
2457 auto it = expected.request_cost().find(p.first);
2458 ASSERT_NE(it, expected.request_cost().end());
2459 EXPECT_EQ(it->second, p.second);
2460 }
2461 EXPECT_EQ(actual.utilization().size(), expected.utilization().size());
2462 for (const auto& p : actual.utilization()) {
2463 auto it = expected.utilization().find(p.first);
2464 ASSERT_NE(it, expected.utilization().end());
2465 EXPECT_EQ(it->second, p.second);
2466 }
2467 EXPECT_EQ(actual.named_metrics().size(), expected.named_metrics().size());
2468 for (const auto& p : actual.named_metrics()) {
2469 auto it = expected.named_metrics().find(p.first);
2470 ASSERT_NE(it, expected.named_metrics().end());
2471 EXPECT_EQ(it->second, p.second);
2472 }
2473 }
2474
2475 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
2476 protected:
SetUp()2477 void SetUp() override {
2478 ClientLbEnd2endTest::SetUp();
2479 current_test_instance_ = this;
2480 }
2481
SetUpTestSuite()2482 static void SetUpTestSuite() {
2483 grpc_core::CoreConfiguration::Reset();
2484 grpc_core::CoreConfiguration::RegisterBuilder(
2485 [](grpc_core::CoreConfiguration::Builder* builder) {
2486 grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
2487 builder, ReportTrailerIntercepted);
2488 });
2489 grpc_init();
2490 }
2491
TearDownTestSuite()2492 static void TearDownTestSuite() {
2493 grpc_shutdown();
2494 grpc_core::CoreConfiguration::Reset();
2495 }
2496
num_trailers_intercepted()2497 int num_trailers_intercepted() {
2498 grpc_core::MutexLock lock(&mu_);
2499 return num_trailers_intercepted_;
2500 }
2501
last_status()2502 absl::Status last_status() {
2503 grpc_core::MutexLock lock(&mu_);
2504 return last_status_;
2505 }
2506
trailing_metadata()2507 grpc_core::MetadataVector trailing_metadata() {
2508 grpc_core::MutexLock lock(&mu_);
2509 return std::move(trailing_metadata_);
2510 }
2511
backend_load_report()2512 absl::optional<OrcaLoadReport> backend_load_report() {
2513 grpc_core::MutexLock lock(&mu_);
2514 return std::move(load_report_);
2515 }
2516
2517 // Returns true if received callback within deadline.
WaitForLbCallback()2518 bool WaitForLbCallback() {
2519 grpc_core::MutexLock lock(&mu_);
2520 while (!trailer_intercepted_) {
2521 if (cond_.WaitWithTimeout(&mu_, absl::Seconds(3))) return false;
2522 }
2523 trailer_intercepted_ = false;
2524 return true;
2525 }
2526
RunPerRpcMetricReportingTest(const OrcaLoadReport & reported,const OrcaLoadReport & expected)2527 void RunPerRpcMetricReportingTest(const OrcaLoadReport& reported,
2528 const OrcaLoadReport& expected) {
2529 const int kNumServers = 1;
2530 const int kNumRpcs = 10;
2531 StartServers(kNumServers);
2532 FakeResolverResponseGeneratorWrapper response_generator;
2533 auto channel =
2534 BuildChannel("intercept_trailing_metadata_lb", response_generator);
2535 auto stub = BuildStub(channel);
2536 response_generator.SetNextResolution(GetServersPorts());
2537 for (size_t i = 0; i < kNumRpcs; ++i) {
2538 CheckRpcSendOk(DEBUG_LOCATION, stub, false, &reported);
2539 auto actual = backend_load_report();
2540 ASSERT_TRUE(actual.has_value());
2541 CheckLoadReportAsExpected(*actual, expected);
2542 }
2543 // Check LB policy name for the channel.
2544 EXPECT_EQ("intercept_trailing_metadata_lb",
2545 channel->GetLoadBalancingPolicyName());
2546 EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
2547 }
2548
2549 private:
ReportTrailerIntercepted(const grpc_core::TrailingMetadataArgsSeen & args_seen)2550 static void ReportTrailerIntercepted(
2551 const grpc_core::TrailingMetadataArgsSeen& args_seen) {
2552 const auto* backend_metric_data = args_seen.backend_metric_data;
2553 ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
2554 grpc_core::MutexLock lock(&self->mu_);
2555 self->last_status_ = args_seen.status;
2556 self->num_trailers_intercepted_++;
2557 self->trailer_intercepted_ = true;
2558 self->trailing_metadata_ = args_seen.metadata;
2559 if (backend_metric_data != nullptr) {
2560 self->load_report_ =
2561 BackendMetricDataToOrcaLoadReport(*backend_metric_data);
2562 }
2563 self->cond_.Signal();
2564 }
2565
2566 static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
2567 int num_trailers_intercepted_ = 0;
2568 bool trailer_intercepted_ = false;
2569 grpc_core::Mutex mu_;
2570 grpc_core::CondVar cond_;
2571 absl::Status last_status_;
2572 grpc_core::MetadataVector trailing_metadata_;
2573 absl::optional<OrcaLoadReport> load_report_;
2574 };
2575
2576 ClientLbInterceptTrailingMetadataTest*
2577 ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr;
2578
TEST_F(ClientLbInterceptTrailingMetadataTest,StatusOk)2579 TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) {
2580 StartServers(1);
2581 FakeResolverResponseGeneratorWrapper response_generator;
2582 auto channel =
2583 BuildChannel("intercept_trailing_metadata_lb", response_generator);
2584 auto stub = BuildStub(channel);
2585 response_generator.SetNextResolution(GetServersPorts());
2586 // Send an OK RPC.
2587 CheckRpcSendOk(DEBUG_LOCATION, stub);
2588 // Check LB policy name for the channel.
2589 EXPECT_EQ("intercept_trailing_metadata_lb",
2590 channel->GetLoadBalancingPolicyName());
2591 EXPECT_EQ(1, num_trailers_intercepted());
2592 EXPECT_EQ(absl::OkStatus(), last_status());
2593 }
2594
TEST_F(ClientLbInterceptTrailingMetadataTest,StatusFailed)2595 TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) {
2596 StartServers(1);
2597 FakeResolverResponseGeneratorWrapper response_generator;
2598 auto channel =
2599 BuildChannel("intercept_trailing_metadata_lb", response_generator);
2600 auto stub = BuildStub(channel);
2601 response_generator.SetNextResolution(GetServersPorts());
2602 EchoRequest request;
2603 auto* expected_error = request.mutable_param()->mutable_expected_error();
2604 expected_error->set_code(GRPC_STATUS_PERMISSION_DENIED);
2605 expected_error->set_error_message("bummer, man");
2606 Status status = SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
2607 /*wait_for_ready=*/false, &request);
2608 EXPECT_EQ(status.error_code(), StatusCode::PERMISSION_DENIED);
2609 EXPECT_EQ(status.error_message(), "bummer, man");
2610 absl::Status status_seen_by_lb = last_status();
2611 EXPECT_EQ(status_seen_by_lb.code(), absl::StatusCode::kPermissionDenied);
2612 EXPECT_EQ(status_seen_by_lb.message(), "bummer, man");
2613 }
2614
// A stream that is cancelled before Finish() is ever called is still reported
// to the LB policy, which sees a CANCELLED status.
TEST_F(ClientLbInterceptTrailingMetadataTest,
       StatusCancelledWithoutStartingRecvTrailingMetadata) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  resolver.SetNextResolution(GetServersPorts());
  auto echo_stub = BuildStub(lb_channel);
  {
    // Open a bidi stream (which sends initial metadata), then cancel it
    // without ever calling Finish(). Both objects are destroyed at the end
    // of this scope.
    ClientContext context;
    auto bidi_stream = echo_stub->BidiStream(&context);
    context.TryCancel();
  }
  // Block until the LB policy has been told about the cancellation.
  ASSERT_TRUE(WaitForLbCallback());
  // Exactly one trailer, carrying the cancellation status.
  EXPECT_EQ(1, num_trailers_intercepted());
  const absl::Status lb_status = last_status();
  EXPECT_EQ(lb_status.code(), absl::StatusCode::kCancelled);
  EXPECT_EQ(lb_status.message(), "call cancelled");
}
2638
TEST_F(ClientLbInterceptTrailingMetadataTest,InterceptsRetriesDisabled)2639 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
2640 const int kNumServers = 1;
2641 const int kNumRpcs = 10;
2642 StartServers(kNumServers);
2643 FakeResolverResponseGeneratorWrapper response_generator;
2644 ChannelArguments channel_args;
2645 channel_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
2646 auto channel = BuildChannel("intercept_trailing_metadata_lb",
2647 response_generator, channel_args);
2648 auto stub = BuildStub(channel);
2649 response_generator.SetNextResolution(GetServersPorts());
2650 for (size_t i = 0; i < kNumRpcs; ++i) {
2651 CheckRpcSendOk(DEBUG_LOCATION, stub);
2652 }
2653 // Check LB policy name for the channel.
2654 EXPECT_EQ("intercept_trailing_metadata_lb",
2655 channel->GetLoadBalancingPolicyName());
2656 EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
2657 EXPECT_THAT(trailing_metadata(),
2658 ::testing::UnorderedElementsAre(
2659 // TODO(roth): Should grpc-status be visible here?
2660 ::testing::Pair("grpc-status", "0"),
2661 ::testing::Pair("user-agent", ::testing::_),
2662 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
2663 ::testing::Pair("baz", "3")));
2664 EXPECT_FALSE(backend_load_report().has_value());
2665 }
2666
TEST_F(ClientLbInterceptTrailingMetadataTest,InterceptsRetriesEnabled)2667 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
2668 const int kNumServers = 1;
2669 const int kNumRpcs = 10;
2670 StartServers(kNumServers);
2671 ChannelArguments args;
2672 args.SetServiceConfigJSON(
2673 "{\n"
2674 " \"methodConfig\": [ {\n"
2675 " \"name\": [\n"
2676 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
2677 " ],\n"
2678 " \"retryPolicy\": {\n"
2679 " \"maxAttempts\": 3,\n"
2680 " \"initialBackoff\": \"1s\",\n"
2681 " \"maxBackoff\": \"120s\",\n"
2682 " \"backoffMultiplier\": 1.6,\n"
2683 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
2684 " }\n"
2685 " } ]\n"
2686 "}");
2687 FakeResolverResponseGeneratorWrapper response_generator;
2688 auto channel =
2689 BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
2690 auto stub = BuildStub(channel);
2691 response_generator.SetNextResolution(GetServersPorts());
2692 for (size_t i = 0; i < kNumRpcs; ++i) {
2693 CheckRpcSendOk(DEBUG_LOCATION, stub);
2694 }
2695 // Check LB policy name for the channel.
2696 EXPECT_EQ("intercept_trailing_metadata_lb",
2697 channel->GetLoadBalancingPolicyName());
2698 EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
2699 EXPECT_THAT(trailing_metadata(),
2700 ::testing::UnorderedElementsAre(
2701 // TODO(roth): Should grpc-status be visible here?
2702 ::testing::Pair("grpc-status", "0"),
2703 ::testing::Pair("user-agent", ::testing::_),
2704 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
2705 ::testing::Pair("baz", "3")));
2706 EXPECT_FALSE(backend_load_report().has_value());
2707 }
2708
TEST_F(ClientLbInterceptTrailingMetadataTest,Valid)2709 TEST_F(ClientLbInterceptTrailingMetadataTest, Valid) {
2710 RunPerRpcMetricReportingTest(OrcaLoadReportBuilder()
2711 .SetApplicationUtilization(0.25)
2712 .SetCpuUtilization(0.5)
2713 .SetMemUtilization(0.75)
2714 .SetQps(0.25)
2715 .SetEps(0.1)
2716 .SetRequestCost("foo", -0.8)
2717 .SetRequestCost("bar", 1.4)
2718 .SetUtilization("baz", 1.0)
2719 .SetUtilization("quux", 0.9)
2720 .SetNamedMetrics("metric0", 3.0)
2721 .SetNamedMetrics("metric1", -1.0)
2722 .Build(),
2723 OrcaLoadReportBuilder()
2724 .SetApplicationUtilization(0.25)
2725 .SetCpuUtilization(0.5)
2726 .SetMemUtilization(0.75)
2727 .SetQps(0.25)
2728 .SetEps(0.1)
2729 .SetRequestCost("foo", -0.8)
2730 .SetRequestCost("bar", 1.4)
2731 .SetUtilization("baz", 1.0)
2732 .SetUtilization("quux", 0.9)
2733 .SetNamedMetrics("metric0", 3.0)
2734 .SetNamedMetrics("metric1", -1.0)
2735 .Build());
2736 }
2737
TEST_F(ClientLbInterceptTrailingMetadataTest,NegativeValues)2738 TEST_F(ClientLbInterceptTrailingMetadataTest, NegativeValues) {
2739 RunPerRpcMetricReportingTest(OrcaLoadReportBuilder()
2740 .SetApplicationUtilization(-0.3)
2741 .SetCpuUtilization(-0.1)
2742 .SetMemUtilization(-0.2)
2743 .SetQps(-3)
2744 .SetEps(-4)
2745 .SetRequestCost("foo", -5)
2746 .SetUtilization("bar", -0.6)
2747 .SetNamedMetrics("baz", -0.7)
2748 .Build(),
2749 OrcaLoadReportBuilder()
2750 .SetRequestCost("foo", -5)
2751 .SetNamedMetrics("baz", -0.7)
2752 .Build());
2753 }
2754
TEST_F(ClientLbInterceptTrailingMetadataTest,AboveOneUtilization)2755 TEST_F(ClientLbInterceptTrailingMetadataTest, AboveOneUtilization) {
2756 RunPerRpcMetricReportingTest(OrcaLoadReportBuilder()
2757 .SetApplicationUtilization(1.9)
2758 .SetCpuUtilization(1.1)
2759 .SetMemUtilization(2)
2760 .SetQps(3)
2761 .SetEps(4)
2762 .SetUtilization("foo", 5)
2763 .Build(),
2764 OrcaLoadReportBuilder()
2765 .SetApplicationUtilization(1.9)
2766 .SetCpuUtilization(1.1)
2767 .SetQps(3)
2768 .SetEps(4)
2769 .Build());
2770 }
2771
TEST_F(ClientLbInterceptTrailingMetadataTest,BackendMetricDataMerge)2772 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricDataMerge) {
2773 const int kNumServers = 1;
2774 const int kNumRpcs = 10;
2775 StartServers(kNumServers);
2776 servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.99);
2777 servers_[0]->server_metric_recorder_->SetCpuUtilization(0.99);
2778 servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.99);
2779 servers_[0]->server_metric_recorder_->SetQps(0.99);
2780 servers_[0]->server_metric_recorder_->SetEps(0.99);
2781 servers_[0]->server_metric_recorder_->SetNamedUtilization("foo", 0.99);
2782 servers_[0]->server_metric_recorder_->SetNamedUtilization("bar", 0.1);
2783 OrcaLoadReport per_server_load = OrcaLoadReportBuilder()
2784 .SetApplicationUtilization(0.99)
2785 .SetCpuUtilization(0.99)
2786 .SetMemUtilization(0.99)
2787 .SetQps(0.99)
2788 .SetEps(0.99)
2789 .SetUtilization("foo", 0.99)
2790 .SetUtilization("bar", 0.1)
2791 .Build();
2792 FakeResolverResponseGeneratorWrapper response_generator;
2793 auto channel =
2794 BuildChannel("intercept_trailing_metadata_lb", response_generator);
2795 auto stub = BuildStub(channel);
2796 response_generator.SetNextResolution(GetServersPorts());
2797 size_t total_num_rpcs = 0;
2798 {
2799 OrcaLoadReport load_report =
2800 OrcaLoadReportBuilder().SetApplicationUtilization(0.5).Build();
2801 OrcaLoadReport expected = OrcaLoadReportBuilder(per_server_load)
2802 .SetApplicationUtilization(0.5)
2803 .Build();
2804 for (size_t i = 0; i < kNumRpcs; ++i) {
2805 CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2806 auto actual = backend_load_report();
2807 ASSERT_TRUE(actual.has_value());
2808 CheckLoadReportAsExpected(*actual, expected);
2809 ++total_num_rpcs;
2810 }
2811 }
2812 {
2813 OrcaLoadReport load_report =
2814 OrcaLoadReportBuilder().SetMemUtilization(0.5).Build();
2815 OrcaLoadReport expected =
2816 OrcaLoadReportBuilder(per_server_load).SetMemUtilization(0.5).Build();
2817 for (size_t i = 0; i < kNumRpcs; ++i) {
2818 CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2819 auto actual = backend_load_report();
2820 ASSERT_TRUE(actual.has_value());
2821 CheckLoadReportAsExpected(*actual, expected);
2822 ++total_num_rpcs;
2823 }
2824 }
2825 {
2826 OrcaLoadReport load_report = OrcaLoadReportBuilder().SetQps(0.5).Build();
2827 OrcaLoadReport expected =
2828 OrcaLoadReportBuilder(per_server_load).SetQps(0.5).Build();
2829 for (size_t i = 0; i < kNumRpcs; ++i) {
2830 CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2831 auto actual = backend_load_report();
2832 ASSERT_TRUE(actual.has_value());
2833 CheckLoadReportAsExpected(*actual, expected);
2834 ++total_num_rpcs;
2835 }
2836 }
2837 {
2838 OrcaLoadReport load_report = OrcaLoadReportBuilder().SetEps(0.5).Build();
2839 OrcaLoadReport expected =
2840 OrcaLoadReportBuilder(per_server_load).SetEps(0.5).Build();
2841 for (size_t i = 0; i < kNumRpcs; ++i) {
2842 CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2843 auto actual = backend_load_report();
2844 ASSERT_TRUE(actual.has_value());
2845 CheckLoadReportAsExpected(*actual, expected);
2846 ++total_num_rpcs;
2847 }
2848 }
2849 {
2850 OrcaLoadReport load_report =
2851 OrcaLoadReportBuilder()
2852 .SetUtilization("foo", 0.5)
2853 .SetUtilization("bar", 1.1) // Out of range.
2854 .SetUtilization("baz", 1.0)
2855 .Build();
2856 auto expected = OrcaLoadReportBuilder(per_server_load)
2857 .SetUtilization("foo", 0.5)
2858 .SetUtilization("baz", 1.0)
2859 .Build();
2860 for (size_t i = 0; i < kNumRpcs; ++i) {
2861 CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2862 auto actual = backend_load_report();
2863 ASSERT_TRUE(actual.has_value());
2864 CheckLoadReportAsExpected(*actual, expected);
2865 ++total_num_rpcs;
2866 }
2867 }
2868 // Check LB policy name for the channel.
2869 EXPECT_EQ("intercept_trailing_metadata_lb",
2870 channel->GetLoadBalancingPolicyName());
2871 EXPECT_EQ(total_num_rpcs, num_trailers_intercepted());
2872 }
2873
2874 //
2875 // tests that per-address args from the resolver are visible to the LB policy
2876 //
2877
2878 class ClientLbAddressTest : public ClientLbEnd2endTest {
2879 protected:
SetUp()2880 void SetUp() override {
2881 ClientLbEnd2endTest::SetUp();
2882 current_test_instance_ = this;
2883 }
2884
SetUpTestSuite()2885 static void SetUpTestSuite() {
2886 grpc_core::CoreConfiguration::Reset();
2887 grpc_core::CoreConfiguration::RegisterBuilder(
2888 [](grpc_core::CoreConfiguration::Builder* builder) {
2889 grpc_core::RegisterAddressTestLoadBalancingPolicy(builder,
2890 SaveAddress);
2891 });
2892 grpc_init();
2893 }
2894
TearDownTestSuite()2895 static void TearDownTestSuite() {
2896 grpc_shutdown();
2897 grpc_core::CoreConfiguration::Reset();
2898 }
2899
addresses_seen()2900 std::vector<std::string> addresses_seen() {
2901 grpc_core::MutexLock lock(&mu_);
2902 return addresses_seen_;
2903 }
2904
2905 private:
SaveAddress(const grpc_core::EndpointAddresses & address)2906 static void SaveAddress(const grpc_core::EndpointAddresses& address) {
2907 ClientLbAddressTest* self = current_test_instance_;
2908 grpc_core::MutexLock lock(&self->mu_);
2909 self->addresses_seen_.emplace_back(address.ToString());
2910 }
2911
2912 static ClientLbAddressTest* current_test_instance_;
2913 grpc_core::Mutex mu_;
2914 std::vector<std::string> addresses_seen_ ABSL_GUARDED_BY(&mu_);
2915 };
2916
2917 ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
2918
// Verifies that per-address channel args set by the resolver are visible to
// the LB policy alongside the address itself.
TEST_F(ClientLbAddressTest, Basic) {
  constexpr int kNumServers = 1;
  StartServers(kNumServers);
  const auto ports = GetServersPorts();
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("address_test_lb", response_generator);
  auto stub = BuildStub(channel);
  // The resolver attaches test_key=test_value to every address it returns.
  const grpc_core::ChannelArgs per_address_args =
      grpc_core::ChannelArgs().Set("test_key", "test_value");
  response_generator.SetNextResolution(ports, nullptr, per_address_args);
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
  // Each address seen by the LB policy should carry the resolver's args.
  std::vector<std::string> expected;
  expected.reserve(ports.size());
  for (int port : ports) {
    expected.push_back(absl::StrCat("addrs=[", grpc_core::LocalIp(), ":", port,
                                    "] args={test_key=test_value}"));
  }
  EXPECT_EQ(addresses_seen(), expected);
}
2940
2941 //
2942 // tests OOB backend metric API
2943 //
2944
2945 class OobBackendMetricTest : public ClientLbEnd2endTest {
2946 protected:
2947 using BackendMetricReport = std::pair<int /*port*/, OrcaLoadReport>;
2948
SetUp()2949 void SetUp() override {
2950 ClientLbEnd2endTest::SetUp();
2951 current_test_instance_ = this;
2952 }
2953
SetUpTestSuite()2954 static void SetUpTestSuite() {
2955 grpc_core::CoreConfiguration::Reset();
2956 grpc_core::CoreConfiguration::RegisterBuilder(
2957 [](grpc_core::CoreConfiguration::Builder* builder) {
2958 grpc_core::RegisterOobBackendMetricTestLoadBalancingPolicy(
2959 builder, BackendMetricCallback);
2960 });
2961 grpc_init();
2962 }
2963
TearDownTestSuite()2964 static void TearDownTestSuite() {
2965 grpc_shutdown();
2966 grpc_core::CoreConfiguration::Reset();
2967 }
2968
GetBackendMetricReport()2969 absl::optional<BackendMetricReport> GetBackendMetricReport() {
2970 grpc_core::MutexLock lock(&mu_);
2971 if (backend_metric_reports_.empty()) return absl::nullopt;
2972 auto result = std::move(backend_metric_reports_.front());
2973 backend_metric_reports_.pop_front();
2974 return result;
2975 }
2976
2977 private:
BackendMetricCallback(const grpc_core::EndpointAddresses & address,const grpc_core::BackendMetricData & backend_metric_data)2978 static void BackendMetricCallback(
2979 const grpc_core::EndpointAddresses& address,
2980 const grpc_core::BackendMetricData& backend_metric_data) {
2981 auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data);
2982 int port = grpc_sockaddr_get_port(&address.address());
2983 grpc_core::MutexLock lock(¤t_test_instance_->mu_);
2984 current_test_instance_->backend_metric_reports_.push_back(
2985 {port, std::move(load_report)});
2986 }
2987
2988 static OobBackendMetricTest* current_test_instance_;
2989 grpc_core::Mutex mu_;
2990 std::deque<BackendMetricReport> backend_metric_reports_ ABSL_GUARDED_BY(&mu_);
2991 };
2992
2993 OobBackendMetricTest* OobBackendMetricTest::current_test_instance_ = nullptr;
2994
// Verifies that OOB backend metric reports reach the client LB policy, and
// that updates made on the server are reflected in later reports.
TEST_F(OobBackendMetricTest, Basic) {
  StartServers(1);
  constexpr char kMetricName[] = "foo";
  auto& recorder = servers_[0]->server_metric_recorder_;
  // Seed the single backend with an initial set of metrics.
  recorder->SetApplicationUtilization(0.5);
  recorder->SetCpuUtilization(0.1);
  recorder->SetMemoryUtilization(0.2);
  recorder->SetEps(0.3);
  recorder->SetQps(0.4);
  recorder->SetNamedUtilization(kMetricName, 0.4);
  // Bring up a client that uses the OOB-reporting test LB policy.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("oob_backend_metric_test_lb", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ("oob_backend_metric_test_lb",
            channel->GetLoadBalancingPolicyName());
  // Polls the client-side report queue up to 5 times. Each dequeued report
  // must come from the backend's port. The first report that `accept`
  // approves is passed to `verify`, and the helper returns true. Any attempt
  // that does not succeed is followed by a 1-second sleep. Returns false if
  // no report was accepted.
  auto await_report = [&](auto accept, auto verify) {
    for (size_t attempt = 0; attempt < 5; ++attempt) {
      auto report = GetBackendMetricReport();
      if (report.has_value()) {
        EXPECT_EQ(report->first, servers_[0]->port_);
        if (accept(report->second)) {
          verify(report->second);
          return true;
        }
      }
      gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
    }
    return false;
  };
  // Any report at all must carry the initial values.
  const bool initial_report_seen = await_report(
      [](const OrcaLoadReport& /*load*/) { return true; },
      [&](const OrcaLoadReport& load) {
        EXPECT_EQ(load.application_utilization(), 0.5);
        EXPECT_EQ(load.cpu_utilization(), 0.1);
        EXPECT_EQ(load.mem_utilization(), 0.2);
        EXPECT_EQ(load.eps(), 0.3);
        EXPECT_EQ(load.rps_fractional(), 0.4);
        EXPECT_THAT(load.utilization(),
                    ::testing::UnorderedElementsAre(
                        ::testing::Pair(kMetricName, 0.4)));
      });
  ASSERT_TRUE(initial_report_seen);
  // Update the server's metrics. A report may be sent mid-update, so the
  // fields are changed in reverse order. A report whose app utilization has
  // changed is therefore guaranteed to contain all of the new values.
  recorder->SetNamedUtilization(kMetricName, 0.7);
  recorder->SetQps(0.8);
  recorder->SetEps(0.6);
  recorder->SetMemoryUtilization(0.5);
  recorder->SetCpuUtilization(2.4);
  recorder->SetApplicationUtilization(1.2);
  // Skip stale reports until one shows the new app utilization.
  const bool updated_report_seen = await_report(
      [](const OrcaLoadReport& load) {
        return load.application_utilization() != 0.5;
      },
      [&](const OrcaLoadReport& load) {
        EXPECT_EQ(load.application_utilization(), 1.2);
        EXPECT_EQ(load.cpu_utilization(), 2.4);
        EXPECT_EQ(load.mem_utilization(), 0.5);
        EXPECT_EQ(load.eps(), 0.6);
        EXPECT_EQ(load.rps_fractional(), 0.8);
        EXPECT_THAT(load.utilization(),
                    ::testing::UnorderedElementsAre(
                        ::testing::Pair(kMetricName, 0.7)));
      });
  ASSERT_TRUE(updated_report_seen);
}
3068
3069 //
3070 // tests rewriting of control plane status codes
3071 //
3072
3073 class ControlPlaneStatusRewritingTest : public ClientLbEnd2endTest {
3074 protected:
SetUpTestSuite()3075 static void SetUpTestSuite() {
3076 grpc_core::CoreConfiguration::Reset();
3077 grpc_core::CoreConfiguration::RegisterBuilder(
3078 [](grpc_core::CoreConfiguration::Builder* builder) {
3079 grpc_core::RegisterFailLoadBalancingPolicy(
3080 builder, absl::AbortedError("nope"));
3081 });
3082 grpc_init();
3083 }
3084
TearDownTestSuite()3085 static void TearDownTestSuite() {
3086 grpc_shutdown();
3087 grpc_core::CoreConfiguration::Reset();
3088 }
3089 };
3090
// An ABORTED status coming from an LB pick must reach the application as
// INTERNAL, with the original status preserved in the message.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromLb) {
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("fail_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::INTERNAL,
      "Illegal status code from LB pick; original status: ABORTED: nope");
}
3102
// A resolver result whose service config is an ABORTED error must cause RPCs
// to fail with INTERNAL, with the original status preserved in the message.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromResolver) {
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("pick_first", resolver);
  auto stub = BuildStub(channel);
  // Empty address list plus a failed service config.
  grpc_core::Resolver::Result failing_result;
  failing_result.addresses.emplace();
  failing_result.service_config = absl::AbortedError("nope");
  resolver.SetResponse(std::move(failing_result));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::INTERNAL,
      "Illegal status code from resolver; original status: ABORTED: nope");
}
3117
// A ConfigSelector that fails calls with ABORTED must cause RPCs to fail with
// INTERNAL, with the original status preserved in the message.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromConfigSelector) {
  // ConfigSelector whose GetCallConfig() always returns a fixed status.
  class FailConfigSelector : public grpc_core::ConfigSelector {
   public:
    explicit FailConfigSelector(absl::Status status)
        : status_(std::move(status)) {}

    grpc_core::UniqueTypeName name() const override {
      static grpc_core::UniqueTypeName::Factory kFactory("FailConfigSelector");
      return kFactory.Create();
    }

    // Equal when both selectors carry the same status. The unchecked cast
    // assumes `other` is also a FailConfigSelector.
    bool Equals(const ConfigSelector* other) const override {
      const auto* peer = static_cast<const FailConfigSelector*>(other);
      return peer->status_ == status_;
    }

    absl::Status GetCallConfig(GetCallConfigArgs /*args*/) override {
      return status_;
    }

   private:
    absl::Status status_;
  };
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("pick_first", resolver);
  auto stub = BuildStub(channel);
  auto selector =
      grpc_core::MakeRefCounted<FailConfigSelector>(absl::AbortedError("nope"));
  // Empty address list and an empty-but-valid service config. The failing
  // selector is attached through the result's channel args.
  grpc_core::Resolver::Result resolver_result;
  resolver_result.addresses.emplace();
  resolver_result.service_config =
      grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), "{}");
  ASSERT_TRUE(resolver_result.service_config.ok())
      << resolver_result.service_config.status();
  resolver_result.args = grpc_core::ChannelArgs().SetObject(selector);
  resolver.SetResponse(std::move(resolver_result));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::INTERNAL,
      "Illegal status code from ConfigSelector; original status: "
      "ABORTED: nope");
}
3156
3157 //
3158 // WeightedRoundRobinTest
3159 //
3160
// Service config selecting weighted_round_robin with a 0s blackout period and
// a 0.1s weight update period. "enableOobLoadReport" is not set, so (as the
// name suggests) weights are expected to come from per-call load reports.
const char kServiceConfigPerCall[] =
    "{\n"
    " \"loadBalancingConfig\": [\n"
    " {\"weighted_round_robin\": {\n"
    " \"blackoutPeriod\": \"0s\",\n"
    " \"weightUpdatePeriod\": \"0.1s\"\n"
    " }}\n"
    " ]\n"
    "}";
3170
// Same as the per-call weighted_round_robin config (0s blackout period, 0.1s
// weight update period), but with "enableOobLoadReport": true so that
// weights come from out-of-band load reports.
const char kServiceConfigOob[] =
    "{\n"
    " \"loadBalancingConfig\": [\n"
    " {\"weighted_round_robin\": {\n"
    " \"blackoutPeriod\": \"0s\",\n"
    " \"weightUpdatePeriod\": \"0.1s\",\n"
    " \"enableOobLoadReport\": true\n"
    " }}\n"
    " ]\n"
    "}";
3181
// absl::StrFormat template: outlier_detection_experimental wrapping a
// weighted_round_robin child with a 0.1s weight update period. The "%d"
// placeholder is the child's blackout period in whole seconds and must be
// filled in before use.
const char kServiceConfigWithOutlierDetection[] =
    "{\n"
    " \"loadBalancingConfig\": [\n"
    " {\"outlier_detection_experimental\": {\n"
    " \"childPolicy\": [\n"
    " {\"weighted_round_robin\": {\n"
    " \"blackoutPeriod\": \"%ds\",\n"
    " \"weightUpdatePeriod\": \"0.1s\"\n"
    " }}\n"
    " ]\n"
    " }}\n"
    " ]\n"
    "}";
3195
3196 class WeightedRoundRobinTest : public ClientLbEnd2endTest {
3197 protected:
ExpectWeightedRoundRobinPicks(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,const std::vector<size_t> & expected_weights,size_t total_passes=3,EchoRequest * request_ptr=nullptr,int timeout_ms=15000)3198 void ExpectWeightedRoundRobinPicks(
3199 const grpc_core::DebugLocation& location,
3200 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
3201 const std::vector<size_t>& expected_weights, size_t total_passes = 3,
3202 EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
3203 CHECK_EQ(expected_weights.size(), servers_.size());
3204 size_t total_picks_per_pass = 0;
3205 for (size_t picks : expected_weights) {
3206 total_picks_per_pass += picks;
3207 }
3208 size_t num_picks = 0;
3209 size_t num_passes = 0;
3210 SendRpcsUntil(
3211 location, stub,
3212 [&](const Status&) {
3213 if (++num_picks == total_picks_per_pass) {
3214 bool match = true;
3215 for (size_t i = 0; i < expected_weights.size(); ++i) {
3216 if (servers_[i]->service_.request_count() !=
3217 expected_weights[i]) {
3218 match = false;
3219 break;
3220 }
3221 }
3222 if (match) {
3223 if (++num_passes == total_passes) return false;
3224 } else {
3225 num_passes = 0;
3226 }
3227 num_picks = 0;
3228 ResetCounters();
3229 }
3230 return true;
3231 },
3232 request_ptr, timeout_ms);
3233 }
3234 };
3235
// Per-call load reports must override server (ORCA) metrics while present.
// Once they stop, the WRR weights must fall back to the server metrics.
TEST_F(WeightedRoundRobinTest, CallAndServerMetric) {
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  // Publishes app utilization, EPS and QPS from backend `index`.
  auto report_server_metrics = [this](size_t index, double app_util,
                                      double eps, double qps) {
    auto& recorder = servers_[index]->server_metric_recorder_;
    recorder->SetApplicationUtilization(app_util);
    recorder->SetEps(eps);
    recorder->SetQps(qps);
  };
  // Server metrics alone should give 6:4:3 WRR picks, since
  // weight = qps / (util + eps/qps), with util = app_util if set, else
  // cpu_util:
  //   100/(0.2+0.2) : 100/(0.3+0.3) : 200/(1.5+0.1) = 250:166.7:125 = 6:4:3
  report_server_metrics(0, 0.2, 20, 100);
  report_server_metrics(1, 0.3, 30, 100);
  report_server_metrics(2, 1.5, 20, 200);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts(),
                                       kServiceConfigPerCall);
  // Per-call reports of EPS ~0 and QPS 100 give 1/0.2 : 1/0.3 : 1/1.5, which
  // is 15:10:2. proto3 cannot carry an explicit 0, so the smallest positive
  // double is used for EPS instead.
  EchoRequest request;
  auto* call_metrics = request.mutable_param()->mutable_backend_metrics();
  call_metrics->set_eps(std::numeric_limits<double>::min());
  call_metrics->set_rps_fractional(100);
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
                                /*expected_weights=*/{15, 10, 2},
                                /*total_passes=*/3, &request);
  // Without per-call reports, the picks revert to the server-metric 6:4:3.
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
                                /*expected_weights=*/{6, 4, 3});
  EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
}
3275
3276 // This tests a bug seen in production where the outlier_detection
3277 // policy would incorrectly generate a duplicate READY notification on
3278 // all of its subchannels every time it saw an update, thus causing the
3279 // WRR policy to re-enter the blackout period for that address.
TEST_F(WeightedRoundRobinTest,WithOutlierDetection)3280 TEST_F(WeightedRoundRobinTest, WithOutlierDetection) {
3281 const int kBlackoutPeriodSeconds = 10;
3282 const int kNumServers = 3;
3283 StartServers(kNumServers);
3284 // Report server metrics that should give 6:4:3 WRR picks.
3285 // weights = qps / (util + (eps/qps)) =
3286 // 1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
3287 // where util is app_util if set, or cpu_util.
3288 servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.2);
3289 servers_[0]->server_metric_recorder_->SetEps(20);
3290 servers_[0]->server_metric_recorder_->SetQps(100);
3291 servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.3);
3292 servers_[1]->server_metric_recorder_->SetEps(30);
3293 servers_[1]->server_metric_recorder_->SetQps(100);
3294 servers_[2]->server_metric_recorder_->SetApplicationUtilization(1.5);
3295 servers_[2]->server_metric_recorder_->SetEps(20);
3296 servers_[2]->server_metric_recorder_->SetQps(200);
3297 // Create channel.
3298 // Initial blackout period is 0, so that we start seeing traffic in
3299 // the right proportions right away.
3300 FakeResolverResponseGeneratorWrapper response_generator;
3301 auto channel = BuildChannel("", response_generator);
3302 auto stub = BuildStub(channel);
3303 response_generator.SetNextResolution(
3304 GetServersPorts(),
3305 absl::StrFormat(kServiceConfigWithOutlierDetection, 0).c_str());
3306 // Send requests with per-call reported EPS/QPS set to 0/100.
3307 // This should give 1/2:1/3:1/15 = 15:10:2 WRR picks.
3308 // Keep sending RPCs long enough to go past the new blackout period
3309 // that we're going to add later.
3310 absl::Time deadline =
3311 absl::Now() +
3312 absl::Seconds(kBlackoutPeriodSeconds * grpc_test_slowdown_factor());
3313 EchoRequest request;
3314 // We cannot override with 0 with proto3, so setting it to almost 0.
3315 request.mutable_param()->mutable_backend_metrics()->set_eps(
3316 std::numeric_limits<double>::min());
3317 request.mutable_param()->mutable_backend_metrics()->set_rps_fractional(100);
3318 do {
3319 ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
3320 /*expected_weights=*/{15, 10, 2},
3321 /*total_passes=*/3, &request);
3322 } while (absl::Now() < deadline);
3323 // Send a new resolver response that increases blackout period.
3324 response_generator.SetNextResolution(
3325 GetServersPorts(),
3326 absl::StrFormat(kServiceConfigWithOutlierDetection,
3327 kBlackoutPeriodSeconds * grpc_test_slowdown_factor())
3328 .c_str());
3329 // Weights should be the same before the blackout period expires.
3330 ExpectWeightedRoundRobinPicks(
3331 DEBUG_LOCATION, stub, /*expected_weights=*/{15, 10, 2},
3332 /*total_passes=*/3, &request,
3333 /*timeout_ms=*/(kBlackoutPeriodSeconds - 1) * 1000);
3334 }
3335
3336 class WeightedRoundRobinParamTest
3337 : public WeightedRoundRobinTest,
3338 public ::testing::WithParamInterface<const char*> {};
3339
3340 INSTANTIATE_TEST_SUITE_P(WeightedRoundRobin, WeightedRoundRobinParamTest,
3341 ::testing::Values(kServiceConfigPerCall,
3342 kServiceConfigOob));
3343
// Verifies that WRR distributes picks in proportion to the weights derived
// from the backends' reported metrics. The config under test supplies the
// metrics either per call or out of band (see GetParam()).
TEST_P(WeightedRoundRobinParamTest, Basic) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  // Report server metrics that should give 1:2:4 WRR picks.
  // weights = qps / (util + (eps/qps)) =
  //   100/(0.4+0.4) : 100/(0.2+0.2) : 200/(0.3+0.1) = 125:250:500 = 1:2:4
  // where util is app_util if set, or cpu_util.
  // Server 2's EPS must be 20 (eps/qps = 20/200 = 0.1) for the ratio to be
  // exactly 1:2:4. An EPS of 5 would give eps/qps = 0.025 and a weight of
  // ~615 (a ~1:2:4.9 ratio). ExpectWeightedRoundRobinPicks() requires exact
  // per-pass counts, so it could not reliably match that ratio.
  servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.4);
  servers_[0]->server_metric_recorder_->SetEps(40);
  servers_[0]->server_metric_recorder_->SetQps(100);
  servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.2);
  servers_[1]->server_metric_recorder_->SetEps(20);
  servers_[1]->server_metric_recorder_->SetQps(100);
  servers_[2]->server_metric_recorder_->SetApplicationUtilization(0.3);
  servers_[2]->server_metric_recorder_->SetEps(20);
  servers_[2]->server_metric_recorder_->SetQps(200);
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts(), GetParam());
  // Wait for the right set of WRR picks.
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
                                /*expected_weights=*/{1, 2, 4});
  // Check LB policy name for the channel.
  EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
}
3371
3372 } // namespace
3373 } // namespace testing
3374 } // namespace grpc
3375
main(int argc,char ** argv)3376 int main(int argc, char** argv) {
3377 ::testing::InitGoogleTest(&argc, argv);
3378 grpc::testing::TestEnvironment env(&argc, argv);
3379 // Make the backup poller poll very frequently in order to pick up
3380 // updates from all the subchannels's FDs.
3381 grpc_core::ConfigVars::Overrides overrides;
3382 overrides.client_channel_backup_poll_interval_ms = 1;
3383 grpc_core::ConfigVars::SetOverrides(overrides);
3384 #if TARGET_OS_IPHONE
3385 // Workaround Apple CFStream bug
3386 grpc_core::SetEnv("grpc_cfstream", "0");
3387 #endif
3388 grpc_init();
3389 grpc::testing::ConnectionAttemptInjector::Init();
3390 const auto result = RUN_ALL_TESTS();
3391 grpc_shutdown();
3392 return result;
3393 }
3394