• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <gmock/gmock.h>
16 #include <grpc/event_engine/endpoint_config.h>
17 #include <grpc/grpc.h>
18 #include <grpc/support/alloc.h>
19 #include <grpc/support/atm.h>
20 #include <grpc/support/time.h>
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/create_channel.h>
24 #include <grpcpp/ext/call_metric_recorder.h>
25 #include <grpcpp/ext/orca_service.h>
26 #include <grpcpp/ext/server_metric_recorder.h>
27 #include <grpcpp/health_check_service_interface.h>
28 #include <grpcpp/impl/sync.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <gtest/gtest.h>
32 
33 #include <algorithm>
34 #include <chrono>
35 #include <deque>
36 #include <memory>
37 #include <mutex>
38 #include <random>
39 #include <set>
40 #include <string>
41 #include <thread>
42 
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/memory/memory.h"
46 #include "absl/strings/str_cat.h"
47 #include "absl/strings/str_format.h"
48 #include "absl/strings/str_join.h"
49 #include "absl/strings/string_view.h"
50 #include "src/core/client_channel/backup_poller.h"
51 #include "src/core/client_channel/config_selector.h"
52 #include "src/core/client_channel/global_subchannel_pool.h"
53 #include "src/core/config/config_vars.h"
54 #include "src/core/lib/address_utils/parse_address.h"
55 #include "src/core/lib/address_utils/sockaddr_utils.h"
56 #include "src/core/lib/channel/channel_args.h"
57 #include "src/core/lib/iomgr/tcp_client.h"
58 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/resolver/endpoint_addresses.h"
61 #include "src/core/resolver/fake/fake_resolver.h"
62 #include "src/core/server/server.h"
63 #include "src/core/service_config/service_config.h"
64 #include "src/core/service_config/service_config_impl.h"
65 #include "src/core/util/backoff.h"
66 #include "src/core/util/crash.h"
67 #include "src/core/util/debug_location.h"
68 #include "src/core/util/env.h"
69 #include "src/core/util/notification.h"
70 #include "src/core/util/ref_counted_ptr.h"
71 #include "src/core/util/time.h"
72 #include "src/cpp/server/secure_server_credentials.h"
73 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
74 #include "src/proto/grpc/testing/echo.grpc.pb.h"
75 #include "test/core/test_util/port.h"
76 #include "test/core/test_util/resolve_localhost_ip46.h"
77 #include "test/core/test_util/test_config.h"
78 #include "test/core/test_util/test_lb_policies.h"
79 #include "test/cpp/end2end/connection_attempt_injector.h"
80 #include "test/cpp/end2end/test_service_impl.h"
81 #include "test/cpp/util/credentials.h"
82 #include "xds/data/orca/v3/orca_load_report.pb.h"
83 
84 namespace grpc {
85 namespace testing {
86 namespace {
87 
88 using xds::data::orca::v3::OrcaLoadReport;
89 constexpr char kRequestMessage[] = "Live long and prosper.";
90 
91 // A noop health check service that just terminates the call and returns OK
92 // status in its methods. This is used to test the retry mechanism in
93 // SubchannelStreamClient.
94 class NoopHealthCheckServiceImpl : public health::v1::Health::Service {
95  public:
96   ~NoopHealthCheckServiceImpl() override = default;
Check(ServerContext *,const health::v1::HealthCheckRequest *,health::v1::HealthCheckResponse *)97   Status Check(ServerContext*, const health::v1::HealthCheckRequest*,
98                health::v1::HealthCheckResponse*) override {
99     return Status::OK;
100   }
Watch(ServerContext *,const health::v1::HealthCheckRequest *,ServerWriter<health::v1::HealthCheckResponse> *)101   Status Watch(ServerContext*, const health::v1::HealthCheckRequest*,
102                ServerWriter<health::v1::HealthCheckResponse>*) override {
103     grpc_core::MutexLock lock(&mu_);
104     request_count_++;
105     return Status::OK;
106   }
request_count()107   int request_count() {
108     grpc_core::MutexLock lock(&mu_);
109     return request_count_;
110   }
111 
112  private:
113   grpc_core::Mutex mu_;
114   int request_count_ ABSL_GUARDED_BY(&mu_) = 0;
115 };
116 
117 // Subclass of TestServiceImpl that increments a request counter for
118 // every call to the Echo RPC.
119 class MyTestServiceImpl : public TestServiceImpl {
120  public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)121   Status Echo(ServerContext* context, const EchoRequest* request,
122               EchoResponse* response) override {
123     {
124       grpc_core::MutexLock lock(&mu_);
125       ++request_count_;
126     }
127     AddClient(context->peer());
128     if (request->has_param() && request->param().has_backend_metrics()) {
129       const auto& request_metrics = request->param().backend_metrics();
130       auto* recorder = context->ExperimentalGetCallMetricRecorder();
131       EXPECT_NE(recorder, nullptr);
132       // Do not record when zero since it indicates no test per-call report.
133       if (request_metrics.application_utilization() > 0) {
134         recorder->RecordApplicationUtilizationMetric(
135             request_metrics.application_utilization());
136       }
137       if (request_metrics.cpu_utilization() > 0) {
138         recorder->RecordCpuUtilizationMetric(request_metrics.cpu_utilization());
139       }
140       if (request_metrics.mem_utilization() > 0) {
141         recorder->RecordMemoryUtilizationMetric(
142             request_metrics.mem_utilization());
143       }
144       if (request_metrics.rps_fractional() > 0) {
145         recorder->RecordQpsMetric(request_metrics.rps_fractional());
146       }
147       if (request_metrics.eps() > 0) {
148         recorder->RecordEpsMetric(request_metrics.eps());
149       }
150       for (const auto& p : request_metrics.request_cost()) {
151         char* key = static_cast<char*>(
152             grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
153         strncpy(key, p.first.data(), p.first.size());
154         key[p.first.size()] = '\0';
155         recorder->RecordRequestCostMetric(key, p.second);
156       }
157       for (const auto& p : request_metrics.utilization()) {
158         char* key = static_cast<char*>(
159             grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
160         strncpy(key, p.first.data(), p.first.size());
161         key[p.first.size()] = '\0';
162         recorder->RecordUtilizationMetric(key, p.second);
163       }
164       for (const auto& p : request_metrics.named_metrics()) {
165         char* key = static_cast<char*>(
166             grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
167         strncpy(key, p.first.data(), p.first.size());
168         key[p.first.size()] = '\0';
169         recorder->RecordNamedMetric(key, p.second);
170       }
171     }
172     return TestServiceImpl::Echo(context, request, response);
173   }
174 
request_count()175   size_t request_count() {
176     grpc_core::MutexLock lock(&mu_);
177     return request_count_;
178   }
179 
ResetCounters()180   void ResetCounters() {
181     grpc_core::MutexLock lock(&mu_);
182     request_count_ = 0;
183   }
184 
clients()185   std::set<std::string> clients() {
186     grpc_core::MutexLock lock(&clients_mu_);
187     return clients_;
188   }
189 
190  private:
AddClient(const std::string & client)191   void AddClient(const std::string& client) {
192     grpc_core::MutexLock lock(&clients_mu_);
193     clients_.insert(client);
194   }
195 
196   grpc_core::Mutex mu_;
197   size_t request_count_ ABSL_GUARDED_BY(&mu_) = 0;
198 
199   grpc_core::Mutex clients_mu_;
200   std::set<std::string> clients_ ABSL_GUARDED_BY(&clients_mu_);
201 };
202 
203 class FakeResolverResponseGeneratorWrapper {
204  public:
FakeResolverResponseGeneratorWrapper()205   FakeResolverResponseGeneratorWrapper()
206       : response_generator_(grpc_core::MakeRefCounted<
207                             grpc_core::FakeResolverResponseGenerator>()) {}
208 
FakeResolverResponseGeneratorWrapper(FakeResolverResponseGeneratorWrapper && other)209   FakeResolverResponseGeneratorWrapper(
210       FakeResolverResponseGeneratorWrapper&& other) noexcept {
211     response_generator_ = std::move(other.response_generator_);
212   }
213 
SetResponse(grpc_core::Resolver::Result result)214   void SetResponse(grpc_core::Resolver::Result result) {
215     grpc_core::ExecCtx exec_ctx;
216     response_generator_->SetResponseSynchronously(std::move(result));
217   }
218 
SetNextResolution(const std::vector<int> & ports,const char * service_config_json=nullptr,const grpc_core::ChannelArgs & per_address_args=grpc_core::ChannelArgs ())219   void SetNextResolution(const std::vector<int>& ports,
220                          const char* service_config_json = nullptr,
221                          const grpc_core::ChannelArgs& per_address_args =
222                              grpc_core::ChannelArgs()) {
223     SetResponse(BuildFakeResults(ports, service_config_json, per_address_args));
224   }
225 
Get() const226   grpc_core::FakeResolverResponseGenerator* Get() const {
227     return response_generator_.get();
228   }
229 
230  private:
BuildFakeResults(const std::vector<int> & ports,const char * service_config_json=nullptr,const grpc_core::ChannelArgs & per_address_args=grpc_core::ChannelArgs ())231   static grpc_core::Resolver::Result BuildFakeResults(
232       const std::vector<int>& ports, const char* service_config_json = nullptr,
233       const grpc_core::ChannelArgs& per_address_args =
234           grpc_core::ChannelArgs()) {
235     grpc_core::Resolver::Result result;
236     result.addresses = grpc_core::EndpointAddressesList();
237     for (const int& port : ports) {
238       absl::StatusOr<grpc_core::URI> lb_uri =
239           grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
240       CHECK_OK(lb_uri);
241       grpc_resolved_address address;
242       CHECK(grpc_parse_uri(*lb_uri, &address));
243       result.addresses->emplace_back(address, per_address_args);
244     }
245     if (result.addresses->empty()) {
246       result.resolution_note = "fake resolver empty address list";
247     }
248     if (service_config_json != nullptr) {
249       result.service_config = grpc_core::ServiceConfigImpl::Create(
250           grpc_core::ChannelArgs(), service_config_json);
251       EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
252     }
253     return result;
254   }
255 
256   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
257       response_generator_;
258 };
259 
260 constexpr absl::string_view kDefaultAuthority = "default.example.com";
261 
262 class ClientLbEnd2endTest : public ::testing::Test {
263  protected:
SetUp()264   void SetUp() override { grpc_init(); }
265 
TearDown()266   void TearDown() override {
267     for (size_t i = 0; i < servers_.size(); ++i) {
268       servers_[i]->Shutdown();
269     }
270     servers_.clear();
271     grpc_shutdown();
272   }
273 
CreateServers(size_t num_servers,std::vector<int> ports={},std::shared_ptr<ServerCredentials> server_creds=nullptr)274   void CreateServers(
275       size_t num_servers, std::vector<int> ports = {},
276       std::shared_ptr<ServerCredentials> server_creds = nullptr) {
277     servers_.clear();
278     for (size_t i = 0; i < num_servers; ++i) {
279       int port = 0;
280       if (ports.size() == num_servers) port = ports[i];
281       servers_.emplace_back(new ServerData(port, server_creds));
282     }
283   }
284 
StartServer(size_t index)285   void StartServer(size_t index) { servers_[index]->Start(); }
286 
StartServers(size_t num_servers,std::vector<int> ports={},std::shared_ptr<ServerCredentials> server_creds=nullptr)287   void StartServers(size_t num_servers, std::vector<int> ports = {},
288                     std::shared_ptr<ServerCredentials> server_creds = nullptr) {
289     CreateServers(num_servers, std::move(ports), std::move(server_creds));
290     for (size_t i = 0; i < num_servers; ++i) {
291       StartServer(i);
292     }
293   }
294 
GetServersPorts(size_t start_index=0,size_t stop_index=0)295   std::vector<int> GetServersPorts(size_t start_index = 0,
296                                    size_t stop_index = 0) {
297     if (stop_index == 0) stop_index = servers_.size();
298     std::vector<int> ports;
299     for (size_t i = start_index; i < stop_index; ++i) {
300       ports.push_back(servers_[i]->port_);
301     }
302     return ports;
303   }
304 
BuildStub(const std::shared_ptr<Channel> & channel)305   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
306       const std::shared_ptr<Channel>& channel) {
307     return grpc::testing::EchoTestService::NewStub(channel);
308   }
309 
BuildChannel(const std::string & lb_policy_name,const FakeResolverResponseGeneratorWrapper & response_generator,ChannelArguments args=ChannelArguments (),std::shared_ptr<ChannelCredentials> channel_creds=nullptr)310   std::shared_ptr<Channel> BuildChannel(
311       const std::string& lb_policy_name,
312       const FakeResolverResponseGeneratorWrapper& response_generator,
313       ChannelArguments args = ChannelArguments(),
314       std::shared_ptr<ChannelCredentials> channel_creds = nullptr) {
315     if (!lb_policy_name.empty()) {
316       args.SetLoadBalancingPolicyName(lb_policy_name);
317     }  // else, default to pick first
318     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
319                     response_generator.Get());
320     if (channel_creds == nullptr) {
321       channel_creds =
322           std::make_shared<FakeTransportSecurityChannelCredentials>();
323     }
324     return grpc::CreateCustomChannel(absl::StrCat("fake:", kDefaultAuthority),
325                                      channel_creds, args);
326   }
327 
SendRpc(const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,EchoResponse * response=nullptr,int timeout_ms=1000,bool wait_for_ready=false,EchoRequest * request=nullptr,std::string authority_override="")328   Status SendRpc(
329       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
330       EchoResponse* response = nullptr, int timeout_ms = 1000,
331       bool wait_for_ready = false, EchoRequest* request = nullptr,
332       std::string authority_override = "") {
333     EchoResponse local_response;
334     if (response == nullptr) response = &local_response;
335     EchoRequest local_request;
336     if (request == nullptr) request = &local_request;
337     request->set_message(kRequestMessage);
338     request->mutable_param()->set_echo_metadata(true);
339     ClientContext context;
340     context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
341     if (!authority_override.empty()) context.set_authority(authority_override);
342     if (wait_for_ready) context.set_wait_for_ready(true);
343     context.AddMetadata("foo", "1");
344     context.AddMetadata("bar", "2");
345     context.AddMetadata("baz", "3");
346     return stub->Echo(&context, *request, response);
347   }
348 
CheckRpcSendOk(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,bool wait_for_ready=false,const OrcaLoadReport * load_report=nullptr,int timeout_ms=2000)349   void CheckRpcSendOk(
350       const grpc_core::DebugLocation& location,
351       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
352       bool wait_for_ready = false, const OrcaLoadReport* load_report = nullptr,
353       int timeout_ms = 2000) {
354     EchoResponse response;
355     EchoRequest request;
356     EchoRequest* request_ptr = nullptr;
357     if (load_report != nullptr) {
358       request_ptr = &request;
359       auto params = request.mutable_param();
360       auto backend_metrics = params->mutable_backend_metrics();
361       *backend_metrics = *load_report;
362     }
363     Status status =
364         SendRpc(stub, &response, timeout_ms, wait_for_ready, request_ptr);
365     ASSERT_TRUE(status.ok())
366         << "From " << location.file() << ":" << location.line()
367         << "\nError: " << status.error_message() << " "
368         << status.error_details();
369     ASSERT_EQ(response.message(), kRequestMessage)
370         << "From " << location.file() << ":" << location.line();
371   }
372 
CheckRpcSendFailure(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,StatusCode expected_status,absl::string_view expected_message_regex)373   void CheckRpcSendFailure(
374       const grpc_core::DebugLocation& location,
375       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
376       StatusCode expected_status, absl::string_view expected_message_regex) {
377     Status status = SendRpc(stub);
378     EXPECT_FALSE(status.ok());
379     EXPECT_EQ(expected_status, status.error_code())
380         << location.file() << ":" << location.line();
381     EXPECT_THAT(status.error_message(),
382                 ::testing::MatchesRegex(expected_message_regex))
383         << location.file() << ":" << location.line();
384   }
385 
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,absl::AnyInvocable<bool (const Status &)> continue_predicate,EchoRequest * request_ptr=nullptr,int timeout_ms=15000)386   void SendRpcsUntil(
387       const grpc_core::DebugLocation& debug_location,
388       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
389       absl::AnyInvocable<bool(const Status&)> continue_predicate,
390       EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
391     absl::Time deadline = absl::InfiniteFuture();
392     if (timeout_ms != 0) {
393       deadline = absl::Now() +
394                  (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
395     }
396     while (true) {
397       Status status =
398           SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
399                   /*wait_for_ready=*/false, /*request=*/request_ptr);
400       if (!continue_predicate(status)) return;
401       EXPECT_LE(absl::Now(), deadline)
402           << debug_location.file() << ":" << debug_location.line();
403       if (absl::Now() >= deadline) break;
404     }
405   }
406 
407   struct ServerData {
408     const int port_;
409     const std::shared_ptr<ServerCredentials> server_creds_;
410     std::unique_ptr<Server> server_;
411     MyTestServiceImpl service_;
412     std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_;
413     experimental::OrcaService orca_service_;
414     std::unique_ptr<std::thread> thread_;
415     bool enable_noop_health_check_service_ = false;
416     NoopHealthCheckServiceImpl noop_health_check_service_impl_;
417 
418     grpc_core::Mutex mu_;
419     grpc_core::CondVar cond_;
420     bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
421     bool started_ ABSL_GUARDED_BY(mu_) = false;
422 
ServerDatagrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData423     explicit ServerData(
424         int port = 0, std::shared_ptr<ServerCredentials> server_creds = nullptr)
425         : port_(port > 0 ? port : grpc_pick_unused_port_or_die()),
426           server_creds_(
427               server_creds == nullptr
428                   ? std::shared_ptr<
429                         ServerCredentials>(new SecureServerCredentials(
430                         grpc_fake_transport_security_server_credentials_create()))
431                   : std::move(server_creds)),
432           server_metric_recorder_(experimental::ServerMetricRecorder::Create()),
433           orca_service_(
434               server_metric_recorder_.get(),
435               experimental::OrcaService::Options().set_min_report_duration(
436                   absl::Seconds(0.1))) {}
437 
Startgrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData438     void Start() {
439       LOG(INFO) << "starting server on port " << port_;
440       grpc_core::MutexLock lock(&mu_);
441       started_ = true;
442       thread_ =
443           std::make_unique<std::thread>(std::bind(&ServerData::Serve, this));
444       while (!server_ready_) {
445         cond_.Wait(&mu_);
446       }
447       server_ready_ = false;
448       LOG(INFO) << "server startup complete";
449     }
450 
Servegrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData451     void Serve() {
452       ServerBuilder builder;
453       builder.AddListeningPort(absl::StrCat("localhost:", port_),
454                                server_creds_);
455       builder.RegisterService(&service_);
456       builder.RegisterService(&orca_service_);
457       if (enable_noop_health_check_service_) {
458         builder.RegisterService(&noop_health_check_service_impl_);
459       }
460       grpc::ServerBuilder::experimental_type(&builder)
461           .EnableCallMetricRecording(server_metric_recorder_.get());
462       server_ = builder.BuildAndStart();
463       grpc_core::MutexLock lock(&mu_);
464       server_ready_ = true;
465       cond_.Signal();
466     }
467 
Shutdowngrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData468     void Shutdown() {
469       grpc_core::MutexLock lock(&mu_);
470       if (!started_) return;
471       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
472       thread_->join();
473       started_ = false;
474     }
475 
StopListeningAndSendGoawaysgrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData476     void StopListeningAndSendGoaways() {
477       grpc_core::ExecCtx exec_ctx;
478       auto* server = grpc_core::Server::FromC(server_->c_server());
479       server->StopListening();
480       server->SendGoaways();
481     }
482 
SetServingStatusgrpc::testing::__anon64669aad0111::ClientLbEnd2endTest::ServerData483     void SetServingStatus(const std::string& service, bool serving) {
484       server_->GetHealthCheckService()->SetServingStatus(service, serving);
485     }
486   };
487 
ResetCounters()488   void ResetCounters() {
489     for (const auto& server : servers_) server->service_.ResetCounters();
490   }
491 
SeenAllServers(size_t start_index=0,size_t stop_index=0)492   bool SeenAllServers(size_t start_index = 0, size_t stop_index = 0) {
493     if (stop_index == 0) stop_index = servers_.size();
494     for (size_t i = start_index; i < stop_index; ++i) {
495       if (servers_[i]->service_.request_count() == 0) return false;
496     }
497     return true;
498   }
499 
500   // If status_check is null, all RPCs must succeed.
501   // If status_check is non-null, it will be called for all non-OK RPCs.
WaitForServers(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,size_t start_index=0,size_t stop_index=0,absl::AnyInvocable<void (const Status &)> status_check=nullptr,absl::Duration timeout=absl::Seconds (30))502   void WaitForServers(
503       const grpc_core::DebugLocation& location,
504       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
505       size_t start_index = 0, size_t stop_index = 0,
506       absl::AnyInvocable<void(const Status&)> status_check = nullptr,
507       absl::Duration timeout = absl::Seconds(30)) {
508     if (stop_index == 0) stop_index = servers_.size();
509     auto deadline = absl::Now() + (timeout * grpc_test_slowdown_factor());
510     LOG(INFO) << "========= WAITING FOR BACKENDS [" << start_index << ", "
511               << stop_index << ") ==========";
512     while (!SeenAllServers(start_index, stop_index)) {
513       Status status = SendRpc(stub);
514       if (status_check != nullptr) {
515         if (!status.ok()) status_check(status);
516       } else {
517         EXPECT_TRUE(status.ok())
518             << " code=" << status.error_code() << " message=\""
519             << status.error_message() << "\" at " << location.file() << ":"
520             << location.line();
521       }
522       EXPECT_LE(absl::Now(), deadline)
523           << " at " << location.file() << ":" << location.line();
524       if (absl::Now() >= deadline) break;
525     }
526     ResetCounters();
527   }
528 
WaitForServer(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,size_t server_index,absl::AnyInvocable<void (const Status &)> status_check=nullptr)529   void WaitForServer(
530       const grpc_core::DebugLocation& location,
531       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
532       size_t server_index,
533       absl::AnyInvocable<void(const Status&)> status_check = nullptr) {
534     WaitForServers(location, stub, server_index, server_index + 1,
535                    std::move(status_check));
536   }
537 
WaitForChannelState(Channel * channel,absl::AnyInvocable<bool (grpc_connectivity_state)> predicate,bool try_to_connect=false,int timeout_seconds=5)538   bool WaitForChannelState(
539       Channel* channel,
540       absl::AnyInvocable<bool(grpc_connectivity_state)> predicate,
541       bool try_to_connect = false, int timeout_seconds = 5) {
542     const gpr_timespec deadline =
543         grpc_timeout_seconds_to_deadline(timeout_seconds);
544     while (true) {
545       grpc_connectivity_state state = channel->GetState(try_to_connect);
546       if (predicate(state)) break;
547       if (!channel->WaitForStateChange(state, deadline)) return false;
548     }
549     return true;
550   }
551 
WaitForChannelNotReady(Channel * channel,int timeout_seconds=5)552   bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
553     auto predicate = [](grpc_connectivity_state state) {
554       return state != GRPC_CHANNEL_READY;
555     };
556     return WaitForChannelState(channel, predicate, false, timeout_seconds);
557   }
558 
WaitForChannelReady(Channel * channel,int timeout_seconds=5)559   bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
560     auto predicate = [](grpc_connectivity_state state) {
561       return state == GRPC_CHANNEL_READY;
562     };
563     return WaitForChannelState(channel, predicate, true, timeout_seconds);
564   }
565 
566   // Updates \a connection_order by appending to it the index of the newly
567   // connected server. Must be called after every single RPC.
UpdateConnectionOrder(const std::vector<std::unique_ptr<ServerData>> & servers,std::vector<int> * connection_order)568   void UpdateConnectionOrder(
569       const std::vector<std::unique_ptr<ServerData>>& servers,
570       std::vector<int>* connection_order) {
571     for (size_t i = 0; i < servers.size(); ++i) {
572       if (servers[i]->service_.request_count() == 1) {
573         // Was the server index known? If not, update connection_order.
574         const auto it =
575             std::find(connection_order->begin(), connection_order->end(), i);
576         if (it == connection_order->end()) {
577           connection_order->push_back(i);
578           return;
579         }
580       }
581     }
582   }
583 
EnableNoopHealthCheckService()584   void EnableNoopHealthCheckService() {
585     for (auto& server : servers_) {
586       server->enable_noop_health_check_service_ = true;
587     }
588   }
589 
MakeConnectionFailureRegex(absl::string_view prefix)590   static std::string MakeConnectionFailureRegex(absl::string_view prefix) {
591     return absl::StrCat(
592         prefix,
593         "; last error: (UNKNOWN|UNAVAILABLE): "
594         // IP address
595         "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
596         // Prefixes added for context
597         "(Failed to connect to remote host: )?"
598         "(Timeout occurred: )?"
599         // Syscall
600         "((connect|sendmsg|recvmsg|getsockopt\\(SO\\_ERROR\\)): ?)?"
601         // strerror() output or other message
602         "(Connection refused"
603         "|Connection reset by peer"
604         "|Socket closed"
605         "|FD shutdown)"
606         // errno value
607         "( \\([0-9]+\\))?");
608   }
609 
610   std::vector<std::unique_ptr<ServerData>> servers_;
611 };
612 
// A channel asked to connect before the resolver has returned anything must
// report CONNECTING, and must become READY once a resolution arrives.
TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
  constexpr int kServerCount = 3;
  StartServers(kServerCount);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("", resolver);
  auto stub = BuildStub(channel);
  // Nothing has asked the channel to connect yet, so it starts out IDLE.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  // Requesting a connection still reports IDLE: the call only triggers the
  // state change, which has not happened by the time it returns.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/true), GRPC_CHANNEL_IDLE);
  // The first state observed after IDLE has to be CONNECTING.
  const bool left_idle =
      WaitForChannelState(channel.get(), [](grpc_connectivity_state state) {
        if (state != GRPC_CHANNEL_IDLE) {
          EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
          return true;
        }
        return false;
      });
  ASSERT_TRUE(left_idle);
  // Supplying a resolver result lets the connection attempt go ahead...
  resolver.SetNextResolution(GetServersPorts());
  // ...after which the channel should eventually reach READY.
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
}
638 
// With a client idle timeout configured, an unused channel drops back to
// IDLE and a later RPC brings it to READY again.
TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
  // Bring up a single backend.
  constexpr int kServerCount = 1;
  StartServers(kServerCount);
  // Configure the idle timeout, then create the channel and stub.
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS,
                      1000 * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("", resolver, channel_args);
  auto stub = BuildStub(channel);
  // A freshly built channel is IDLE.
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
  // The first RPC makes the channel connect, leaving it READY.
  LOG(INFO) << "*** SENDING RPC, CHANNEL SHOULD CONNECT ***";
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
  // Leave the channel unused for a while; it should fall back to IDLE.
  LOG(INFO) << "*** WAITING FOR CHANNEL TO GO IDLE ***";
  const gpr_timespec wake_up_at = grpc_timeout_milliseconds_to_deadline(1200);
  gpr_sleep_until(wake_up_at);
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
  // Another RPC should wake the idle channel up again.
  LOG(INFO) << "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***";
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
668 
669 //
670 // authority override tests
671 //
672 
673 class AuthorityOverrideTest : public ClientLbEnd2endTest {
674  protected:
SetUpTestSuite()675   static void SetUpTestSuite() {
676     grpc_core::CoreConfiguration::Reset();
677     grpc_core::CoreConfiguration::RegisterBuilder(
678         [](grpc_core::CoreConfiguration::Builder* builder) {
679           grpc_core::RegisterAuthorityOverrideLoadBalancingPolicy(builder);
680         });
681     grpc_init();
682   }
683 
TearDownTestSuite()684   static void TearDownTestSuite() {
685     grpc_shutdown();
686     grpc_core::CoreConfiguration::Reset();
687   }
688 };
689 
// With no authority override configured anywhere, the server should see the
// default authority.
TEST_F(AuthorityOverrideTest, NoOverride) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue an RPC that asks the server to echo the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the default authority.
  EXPECT_EQ(kDefaultAuthority, echo_response.param().host());
}
707 
// An authority supplied through the resolver result's channel args should be
// the one the server sees.
TEST_F(AuthorityOverrideTest, OverrideFromResolver) {
  constexpr char kResolverAuthority[] = "from-resolver.example.com";
  StartServers(1);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  // Hand the channel a resolver result whose per-address authority differs
  // from the default one.
  response_generator.SetNextResolution(
      GetServersPorts(), /*service_config_json=*/nullptr,
      grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
                                   kResolverAuthority));
  // Issue an RPC that asks the server to echo the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the resolver-provided authority.
  EXPECT_EQ(kResolverAuthority, echo_response.param().host());
}
730 
// An authority supplied through the channel's own args should be the one the
// server sees.
TEST_F(AuthorityOverrideTest, OverrideOnChannel) {
  constexpr char kChannelAuthority[] = "from-channel.example.com";
  StartServers(1);
  // Configure the authority as a channel arg before building the channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, kChannelAuthority);
  auto channel = BuildChannel("", response_generator, channel_args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue an RPC that asks the server to echo the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the channel-level authority.
  EXPECT_EQ(kChannelAuthority, echo_response.param().host());
}
751 
// An authority supplied by the authority-override test LB policy should be
// the one the server sees.
TEST_F(AuthorityOverrideTest, OverrideFromLbPolicy) {
  constexpr char kLbAuthority[] = "from-lb.example.com";
  // Insecure credentials are used on both sides so that the fake security
  // connector's authority check does not get in the way.
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper response_generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, kLbAuthority);
  auto channel = BuildChannel("authority_override_lb", response_generator,
                              channel_args, InsecureChannelCredentials());
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue an RPC that asks the server to echo the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the LB-policy-provided authority.
  EXPECT_EQ(kLbAuthority, echo_response.param().host());
}
774 
// An authority supplied for an individual RPC should be the one the server
// sees.
TEST_F(AuthorityOverrideTest, PerRpcOverride) {
  constexpr char kPerRpcAuthority[] = "per-rpc.example.com";
  // Insecure credentials are used on both sides so that the fake security
  // connector's authority check does not get in the way.
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator, ChannelArguments(),
                              InsecureChannelCredentials());
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue an RPC that carries its own authority and asks the server to echo
  // the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request,
                              /*authority_override=*/kPerRpcAuthority);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the per-RPC authority.
  EXPECT_EQ(kPerRpcAuthority, echo_response.param().host());
}
796 
// When both the channel args and the resolver result carry an authority, the
// channel-level one is expected to win.
TEST_F(AuthorityOverrideTest,
       ChannelOverrideTakesPrecedenceOverResolverOverride) {
  constexpr char kChannelAuthority[] = "from-channel.example.com";
  constexpr char kResolverAuthority[] = "from-resolver.example.com";
  StartServers(1);
  // Configure the authority as a channel arg before building the channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, kChannelAuthority);
  auto channel = BuildChannel("", response_generator, channel_args);
  auto stub = BuildStub(channel);
  // Hand the channel a resolver result whose per-address authority is yet
  // another value.
  response_generator.SetNextResolution(
      GetServersPorts(), /*service_config_json=*/nullptr,
      grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
                                   kResolverAuthority));
  // Issue an RPC that asks the server to echo the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the channel-level authority.
  EXPECT_EQ(kChannelAuthority, echo_response.param().host());
}
823 
// When both the channel args and the test LB policy supply an authority, the
// LB policy's one is expected to win.
TEST_F(AuthorityOverrideTest,
       LbPolicyOverrideTakesPrecedenceOverChannelOverride) {
  constexpr char kChannelAuthority[] = "from-channel.example.com";
  constexpr char kLbAuthority[] = "from-lb.example.com";
  // Insecure credentials are used on both sides so that the fake security
  // connector's authority check does not get in the way.
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper response_generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, kChannelAuthority);
  channel_args.SetString(GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, kLbAuthority);
  auto channel = BuildChannel("authority_override_lb", response_generator,
                              channel_args, InsecureChannelCredentials());
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue an RPC that asks the server to echo the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the LB-policy-provided authority.
  EXPECT_EQ(kLbAuthority, echo_response.param().host());
}
848 
// When both the test LB policy and the RPC itself supply an authority, the
// per-RPC one is expected to win.
TEST_F(AuthorityOverrideTest,
       PerRpcOverrideTakesPrecedenceOverLbPolicyOverride) {
  constexpr char kLbAuthority[] = "from-lb.example.com";
  constexpr char kPerRpcAuthority[] = "per-rpc.example.com";
  // Insecure credentials are used on both sides so that the fake security
  // connector's authority check does not get in the way.
  StartServers(1, {}, InsecureServerCredentials());
  FakeResolverResponseGeneratorWrapper response_generator;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_TEST_LB_AUTHORITY_OVERRIDE, kLbAuthority);
  auto channel = BuildChannel("authority_override_lb", response_generator,
                              channel_args, InsecureChannelCredentials());
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue an RPC that carries its own authority and asks the server to echo
  // the authority header back.
  EchoResponse echo_response;
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  Status rpc_status = SendRpc(stub, &echo_response, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &echo_request,
                              /*authority_override=*/kPerRpcAuthority);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the per-RPC authority.
  EXPECT_EQ(kPerRpcAuthority, echo_response.param().host());
}
873 
874 //
875 // pick_first tests
876 //
877 
// The pick_first tests use the ClientLbEnd2endTest fixture unchanged; the
// alias only gives them their own name to pass to TEST_F().
using PickFirstTest = ClientLbEnd2endTest;
879 
// Sends as many RPCs as there are servers over a channel built without an
// explicit LB policy name, and checks that exactly one server handled all of
// them and that the channel reports pick_first as its policy.
TEST_F(PickFirstTest, Basic) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  // The empty policy name verifies that pick_first is the default.
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // One RPC per started server.
  for (size_t sent = 0; sent < servers_.size(); ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Every server must have seen either all of the requests or none of them,
  // and at least one must have seen all of them.
  bool one_server_got_everything = false;
  for (const auto& server : servers_) {
    const int request_count = server->service_.request_count();
    if (request_count != kNumServers) {
      EXPECT_EQ(0, request_count);
      continue;
    }
    one_server_got_everything = true;
  }
  EXPECT_TRUE(one_server_got_everything);
  // The channel should report pick_first as its LB policy.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
906 
// Connects one channel to a server, then creates a second channel to the same
// address and checks that an RPC on the second channel succeeds.
TEST_F(PickFirstTest, ProcessPending) {
  // Only one backend is needed.
  StartServers(1);
  const int port = servers_[0]->port_;
  FakeResolverResponseGeneratorWrapper first_response_generator;
  // The empty policy name verifies that pick_first is the default.
  auto first_channel = BuildChannel("", first_response_generator);
  auto first_stub = BuildStub(first_channel);
  first_response_generator.SetNextResolution({port});
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // Subchannels are reused globally, so the pick_first policy created for a
  // second channel to the same target finds the subchannel already READY
  // thanks to the RPC above (even though it went over a different channel).
  // The second channel must make progress without seeing any transition out
  // of that READY state.
  FakeResolverResponseGeneratorWrapper second_response_generator;
  auto second_channel = BuildChannel("", second_response_generator);
  auto second_stub = BuildStub(second_channel);
  second_response_generator.SetNextResolution({port});
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
926 
// Checks that a new pick_first channel whose address list contains an
// address that is already connected becomes READY quickly, rather than
// waiting out the (long) initial reconnect backoff.
TEST_F(PickFirstTest, SelectsReadyAtStartup) {
  constexpr int kInitialBackOffMs = 5000;
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      kInitialBackOffMs * grpc_test_slowdown_factor());
  // Two servers are created, but only the second one is started.
  std::vector<int> ports = {grpc_pick_unused_port_or_die(),
                            grpc_pick_unused_port_or_die()};
  CreateServers(2, ports);
  StartServer(1);
  // First channel: connect to the second server.
  FakeResolverResponseGeneratorWrapper first_response_generator;
  auto first_channel =
      BuildChannel("pick_first", first_response_generator, channel_args);
  auto first_stub = BuildStub(first_channel);
  first_response_generator.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, first_stub, 1);
  // Second channel with the same address list.  Its pick_first instance
  // should select the second subchannel right away because that subchannel
  // is already READY.
  FakeResolverResponseGeneratorWrapper second_response_generator;
  auto second_channel =
      BuildChannel("pick_first", second_response_generator, channel_args);
  second_response_generator.SetNextResolution(ports);
  // READY must be reported without sitting through the initial backoff.
  EXPECT_TRUE(
      WaitForChannelReady(second_channel.get(), /*timeout_seconds=*/1));
}
953 
// Measures the delay between a failed first connection attempt and the start
// of the second one, and checks that it matches the configured initial
// reconnect backoff within a tolerance.
TEST_F(PickFirstTest, BackOffInitialReconnect) {
  StartServers(1);
  constexpr int kInitialBackOffMs = 100;
  const auto scaled_backoff_ms =
      kInitialBackOffMs * grpc_test_slowdown_factor();
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      scaled_backoff_ms);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, channel_args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({servers_[0]->port_});
  // Put holds on the first two connection attempts to the server.
  ConnectionAttemptInjector injector;
  auto first_hold = injector.AddHold(servers_[0]->port_);
  auto second_hold = injector.AddHold(servers_[0]->port_);
  // Kick off connecting.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/true), GRPC_CHANNEL_IDLE);
  // Once the first attempt has started, note the time and make it fail.
  first_hold->Wait();
  const grpc_core::Timestamp first_attempt_time = grpc_core::Timestamp::Now();
  first_hold->Fail(absl::UnavailableError("nope"));
  // The time until the second attempt starts is the backoff we observed.
  second_hold->Wait();
  const grpc_core::Duration waited =
      grpc_core::Timestamp::Now() - first_attempt_time;
  // The channel is expected to reach TRANSIENT_FAILURE, passing only through
  // IDLE or CONNECTING on the way.
  EXPECT_TRUE(
      WaitForChannelState(channel.get(), [](grpc_connectivity_state state) {
        if (state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
          EXPECT_THAT(state, ::testing::AnyOf(GRPC_CHANNEL_IDLE,
                                              GRPC_CHANNEL_CONNECTING));
          return false;
        }
        return true;
      }));
  // Release the second attempt; the channel should then become READY.
  second_hold->Resume();
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  // The observed wait should be the initial backoff give or take jitter.
  // Jitter is 0.2; the bounds are widened to 0.3 to absorb measurement skew,
  // thread hops, and the like.
  EXPECT_GE(waited.millis(), scaled_backoff_ms * 0.7);
  EXPECT_LE(waited.millis(), scaled_backoff_ms * 1.3);
}
1001 
// Checks that waiting for a connection whose attempts are artificially
// delayed takes at least the configured minimum reconnect backoff.
TEST_F(PickFirstTest, BackOffMinReconnect) {
  constexpr int kMinReconnectBackOffMs = 1000;
  const auto scaled_min_backoff_ms =
      kMinReconnectBackOffMs * grpc_test_slowdown_factor();
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
                      scaled_min_backoff_ms);
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, channel_args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  // Delay connection attempts by 10% more than the minimum backoff, so that
  // the code path which waits for the min reconnect backoff is exercised.
  ConnectionAttemptInjector injector;
  injector.SetDelay(
      grpc_core::Duration::Milliseconds(scaled_min_backoff_ms * 1.10));
  // Time how long the wait for a connection lasts.
  const gpr_timespec wait_start = gpr_now(GPR_CLOCK_MONOTONIC);
  channel->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
  const gpr_timespec wait_end = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(wait_end, wait_start));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  // The wait must cover the minimum backoff; one millisecond of slack
  // accounts for test and clock precision drift.
  EXPECT_GE(waited.millis(), scaled_min_backoff_ms - 1);
}
1029 
// Checks that resetting the connection backoff lets a channel connect to a
// freshly started server before the initial backoff would have elapsed.
TEST_F(PickFirstTest, ResetConnectionBackoff) {
  constexpr int kInitialBackOffMs = 1000;
  const auto scaled_backoff_ms =
      kInitialBackOffMs * grpc_test_slowdown_factor();
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      scaled_backoff_ms);
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, channel_args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  // No server is listening yet, so the channel cannot connect.
  EXPECT_FALSE(
      channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
  // Now start a server on the port the channel is trying to reach.
  StartServers(1, ports);
  const gpr_timespec wait_start = gpr_now(GPR_CLOCK_MONOTONIC);
  // A short wait still fails, which shows the initial backoff is holding the
  // channel back.
  EXPECT_FALSE(
      channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
  // Clear the backoff.
  experimental::ChannelResetConnectionBackoff(channel.get());
  // The channel should now connect as soon as it reaches the new server,
  // i.e. before the initial backoff timeout would have expired.
  EXPECT_TRUE(channel->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs)));
  const gpr_timespec wait_end = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(wait_end, wait_start));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  // The total wait must be shorter than the initial backoff.
  EXPECT_LT(waited.millis(), scaled_backoff_ms);
}
1064 
// Checks that when the backoff is reset while a connection attempt is still
// in flight, the following attempt starts promptly instead of waiting for the
// (long) initial backoff.
TEST_F(ClientLbEnd2endTest,
       ResetConnectionBackoffNextAttemptStartsImmediately) {
  // The injector must exist before any connection attempt is made.
  ConnectionAttemptInjector injector;
  // Set up a client with a long initial backoff.
  const int port = grpc_pick_unused_port_or_die();
  const int kInitialBackOffMs = 5000;
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      kInitialBackOffMs * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator, channel_args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution({port});
  // Hold the first connection attempt.
  auto first_hold = injector.AddHold(port);
  LOG(INFO) << "=== TRIGGERING INITIAL CONNECTION ATTEMPT";
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(/*try_to_connect=*/true));
  first_hold->Wait();
  EXPECT_EQ(GRPC_CHANNEL_CONNECTING,
            channel->GetState(/*try_to_connect=*/false));
  // Reset the backoff while the first attempt is still held.
  LOG(INFO) << "=== RESETTING BACKOFF";
  experimental::ChannelResetConnectionBackoff(channel.get());
  // Register the hold for the next attempt before releasing the first one,
  // in case the client gets ahead of this thread.
  auto second_hold = injector.AddHold(port);
  // Release the first attempt and time how long the second takes to start.
  LOG(INFO) << "=== RESUMING INITIAL ATTEMPT";
  const gpr_timespec resume_time = gpr_now(GPR_CLOCK_MONOTONIC);
  first_hold->Resume();
  LOG(INFO) << "=== WAITING FOR SECOND ATTEMPT";
  // WaitForStateChange() is called only to make sure some polling happens.
  EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_CONNECTING,
                                          grpc_timeout_seconds_to_deadline(1)));
  second_hold->Wait();
  const gpr_timespec second_attempt_time = gpr_now(GPR_CLOCK_MONOTONIC);
  LOG(INFO) << "=== RESUMING SECOND ATTEMPT";
  second_hold->Resume();
  // The gap between the two attempts should be far below kInitialBackOffMs.
  const grpc_core::Duration waited = grpc_core::Duration::FromTimespec(
      gpr_time_sub(second_attempt_time, resume_time));
  VLOG(2) << "Waited " << waited.millis() << " milliseconds";
  EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
}
1110 
// Walks a pick_first channel through a sequence of resolver updates: first
// server only, an empty address list, second server only, third server only.
// After each update the channel is expected to follow the new address list.
TEST_F(PickFirstTest, Updates) {
  // Start all servers up front; the resolver updates below decide which one
  // the channel is told about.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  // Perform one RPC against the first server.
  response_generator.SetNextResolution(GetServersPorts(0, 1));
  LOG(INFO) << "****** SET [0] *******";
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  // An empty update will result in the channel going into TRANSIENT_FAILURE.
  response_generator.SetNextResolution({});
  LOG(INFO) << "****** SET none *******";
  // Check the wait result so that a timeout is reported here rather than
  // silently ignored.
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // Next update introduces servers_[1], making the channel recover.
  response_generator.SetNextResolution(GetServersPorts(1, 2));
  LOG(INFO) << "****** SET [1] *******";
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  WaitForServer(DEBUG_LOCATION, stub, 1);
  // And again for servers_[2].
  response_generator.SetNextResolution(GetServersPorts(2, 3));
  LOG(INFO) << "****** SET [2] *******";
  WaitForServer(DEBUG_LOCATION, stub, 2);
  // Check LB policy name for the channel.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1139 
// Checks that when a resolver update adds a new address ahead of the one the
// channel is already connected to, pick_first keeps using the existing
// connection.
TEST_F(PickFirstTest, UpdateSuperset) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);

  // Start with only the first server and send it one RPC.
  const std::vector<int> initial_ports = {servers_[0]->port_};
  response_generator.SetNextResolution(initial_ports);
  LOG(INFO) << "****** SET [0] *******";
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  servers_[0]->service_.ResetCounters();

  // Send a superset update that lists the second server before the first.
  const std::vector<int> superset_ports = {servers_[1]->port_,
                                           servers_[0]->port_};
  response_generator.SetNextResolution(superset_ports);
  LOG(INFO) << "****** SET superset *******";
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  // Traffic should stay on the server we were already connected to.
  WaitForServer(DEBUG_LOCATION, stub, 0);
  EXPECT_EQ(0, servers_[1]->service_.request_count());

  // The channel should report pick_first as its LB policy.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1172 
// Checks that a resolver update listing only unreachable addresses takes the
// channel out of READY, and that the channel recovers once one of those
// addresses starts serving.
TEST_F(PickFirstTest, UpdateToUnconnected) {
  // Two servers are created, but only the first one is started.
  const int kNumServers = 2;
  CreateServers(kNumServers);
  StartServer(0);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);

  // Resolve to the running server and confirm that RPCs succeed.
  const std::vector<int> reachable_ports = {servers_[0]->port_};
  response_generator.SetNextResolution(reachable_ports);
  LOG(INFO) << "****** SET [0] *******";
  CheckRpcSendOk(DEBUG_LOCATION, stub);

  // Resolve to addresses where nothing is serving at the moment.  Eventually
  // this replaces the working subchannel list with the new, currently
  // unresponsive one.
  const std::vector<int> unreachable_ports = {grpc_pick_unused_port_or_die(),
                                              servers_[1]->port_};
  response_generator.SetNextResolution(unreachable_ports);
  LOG(INFO) << "****** SET [unavailable] *******";
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));

  // Starting the second server proves the last resolution was installed: the
  // channel becomes READY once one of its endpoints is available.
  LOG(INFO) << "****** StartServer(1) *******";
  StartServer(1);
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
}
1205 
// Two channels using the (default) global subchannel pool and the same
// credentials should reach the server through a single client connection.
TEST_F(PickFirstTest, GlobalSubchannelPool) {
  // One backend.
  StartServers(1);
  const std::vector<int> ports = GetServersPorts();
  // Both channels share one credentials object so that their subchannel keys
  // are identical.
  auto shared_creds =
      std::make_shared<FakeTransportSecurityChannelCredentials>();
  FakeResolverResponseGeneratorWrapper first_response_generator;
  auto first_channel = BuildChannel("pick_first", first_response_generator,
                                    ChannelArguments(), shared_creds);
  auto first_stub = BuildStub(first_channel);
  first_response_generator.SetNextResolution(ports);
  FakeResolverResponseGeneratorWrapper second_response_generator;
  auto second_channel = BuildChannel("pick_first", second_response_generator,
                                     ChannelArguments(), shared_creds);
  auto second_stub = BuildStub(second_channel);
  second_response_generator.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // One RPC per channel.
  CheckRpcSendOk(DEBUG_LOCATION, first_stub);
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  // Both requests must have arrived...
  EXPECT_EQ(2, servers_[0]->service_.request_count());
  // ...from a single client port, since the channels share subchannels
  // through the global pool.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
}
1236 
// Two channels configured to use a local subchannel pool should each reach
// the server through their own client connection.
TEST_F(PickFirstTest, LocalSubchannelPool) {
  // One backend.
  StartServers(1);
  const std::vector<int> ports = GetServersPorts();
  // Both channels opt in to the local subchannel pool.
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
  FakeResolverResponseGeneratorWrapper first_response_generator;
  auto first_channel =
      BuildChannel("pick_first", first_response_generator, channel_args);
  auto first_stub = BuildStub(first_channel);
  first_response_generator.SetNextResolution(ports);
  FakeResolverResponseGeneratorWrapper second_response_generator;
  auto second_channel =
      BuildChannel("pick_first", second_response_generator, channel_args);
  auto second_stub = BuildStub(second_channel);
  second_response_generator.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // One RPC per channel.
  CheckRpcSendOk(DEBUG_LOCATION, first_stub);
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  // Both requests must have arrived...
  EXPECT_EQ(2, servers_[0]->service_.request_count());
  // ...from two distinct client ports, since the channels did not share
  // subchannels with each other.
  EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
}
1263 
// Stress test: pushes a long series of resolver updates, each listing the
// same backends in a freshly shuffled order, sending an RPC every tenth
// update and checking that it succeeds.
TEST_F(PickFirstTest, ManyUpdates) {
  constexpr int kNumUpdates = 1000;
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  // Seed one generator up front rather than constructing a new engine (and
  // a new std::random_device) for every single update.
  std::mt19937 rng(std::random_device{}());
  for (int i = 0; i < kNumUpdates; ++i) {
    std::shuffle(ports.begin(), ports.end(), rng);
    response_generator.SetNextResolution(ports);
    // We should re-enter core at the end of the loop to give the resolution
    // setting closure a chance to run.
    if ((i + 1) % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1283 
// Starts a pick_first channel on a list of unreachable addresses, checks that
// RPCs fail and that re-resolution is requested, then supplies a reachable
// address and checks that the channel recovers.
TEST_F(PickFirstTest, ReresolutionNoSelected) {
  // Split the port list into ports with a running server behind them and
  // ports where nothing is listening.
  const int kNumServers = 3;
  const int kNumAliveServers = 1;
  StartServers(kNumAliveServers);
  std::vector<int> alive_ports;
  std::vector<int> dead_ports;
  for (int i = 0; i < kNumAliveServers; ++i) {
    alive_ports.emplace_back(servers_[i]->port_);
  }
  for (int i = kNumAliveServers; i < kNumServers; ++i) {
    dead_ports.emplace_back(grpc_pick_unused_port_or_die());
  }
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  // Begin with dead ports only, so that no subchannel can be selected.
  // Re-resolution will return the same result.
  response_generator.SetNextResolution(dead_ports);
  LOG(INFO) << "****** INITIAL RESOLUTION SET *******";
  for (int attempt = 0; attempt < 10; ++attempt) {
    CheckRpcSendFailure(
        DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
        MakeConnectionFailureRegex("failed to connect to all addresses"));
  }
  // pick_first is expected to ask for re-resolution.
  LOG(INFO) << "****** WAITING FOR RE-RESOLUTION *******";
  EXPECT_TRUE(response_generator.Get()->WaitForReresolutionRequest(
      absl::Seconds(5 * grpc_test_slowdown_factor())));
  LOG(INFO) << "****** RE-RESOLUTION SEEN *******";
  // Provide reachable ports so that the pick_first policy can recover soon.
  response_generator.SetNextResolution(alive_ports);
  LOG(INFO) << "****** RE-RESOLUTION SENT *******";
  // Any failure passed to the callback while waiting must still be the
  // connection failure seen above.
  WaitForServer(DEBUG_LOCATION, stub, 0, [](const Status& failure) {
    EXPECT_EQ(StatusCode::UNAVAILABLE, failure.error_code());
    EXPECT_THAT(failure.error_message(),
                ::testing::ContainsRegex(MakeConnectionFailureRegex(
                    "failed to connect to all addresses")));
  });
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  // The channel should report pick_first as its LB policy.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1329 
// Verifies that pick_first reconnects to a restarted backend even though the
// resolver never delivers a second result.
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
  // Use an explicitly chosen port so the backend can be restarted on the
  // same address.
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die()};
  StartServers(1, backend_ports);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(backend_ports);
  LOG(INFO) << "****** INITIAL CONNECTION *******";
  WaitForServer(DEBUG_LOCATION, client, 0);
  LOG(INFO) << "****** STOPPING SERVER ******";
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  LOG(INFO) << "****** RESTARTING SERVER ******";
  // Only the server is restarted; no new resolver result is pushed.
  StartServers(1, backend_ports);
  WaitForServer(DEBUG_LOCATION, client, 0);
}
1346 
// Verifies that, after losing its connection, pick_first goes back to the
// first address in the list rather than the one it was last connected to.
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die(),
                                    grpc_pick_unused_port_or_die()};
  // Create both servers but start only the second one (index 1).
  CreateServers(2, backend_ports);
  StartServer(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(backend_ports);
  LOG(INFO) << "****** INITIAL CONNECTION *******";
  WaitForServer(DEBUG_LOCATION, client, 1);
  LOG(INFO) << "****** STOPPING SERVER ******";
  servers_[1]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  LOG(INFO) << "****** STARTING BOTH SERVERS ******";
  // With both backends up, traffic is now expected on index 0.
  StartServers(2, backend_ports);
  WaitForServer(DEBUG_LOCATION, client, 0);
}
1365 
// Verifies that pick_first rejects a resolver result with an empty address
// list (channel goes TRANSIENT_FAILURE, health callback gets UNAVAILABLE)
// and recovers once a valid address is provided.
TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto client = BuildStub(lb_channel);
  LOG(INFO) << "****** SENDING INITIAL RESOLVER RESULT *******";
  // Build a result whose address list is present but empty, with a health
  // callback that checks the reported status and then signals completion.
  grpc_core::Notification health_callback_ran;
  grpc_core::Resolver::Result empty_result;
  empty_result.addresses.emplace();
  empty_result.result_health_callback =
      [&health_callback_ran](absl::Status status) {
        LOG(INFO) << "****** RESULT HEALTH CALLBACK *******";
        EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
        EXPECT_EQ("address list must not be empty", status.message())
            << status;
        health_callback_ran.Notify();
      };
  resolver.SetResponse(std::move(empty_result));
  // Ask the channel to connect and wait for it to report TRANSIENT_FAILURE.
  LOG(INFO) << "****** TELLING CHANNEL TO CONNECT *******";
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(lb_channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // Block until the health callback has been invoked.
  health_callback_ran.WaitForNotification();
  // Now provide a real backend address.
  LOG(INFO) << "****** SENDING NEXT RESOLVER RESULT *******";
  StartServers(1);
  resolver.SetNextResolution(GetServersPorts());
  LOG(INFO) << "****** SENDING WAIT_FOR_READY RPC *******";
  CheckRpcSendOk(DEBUG_LOCATION, client, /*wait_for_ready=*/true);
}
1399 
// Exercises two pick_first channels that target the same backend address
// while that backend is repeatedly shut down and restarted.
TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die()};
  StartServers(1, backend_ports);
  // First channel: connect, then lose the backend.
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("pick_first", first_resolver);
  auto first_client = BuildStub(first_channel);
  first_resolver.SetNextResolution(backend_ports);
  LOG(INFO) << "****** RESOLUTION SET FOR CHANNEL 1 *******";
  WaitForServer(DEBUG_LOCATION, first_client, 0);
  LOG(INFO) << "****** CHANNEL 1 CONNECTED *******";
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(first_channel.get()));
  // The first channel will get a re-resolution with the same server; it
  // will create a new subchannel and hold a ref to it.
  StartServers(1, backend_ports);
  LOG(INFO) << "****** SERVER RESTARTED *******";
  // Second channel: same address, its own resolver.
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel = BuildChannel("pick_first", second_resolver);
  auto second_client = BuildStub(second_channel);
  second_resolver.SetNextResolution(backend_ports);
  LOG(INFO) << "****** RESOLUTION SET FOR CHANNEL 2 *******";
  WaitForServer(DEBUG_LOCATION, second_client, 0);
  LOG(INFO) << "****** CHANNEL 2 CONNECTED *******";
  servers_[0]->Shutdown();
  // Make sure the disconnection has produced a connectivity notification
  // before continuing; otherwise the subchannel could be picked for the
  // next call and then fail shortly afterwards.
  EXPECT_TRUE(WaitForChannelNotReady(second_channel.get()));
  // The second channel will also get a re-resolution with the same server,
  // so both channels end up referencing the same failed subchannel.
  StartServers(1, backend_ports);
  LOG(INFO) << "****** SERVER RESTARTED AGAIN *******";
  LOG(INFO) << "****** CHANNEL 2 STARTING A CALL *******";
  // The first call after the restart is expected to succeed.
  CheckRpcSendOk(DEBUG_LOCATION, second_client);
  LOG(INFO) << "****** CHANNEL 2 FINISHED A CALL *******";
  // Both channels must still report pick_first as their LB policy.
  EXPECT_EQ("pick_first", first_channel->GetLoadBalancingPolicyName());
  EXPECT_EQ("pick_first", second_channel->GetLoadBalancingPolicyName());
}
1440 
// Verifies that a READY pick_first channel drops to IDLE when its backend
// goes away.
TEST_F(PickFirstTest, IdleOnDisconnect) {
  // Bring up one backend and drive the channel to READY with a single RPC.
  constexpr int kNumServers = 1;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  // An empty policy name selects the default, which is pick_first.
  auto lb_channel = BuildChannel("", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, client);
  EXPECT_EQ(lb_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_READY);
  // Take the backend down; the channel is expected to end up IDLE.
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  EXPECT_EQ(lb_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_IDLE);
  servers_.clear();
}
1458 
// Verifies that an IDLE pick_first channel stays IDLE when the resolver
// sends an empty address list, and becomes READY again after a later
// non-empty update.
TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
  // Bring up one backend and drive the channel to READY with a single RPC.
  constexpr int kNumServers = 1;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  // An empty policy name selects the default, which is pick_first.
  auto lb_channel = BuildChannel("", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, client);
  EXPECT_EQ(lb_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_READY);
  // Take the backend down; the channel is expected to end up IDLE.
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  EXPECT_EQ(lb_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_IDLE);
  // Push a resolver update with no addresses.  No state change away from
  // IDLE may be observed before the 3-second deadline.
  resolver.SetNextResolution({});
  const bool left_idle = lb_channel->WaitForStateChange(
      GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3));
  EXPECT_FALSE(left_idle);
  // Restart the backend, publish its address and send an RPC; the channel
  // should be READY again afterwards.
  StartServer(0);
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, client);
  EXPECT_EQ(lb_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_READY);
}
1486 
// Verifies that once the channel is in TRANSIENT_FAILURE it does not report
// any other state until it becomes READY.
TEST_F(PickFirstTest,
       StaysTransientFailureOnFailedConnectionAttemptUntilReady) {
  // Reserve three ports; nothing is listening on any of them yet.
  std::vector<int> backend_ports = {grpc_pick_unused_port_or_die(),
                                    grpc_pick_unused_port_or_die(),
                                    grpc_pick_unused_port_or_die()};
  // Configure a 1-second initial reconnect backoff (scaled by the test
  // slowdown factor).
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      1000 * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("", resolver, channel_args);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(backend_ports);
  EXPECT_EQ(GRPC_CHANNEL_IDLE,
            lb_channel->GetState(/*try_to_connect=*/false));
  // The first RPC cannot reach any backend and must fail.
  CheckRpcSendFailure(
      DEBUG_LOCATION, client, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("failed to connect to all addresses"));
  // That failure should have left the channel in TRANSIENT_FAILURE.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            lb_channel->GetState(/*try_to_connect=*/false));
  // Bring up a backend on the last of the three ports.
  StartServers(1, {backend_ports.back()});
  // The next state change away from TRANSIENT_FAILURE must go straight to
  // READY.
  const bool state_changed = lb_channel->WaitForStateChange(
      GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_timeout_seconds_to_deadline(4));
  EXPECT_TRUE(state_changed);
  EXPECT_EQ(GRPC_CHANNEL_READY,
            lb_channel->GetState(/*try_to_connect=*/false));
  CheckRpcSendOk(DEBUG_LOCATION, client);
}
1516 
1517 //
1518 // round_robin tests
1519 //
1520 
1521 using RoundRobinTest = ClientLbEnd2endTest;
1522 
// Verifies that round_robin visits the backends in the order in which their
// addresses were given.
TEST_F(RoundRobinTest, Basic) {
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(GetServersPorts());
  // Keep sending RPCs until every backend has been seen at least once.
  for (;;) {
    CheckRpcSendOk(DEBUG_LOCATION, client);
    if (SeenAllServers()) break;
  }
  ResetCounters();
  // Advance to the last backend so that the next sequence of picks starts
  // at the first one (index 0).
  WaitForServer(DEBUG_LOCATION, client, servers_.size() - 1);
  std::vector<int> observed_order;
  for (size_t sent = 0; sent < servers_.size(); ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, client);
    UpdateConnectionOrder(servers_, &observed_order);
  }
  // The backends must have been hit in address-list order.
  const std::vector<int> expected = {0, 1, 2};
  EXPECT_EQ(expected, observed_order);
  // The channel must report round_robin as its LB policy.
  EXPECT_EQ("round_robin", lb_channel->GetLoadBalancingPolicyName());
}
1551 
// Verifies that a second round_robin channel to an already-connected
// backend can send an RPC successfully.
TEST_F(RoundRobinTest, ProcessPending) {
  // A single backend is enough for this scenario.
  StartServers(1);
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("round_robin", first_resolver);
  auto first_client = BuildStub(first_channel);
  first_resolver.SetNextResolution({servers_[0]->port_});
  WaitForServer(DEBUG_LOCATION, first_client, 0);
  // Build a second channel with its own round_robin policy.  Because
  // subchannels are globally reused, it will pick up the subchannel that is
  // already READY from the RPC sent over the first channel, and must make
  // progress without that subchannel undergoing any state transition.
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel = BuildChannel("round_robin", second_resolver);
  auto second_client = BuildStub(second_channel);
  second_resolver.SetNextResolution({servers_[0]->port_});
  CheckRpcSendOk(DEBUG_LOCATION, second_client);
}
1570 
TEST_F(RoundRobinTest,Updates)1571 TEST_F(RoundRobinTest, Updates) {
1572   // Start servers.
1573   const int kNumServers = 3;
1574   StartServers(kNumServers);
1575   FakeResolverResponseGeneratorWrapper response_generator;
1576   auto channel = BuildChannel("round_robin", response_generator);
1577   auto stub = BuildStub(channel);
1578   // Start with a single server.
1579   LOG(INFO) << "*** FIRST BACKEND ***";
1580   std::vector<int> ports = {servers_[0]->port_};
1581   response_generator.SetNextResolution(ports);
1582   WaitForServer(DEBUG_LOCATION, stub, 0);
1583   // Send RPCs. They should all go servers_[0]
1584   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1585   EXPECT_EQ(10, servers_[0]->service_.request_count());
1586   EXPECT_EQ(0, servers_[1]->service_.request_count());
1587   EXPECT_EQ(0, servers_[2]->service_.request_count());
1588   ResetCounters();
1589   // And now for the second server.
1590   LOG(INFO) << "*** SECOND BACKEND ***";
1591   ports.clear();
1592   ports.emplace_back(servers_[1]->port_);
1593   response_generator.SetNextResolution(ports);
1594   // Wait until update has been processed, as signaled by the second backend
1595   // receiving a request.
1596   EXPECT_EQ(0, servers_[1]->service_.request_count());
1597   WaitForServer(DEBUG_LOCATION, stub, 1);
1598   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1599   EXPECT_EQ(0, servers_[0]->service_.request_count());
1600   EXPECT_EQ(10, servers_[1]->service_.request_count());
1601   EXPECT_EQ(0, servers_[2]->service_.request_count());
1602   ResetCounters();
1603   // ... and for the last server.
1604   LOG(INFO) << "*** THIRD BACKEND ***";
1605   ports.clear();
1606   ports.emplace_back(servers_[2]->port_);
1607   response_generator.SetNextResolution(ports);
1608   WaitForServer(DEBUG_LOCATION, stub, 2);
1609   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1610   EXPECT_EQ(0, servers_[0]->service_.request_count());
1611   EXPECT_EQ(0, servers_[1]->service_.request_count());
1612   EXPECT_EQ(10, servers_[2]->service_.request_count());
1613   ResetCounters();
1614   // Back to all servers.
1615   LOG(INFO) << "*** ALL BACKENDS ***";
1616   ports.clear();
1617   ports.emplace_back(servers_[0]->port_);
1618   ports.emplace_back(servers_[1]->port_);
1619   ports.emplace_back(servers_[2]->port_);
1620   response_generator.SetNextResolution(ports);
1621   WaitForServers(DEBUG_LOCATION, stub);
1622   // Send three RPCs, one per server.
1623   for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1624   EXPECT_EQ(1, servers_[0]->service_.request_count());
1625   EXPECT_EQ(1, servers_[1]->service_.request_count());
1626   EXPECT_EQ(1, servers_[2]->service_.request_count());
1627   ResetCounters();
1628   // An empty update will result in the channel going into TRANSIENT_FAILURE.
1629   LOG(INFO) << "*** NO BACKENDS ***";
1630   ports.clear();
1631   response_generator.SetNextResolution(ports);
1632   WaitForChannelNotReady(channel.get());
1633   CheckRpcSendFailure(
1634       DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
1635       "empty address list \\(fake resolver empty address list\\)");
1636   servers_[0]->service_.ResetCounters();
1637   // Next update introduces servers_[1], making the channel recover.
1638   LOG(INFO) << "*** BACK TO SECOND BACKEND ***";
1639   ports.clear();
1640   ports.emplace_back(servers_[1]->port_);
1641   response_generator.SetNextResolution(ports);
1642   WaitForChannelReady(channel.get());
1643   WaitForServer(DEBUG_LOCATION, stub, 1);
1644   EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
1645   // Check LB policy name for the channel.
1646   EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1647 }
1648 
// Verifies that round_robin keeps serving RPCs from the reachable backends
// when an update also contains an unreachable address.
TEST_F(RoundRobinTest, UpdateInError) {
  StartServers(2);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  // Begin with only the first backend in the address list.
  resolver.SetNextResolution(GetServersPorts(0, 1));
  // All ten RPCs are expected to land on backend 0.
  for (int sent = 0; sent < 10; ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, client, /*wait_for_ready=*/false,
                   /*load_report=*/nullptr, /*timeout_ms=*/4000);
  }
  EXPECT_EQ(10, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  servers_[0]->service_.ResetCounters();
  // New address list: backend 0, an unused (unreachable) port, backend 1.
  std::vector<int> updated_ports = {servers_[0]->port_,
                                    grpc_pick_unused_port_or_die(),
                                    servers_[1]->port_};
  resolver.SetNextResolution(updated_ports);
  WaitForServers(DEBUG_LOCATION, client, 0, 2, /*status_check=*/nullptr,
                 /*timeout=*/absl::Seconds(60));
  // Ten more RPCs: all must succeed and be shared roughly evenly between
  // the two live backends.
  // Note: the split can be slightly uneven because an extra picker update
  // may occur if the subchannels for backends 0 and 1 report READY before
  // the subchannel for the unreachable address moves from CONNECTING to
  // TRANSIENT_FAILURE.
  for (int sent = 0; sent < 10; ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, client, /*wait_for_ready=*/false,
                   /*load_report=*/nullptr, /*timeout_ms=*/4000);
  }
  const auto between_four_and_six =
      ::testing::AllOf(::testing::Ge(4), ::testing::Le(6));
  EXPECT_THAT(servers_[0]->service_.request_count(), between_four_and_six);
  EXPECT_THAT(servers_[1]->service_.request_count(), between_four_and_six);
  EXPECT_EQ(10, servers_[0]->service_.request_count() +
                    servers_[1]->service_.request_count());
}
1687 
// Stress test: pushes many randomly ordered resolver updates at a
// round_robin channel and checks that RPCs sent along the way still succeed.
TEST_F(RoundRobinTest, ManyUpdates) {
  const int kNumServers = 3;
  const size_t kNumUpdates = 1000;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  // Seed one generator up front rather than building a fresh random_device
  // and Mersenne Twister for every update.
  std::mt19937 rng(std::random_device{}());
  for (size_t i = 0; i < kNumUpdates; ++i) {
    std::shuffle(ports.begin(), ports.end(), rng);
    response_generator.SetNextResolution(ports);
    // Send an RPC on every tenth update, starting with the first.
    if (i % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
1705 
// Verifies that round_robin requests re-resolution when a backend sends a
// GOAWAY, and that a backend added by the following update is used.
TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
  // Three backends are running, but only two are advertised at first.
  StartServers(3);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  std::vector<int> advertised_ports = {servers_[0]->port_,
                                       servers_[1]->port_};
  resolver.SetNextResolution(advertised_ports);
  // Make sure the client has reached both advertised backends.
  WaitForServers(DEBUG_LOCATION, client, 0, 2);
  // A GOAWAY from backend 0 should make the policy ask for re-resolution.
  LOG(INFO) << "****** SENDING GOAWAY FROM SERVER 0 *******";
  {
    grpc_core::ExecCtx exec_ctx;
    auto* core_server =
        grpc_core::Server::FromC(servers_[0]->server_->c_server());
    core_server->SendGoaways();
  }
  LOG(INFO) << "****** WAITING FOR RE-RESOLUTION REQUEST *******";
  const absl::Duration reresolution_timeout =
      absl::Seconds(5 * grpc_test_slowdown_factor());
  EXPECT_TRUE(
      resolver.Get()->WaitForReresolutionRequest(reresolution_timeout));
  LOG(INFO) << "****** RE-RESOLUTION REQUEST SEEN *******";
  // Answer the re-resolution request with a list that also contains the
  // third backend.
  advertised_ports.push_back(servers_[2]->port_);
  resolver.SetNextResolution(advertised_ports);
  // The client should eventually send traffic to backend 2.
  WaitForServer(DEBUG_LOCATION, client, 2);
}
1735 
// Verifies that round_robin rejects a resolver result with an empty address
// list, includes the resolution note in the RPC failure message, and
// recovers once a valid address is provided.
TEST_F(RoundRobinTest, FailsEmptyResolverUpdate) {
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  LOG(INFO) << "****** SENDING INITIAL RESOLVER RESULT *******";
  // Build a result whose address list is present but empty, with a
  // resolution note and a health callback that checks the reported status
  // and then signals completion.
  grpc_core::Notification health_callback_ran;
  grpc_core::Resolver::Result empty_result;
  empty_result.addresses.emplace();
  empty_result.resolution_note = "injected error";
  empty_result.result_health_callback =
      [&health_callback_ran](absl::Status status) {
        EXPECT_EQ(status, absl::UnavailableError("empty address list"));
        health_callback_ran.Notify();
      };
  resolver.SetResponse(std::move(empty_result));
  // Ask the channel to connect and wait for it to report TRANSIENT_FAILURE.
  LOG(INFO) << "****** TELLING CHANNEL TO CONNECT *******";
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(lb_channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // Block until the health callback has been invoked.
  health_callback_ran.WaitForNotification();
  // RPCs must fail with a message that carries the resolution note.
  CheckRpcSendFailure(DEBUG_LOCATION, client, StatusCode::UNAVAILABLE,
                      "empty address list \\(injected error\\)");
  // Now provide a real backend address.
  LOG(INFO) << "****** SENDING NEXT RESOLVER RESULT *******";
  StartServers(1);
  resolver.SetNextResolution(GetServersPorts());
  LOG(INFO) << "****** SENDING WAIT_FOR_READY RPC *******";
  CheckRpcSendOk(DEBUG_LOCATION, client, /*wait_for_ready=*/true);
}
1771 
// Verifies that a READY round_robin channel moves to TRANSIENT_FAILURE once
// every backend has been shut down, and that RPCs then fail with
// UNAVAILABLE.
TEST_F(RoundRobinTest, TransientFailure) {
  // Bring up the backends and wait for the channel to become READY.
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution(GetServersPorts());
  EXPECT_TRUE(WaitForChannelReady(lb_channel.get()));
  // Shut down every backend; the channel should report TRANSIENT_FAILURE.
  for (const auto& server : servers_) {
    server->Shutdown();
  }
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(lb_channel.get(), is_transient_failure));
  CheckRpcSendFailure(
      DEBUG_LOCATION, client, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
}
1793 
// Verifies that a round_robin channel given only unreachable addresses
// reports TRANSIENT_FAILURE and fails RPCs with UNAVAILABLE.
TEST_F(RoundRobinTest, TransientFailureAtStartup) {
  // Hand the channel three unused ports, i.e. backends that do not exist.
  // It should move into TRANSIENT_FAILURE quickly.
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution({
      grpc_pick_unused_port_or_die(),
      grpc_pick_unused_port_or_die(),
      grpc_pick_unused_port_or_die(),
  });
  // NOTE(review): this test starts no servers itself, so this loop
  // presumably runs over an empty servers_ -- confirm against the fixture.
  for (const auto& server : servers_) {
    server->Shutdown();
  }
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(
      WaitForChannelState(lb_channel.get(), is_transient_failure, true));
  CheckRpcSendFailure(
      DEBUG_LOCATION, client, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
}
1816 
// Verifies that a round_robin channel in TRANSIENT_FAILURE keeps reporting
// that state, and keeps failing RPCs, while a later connection attempt is
// in progress.
TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
  // The injector lets the test hold connection attempts.
  ConnectionAttemptInjector injector;
  // An unused port: nothing is listening there.
  const int backend_port = grpc_pick_unused_port_or_die();
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("round_robin", resolver);
  auto client = BuildStub(lb_channel);
  resolver.SetNextResolution({backend_port});
  // Let the first connection attempt fail on its own and wait until the
  // channel reports TRANSIENT_FAILURE.
  LOG(INFO) << "=== WAITING FOR CHANNEL TO REPORT TF ===";
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(lb_channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // Hold the next connection attempt and wait for it to start.
  auto pending_attempt = injector.AddHold(backend_port);
  pending_attempt->Wait();
  // The subchannel should now be CONNECTING, but the channel itself must
  // still report TRANSIENT_FAILURE with the same status.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            lb_channel->GetState(/*try_to_connect=*/false));
  // Send a handful of RPCs so that, if the channel were going to
  // (incorrectly) propagate a new picker, it has had the chance to do so.
  LOG(INFO) << "=== EXPECTING RPCs TO FAIL ===";
  for (int attempt = 0; attempt < 5; ++attempt) {
    CheckRpcSendFailure(
        DEBUG_LOCATION, client, StatusCode::UNAVAILABLE,
        MakeConnectionFailureRegex("connections to all backends failing"));
  }
  // Release the held connection attempt before the test ends.
  pending_attempt->Resume();
}
1853 
// Verifies that, while in TRANSIENT_FAILURE, round_robin reports the most
// recent connection error in the status returned for failed RPCs.
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
  // Start connection injector.
  ConnectionAttemptInjector injector;
  // Get ports.
  const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
                                  grpc_pick_unused_port_or_die()};
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(ports);
  // Allow first connection attempts to fail normally, and check that
  // the RPC fails with the right status message.
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
  // Now intercept the next connection attempt for each port.
  auto hold1 = injector.AddHold(ports[0]);
  auto hold2 = injector.AddHold(ports[1]);
  hold1->Wait();
  hold2->Wait();
  // Inject a custom failure message.
  hold1->Fail(GRPC_ERROR_CREATE("Survey says... Bzzzzt!"));
  // Wait until RPC fails with the right message.
  const absl::Time deadline =
      absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
  while (true) {
    Status status = SendRpc(stub);
    EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
    if (::testing::Matches(::testing::MatchesRegex(
            "connections to all backends failing; last error: "
            "UNKNOWN: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
            "Survey says... Bzzzzt!"))(status.error_message())) {
      break;
    }
    LOG(INFO) << "STATUS MESSAGE: " << status.error_message();
    EXPECT_THAT(status.error_message(),
                ::testing::MatchesRegex(MakeConnectionFailureRegex(
                    "connections to all backends failing")));
    // Read the clock once and use that value for both the expectation and
    // the loop exit.  With two separate reads, the deadline could pass
    // between them and the loop would stop without recording a failure.
    const absl::Time now = absl::Now();
    EXPECT_LT(now, deadline);
    if (now >= deadline) break;
  }
  // Clean up.
  hold2->Resume();
}
1899 
TEST_F(RoundRobinTest,DoesNotFailRpcsUponDisconnection)1900 TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
1901   // Start connection injector.
1902   ConnectionAttemptInjector injector;
1903   // Start server.
1904   StartServers(1);
1905   // Create channel.
1906   FakeResolverResponseGeneratorWrapper response_generator;
1907   auto channel = BuildChannel("round_robin", response_generator);
1908   auto stub = BuildStub(channel);
1909   response_generator.SetNextResolution(GetServersPorts());
1910   // Start a thread constantly sending RPCs in a loop.
1911   LOG(INFO) << "=== STARTING CLIENT THREAD ===";
1912   std::atomic<bool> shutdown{false};
1913   gpr_event ev;
1914   gpr_event_init(&ev);
1915   std::thread thd([&]() {
1916     LOG(INFO) << "sending first RPC";
1917     CheckRpcSendOk(DEBUG_LOCATION, stub);
1918     gpr_event_set(&ev, reinterpret_cast<void*>(1));
1919     while (!shutdown.load()) {
1920       LOG(INFO) << "sending RPC";
1921       CheckRpcSendOk(DEBUG_LOCATION, stub);
1922     }
1923   });
1924   // Wait for first RPC to complete.
1925   LOG(INFO) << "=== WAITING FOR FIRST RPC TO COMPLETE ===";
1926   ASSERT_EQ(reinterpret_cast<void*>(1),
1927             gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(1)));
1928   // Channel should now be READY.
1929   ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
1930   // Tell injector to intercept the next connection attempt.
1931   auto hold1 =
1932       injector.AddHold(servers_[0]->port_, /*intercept_completion=*/true);
1933   // Now kill the server.  The subchannel should report IDLE and be
1934   // immediately reconnected to, but this should not cause any test
1935   // failures.
1936   LOG(INFO) << "=== SHUTTING DOWN SERVER ===";
1937   {
1938     grpc_core::ExecCtx exec_ctx;
1939     grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
1940   }
1941   gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
1942   servers_[0]->Shutdown();
1943   // Wait for next attempt to start.
1944   LOG(INFO) << "=== WAITING FOR RECONNECTION ATTEMPT ===";
1945   hold1->Wait();
1946   // Start server and allow attempt to continue.
1947   LOG(INFO) << "=== RESTARTING SERVER ===";
1948   StartServer(0);
1949   hold1->Resume();
1950   // Wait for next attempt to complete.
1951   LOG(INFO) << "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===";
1952   hold1->WaitForCompletion();
1953   // Now shut down the thread.
1954   LOG(INFO) << "=== SHUTTING DOWN CLIENT THREAD ===";
1955   shutdown.store(true);
1956   thd.join();
1957 }
1958 
// Checks that round_robin stops sending RPCs to a backend that has gone
// away and starts using it again once the backend is restarted.
TEST_F(RoundRobinTest, SingleReconnect) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  const auto backend_ports = GetServersPorts();
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("round_robin", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(backend_ports);
  WaitForServers(DEBUG_LOCATION, stub);
  // Line the picker up with the last backend; the loop below expects the
  // i-th RPC after this point to land on backend i.
  WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1);
  for (size_t idx = 0; idx < servers_.size(); ++idx) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
    EXPECT_EQ(1, servers_[idx]->service_.request_count())
        << "for backend #" << idx;
  }
  // Every backend should have served exactly one request.
  for (auto& server : servers_) {
    EXPECT_EQ(1, server->service_.request_count());
  }
  // Take backend 0 out of rotation.
  servers_[0]->StopListeningAndSendGoaways();
  // The client has noticed the outage once kNumServers consecutive RPCs
  // have all avoided backend 0.
  ResetCounters();
  int consecutive_rpcs_off_backend_0 = 0;
  SendRpcsUntil(DEBUG_LOCATION, stub, [&](const Status& status) {
    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                             << " message=" << status.error_message();
    const bool hit_backend_0 = servers_[0]->service_.request_count() == 1;
    consecutive_rpcs_off_backend_0 =
        hit_backend_0 ? 0 : consecutive_rpcs_off_backend_0 + 1;
    // Counters are cleared after every RPC so the check above always
    // reflects only the most recent one.
    ResetCounters();
    return consecutive_rpcs_off_backend_0 < kNumServers;
  });
  // Push a larger batch of traffic through the channel.
  const int kNumFollowUpRpcs = 10 * kNumServers;
  for (int rpc = 0; rpc < kNumFollowUpRpcs; ++rpc) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // None of that traffic may have reached the stopped backend.
  EXPECT_EQ(0UL, servers_[0]->service_.request_count());
  // Restart backend 0.
  servers_[0]->Shutdown();
  StartServer(0);
  // Traffic reaches backend 0 again either immediately (the server came up
  // before round_robin retried the subchannel) or after the subchannel retry
  // delay (the retry happened before the server was fully back up).
  WaitForServer(DEBUG_LOCATION, stub, 0);
}
2011 
2012 // If health checking is required by client but health checking service
2013 // is not running on the server, the channel should be treated as healthy.
TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
  // One backend is enough for this scenario.
  StartServers(1);
  // The client asks for health checking through its service config.
  constexpr char kHealthCheckServiceConfig[] =
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}";
  ChannelArguments channel_args;
  channel_args.SetServiceConfigJSON(kHealthCheckServiceConfig);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("round_robin", resolver, channel_args);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution({servers_[0]->port_});
  // The channel is expected to reach READY and carry RPCs regardless.
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  CheckRpcSendOk(DEBUG_LOCATION, stub);
}
2027 
TEST_F(RoundRobinTest,HealthChecking)2028 TEST_F(RoundRobinTest, HealthChecking) {
2029   EnableDefaultHealthCheckService(true);
2030   // Start servers.
2031   const int kNumServers = 3;
2032   StartServers(kNumServers);
2033   ChannelArguments args;
2034   args.SetServiceConfigJSON(
2035       "{\"healthCheckConfig\": "
2036       "{\"serviceName\": \"health_check_service_name\"}}");
2037   FakeResolverResponseGeneratorWrapper response_generator;
2038   auto channel = BuildChannel("round_robin", response_generator, args);
2039   auto stub = BuildStub(channel);
2040   response_generator.SetNextResolution(GetServersPorts());
2041   // Channel should not become READY, because health checks should be failing.
2042   LOG(INFO)
2043       << "*** initial state: unknown health check service name for all servers";
2044   EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
2045   // Now set one of the servers to be healthy.
2046   // The channel should become healthy and all requests should go to
2047   // the healthy server.
2048   LOG(INFO) << "*** server 0 healthy";
2049   servers_[0]->SetServingStatus("health_check_service_name", true);
2050   EXPECT_TRUE(WaitForChannelReady(channel.get()));
2051   // New channel state may be reported before the picker is updated, so
2052   // wait for the server before proceeding.
2053   WaitForServer(DEBUG_LOCATION, stub, 0);
2054   for (int i = 0; i < 10; ++i) {
2055     CheckRpcSendOk(DEBUG_LOCATION, stub);
2056   }
2057   EXPECT_EQ(10, servers_[0]->service_.request_count());
2058   EXPECT_EQ(0, servers_[1]->service_.request_count());
2059   EXPECT_EQ(0, servers_[2]->service_.request_count());
2060   // Now set a second server to be healthy.
2061   LOG(INFO) << "*** server 2 healthy";
2062   servers_[2]->SetServingStatus("health_check_service_name", true);
2063   WaitForServer(DEBUG_LOCATION, stub, 2);
2064   for (int i = 0; i < 10; ++i) {
2065     CheckRpcSendOk(DEBUG_LOCATION, stub);
2066   }
2067   EXPECT_EQ(5, servers_[0]->service_.request_count());
2068   EXPECT_EQ(0, servers_[1]->service_.request_count());
2069   EXPECT_EQ(5, servers_[2]->service_.request_count());
2070   // Now set the remaining server to be healthy.
2071   LOG(INFO) << "*** server 1 healthy";
2072   servers_[1]->SetServingStatus("health_check_service_name", true);
2073   WaitForServer(DEBUG_LOCATION, stub, 1);
2074   for (int i = 0; i < 9; ++i) {
2075     CheckRpcSendOk(DEBUG_LOCATION, stub);
2076   }
2077   EXPECT_EQ(3, servers_[0]->service_.request_count());
2078   EXPECT_EQ(3, servers_[1]->service_.request_count());
2079   EXPECT_EQ(3, servers_[2]->service_.request_count());
2080   // Now set one server to be unhealthy again.  Then wait until the
2081   // unhealthiness has hit the client.  We know that the client will see
2082   // this when we send kNumServers requests and one of the remaining servers
2083   // sees two of the requests.
2084   LOG(INFO) << "*** server 0 unhealthy";
2085   servers_[0]->SetServingStatus("health_check_service_name", false);
2086   do {
2087     ResetCounters();
2088     for (int i = 0; i < kNumServers; ++i) {
2089       CheckRpcSendOk(DEBUG_LOCATION, stub);
2090     }
2091   } while (servers_[1]->service_.request_count() != 2 &&
2092            servers_[2]->service_.request_count() != 2);
2093   // Now set the remaining two servers to be unhealthy.  Make sure the
2094   // channel leaves READY state and that RPCs fail.
2095   LOG(INFO) << "*** all servers unhealthy";
2096   servers_[1]->SetServingStatus("health_check_service_name", false);
2097   servers_[2]->SetServingStatus("health_check_service_name", false);
2098   EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
2099   // New channel state may be reported before the picker is updated, so
2100   // one or two more RPCs may succeed before we see a failure.
2101   SendRpcsUntil(DEBUG_LOCATION, stub, [&](const Status& status) {
2102     if (status.ok()) return true;
2103     EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
2104     EXPECT_THAT(
2105         status.error_message(),
2106         ::testing::MatchesRegex(
2107             "connections to all backends failing; last error: "
2108             "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy"));
2109     return false;
2110   });
2111   // Clean up.
2112   EnableDefaultHealthCheckService(false);
2113 }
2114 
// Checks that round_robin with health checking enabled keeps serving RPCs
// when one backend stops and the resolver re-sends the same address list.
TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
  EnableDefaultHealthCheckService(true);
  // Start servers and report all of them as healthy.
  const int kNumServers = 3;
  StartServers(kNumServers);
  servers_[0]->SetServingStatus("health_check_service_name", true);
  servers_[1]->SetServingStatus("health_check_service_name", true);
  servers_[2]->SetServingStatus("health_check_service_name", true);
  ChannelArguments args;
  args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  WaitForServer(DEBUG_LOCATION, stub, 0);
  // Stop server 0 and send a new resolver result to ensure that RR
  // checks each subchannel's state.
  servers_[0]->StopListeningAndSendGoaways();
  response_generator.SetNextResolution(GetServersPorts());
  // Send a bunch more RPCs; all of them must succeed.
  for (size_t i = 0; i < 100; i++) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Clean up.  Without this the enabled default health check service would
  // leak into whichever test runs next.
  EnableDefaultHealthCheckService(false);
}
2141 
// Two channels talk to the same backend with the same health check config;
// only the one without GRPC_ARG_INHIBIT_HEALTH_CHECKING should be held back
// by failing health checks.
TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
  EnableDefaultHealthCheckService(true);
  // A single backend is shared by both channels.
  StartServers(1);
  const auto ports = GetServersPorts();
  // Both channels use the same credentials object so that their subchannel
  // keys match.
  auto shared_creds =
      std::make_shared<FakeTransportSecurityChannelCredentials>();
  // Channel 1: health checking enabled.
  ChannelArguments channel_args;
  channel_args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper resolver1;
  auto channel1 =
      BuildChannel("round_robin", resolver1, channel_args, shared_creds);
  auto stub1 = BuildStub(channel1);
  resolver1.SetNextResolution(ports);
  // Channel 2: same config, but with health checking inhibited.
  channel_args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
  FakeResolverResponseGeneratorWrapper resolver2;
  auto channel2 =
      BuildChannel("round_robin", resolver2, channel_args, shared_creds);
  auto stub2 = BuildStub(channel2);
  resolver2.SetNextResolution(ports);
  // Health checks should be failing, so channel 1 must stay out of READY
  // and fail its RPCs.
  EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
      "connections to all backends failing; last error: "
      "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy");
  // Channel 2 should be READY and usable.
  EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
  CheckRpcSendOk(DEBUG_LOCATION, stub2);
  // Report the backend as healthy and wait for channel 1 to succeed.
  servers_[0]->SetServingStatus("health_check_service_name", true);
  CheckRpcSendOk(DEBUG_LOCATION, stub1, /*wait_for_ready=*/true);
  // Only one subchannel should have been created to the backend.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
  // Restore the default for later tests.
  EnableDefaultHealthCheckService(false);
}
2187 
// Two channels talk to the same backend but health-check different service
// names; each channel's readiness should follow only its own service name.
TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
  EnableDefaultHealthCheckService(true);
  // A single backend is shared by both channels.
  StartServers(1);
  const auto ports = GetServersPorts();
  // Both channels use the same credentials object so that their subchannel
  // keys match.
  auto shared_creds =
      std::make_shared<FakeTransportSecurityChannelCredentials>();
  // Channel 1 health-checks the first service name.
  ChannelArguments channel1_args;
  channel1_args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper resolver1;
  auto channel1 =
      BuildChannel("round_robin", resolver1, channel1_args, shared_creds);
  auto stub1 = BuildStub(channel1);
  resolver1.SetNextResolution(ports);
  // Channel 2 health-checks a different service name.
  ChannelArguments channel2_args;
  channel2_args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name2\"}}");
  FakeResolverResponseGeneratorWrapper resolver2;
  auto channel2 =
      BuildChannel("round_robin", resolver2, channel2_args, shared_creds);
  auto stub2 = BuildStub(channel2);
  resolver2.SetNextResolution(ports);
  // Only channel 2's service name is reported as serving.
  servers_[0]->SetServingStatus("health_check_service_name2", true);
  // Health checks for channel 1 should be failing, so it must stay out of
  // READY and fail its RPCs.
  EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
      "connections to all backends failing; last error: "
      "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy");
  // Channel 2 should be READY and usable.
  EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
  CheckRpcSendOk(DEBUG_LOCATION, stub2);
  // Report channel 1's service name as serving and wait for it to succeed.
  servers_[0]->SetServingStatus("health_check_service_name", true);
  CheckRpcSendOk(DEBUG_LOCATION, stub1, /*wait_for_ready=*/true);
  // Only one subchannel should have been created to the backend.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
  // Restore the default for later tests.
  EnableDefaultHealthCheckService(false);
}
2239 
// Checks that changing the health check service name through a resolver
// update takes effect on a channel that is already READY.
TEST_F(RoundRobinTest,
       HealthCheckingServiceNameChangesAfterSubchannelsCreated) {
  EnableDefaultHealthCheckService(true);
  // Single backend.
  StartServers(1);
  const auto ports = GetServersPorts();
  // The health check config is delivered with the resolver result rather
  // than through channel args.
  constexpr char kInitialServiceConfig[] =
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}";
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("round_robin", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(ports, kInitialServiceConfig);
  servers_[0]->SetServingStatus("health_check_service_name", true);
  EXPECT_TRUE(WaitForChannelReady(channel.get(), /*timeout_seconds=*/1));
  // Switch the channel to a service name that is not reported as healthy;
  // it should drop out of READY.
  constexpr char kUpdatedServiceConfig[] =
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name2\"}}";
  resolver.SetNextResolution(ports, kUpdatedServiceConfig);
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // Restore the default for later tests.
  EnableDefaultHealthCheckService(false);
}
2267 
// With the no-op health check service installed, the channel should never
// become READY, and each backend should see more than one health check
// request.
TEST_F(RoundRobinTest, HealthCheckingRetryOnStreamEnd) {
  // Create the backends first so that the no-op health check service can be
  // enabled before any of them is started.
  const int kNumServers = 2;
  CreateServers(kNumServers);
  EnableNoopHealthCheckService();
  for (int server_index = 0; server_index < kNumServers; ++server_index) {
    StartServer(server_index);
  }
  // Build a channel that has health checking turned on.
  ChannelArguments channel_args;
  channel_args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("round_robin", resolver, channel_args);
  resolver.SetNextResolution(GetServersPorts());
  EXPECT_FALSE(WaitForChannelReady(channel.get()));
  // Each backend must have been asked more than once.
  EXPECT_GT(servers_[0]->noop_health_check_service_impl_.request_count(), 1);
  EXPECT_GT(servers_[1]->noop_health_check_service_impl_.request_count(), 1);
}
2287 
2288 //
2289 // LB policy pick args
2290 //
2291 
2292 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
2293  protected:
SetUp()2294   void SetUp() override {
2295     ClientLbEnd2endTest::SetUp();
2296     current_test_instance_ = this;
2297   }
2298 
SetUpTestSuite()2299   static void SetUpTestSuite() {
2300     grpc_core::CoreConfiguration::Reset();
2301     grpc_core::CoreConfiguration::RegisterBuilder(
2302         [](grpc_core::CoreConfiguration::Builder* builder) {
2303           grpc_core::RegisterTestPickArgsLoadBalancingPolicy(builder,
2304                                                              SavePickArgs);
2305         });
2306     grpc_init();
2307   }
2308 
TearDownTestSuite()2309   static void TearDownTestSuite() {
2310     grpc_shutdown();
2311     grpc_core::CoreConfiguration::Reset();
2312   }
2313 
args_seen_list()2314   std::vector<grpc_core::PickArgsSeen> args_seen_list() {
2315     grpc_core::MutexLock lock(&mu_);
2316     return args_seen_list_;
2317   }
2318 
ArgsSeenListString(const std::vector<grpc_core::PickArgsSeen> & args_seen_list)2319   static std::string ArgsSeenListString(
2320       const std::vector<grpc_core::PickArgsSeen>& args_seen_list) {
2321     std::vector<std::string> entries;
2322     for (const auto& args_seen : args_seen_list) {
2323       std::vector<std::string> metadata;
2324       for (const auto& p : args_seen.metadata) {
2325         metadata.push_back(absl::StrCat(p.first, "=", p.second));
2326       }
2327       entries.push_back(absl::StrFormat("{path=\"%s\", metadata=[%s]}",
2328                                         args_seen.path,
2329                                         absl::StrJoin(metadata, ", ")));
2330     }
2331     return absl::StrCat("[", absl::StrJoin(entries, ", "), "]");
2332   }
2333 
2334  private:
SavePickArgs(const grpc_core::PickArgsSeen & args_seen)2335   static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
2336     ClientLbPickArgsTest* self = current_test_instance_;
2337     grpc_core::MutexLock lock(&self->mu_);
2338     self->args_seen_list_.emplace_back(args_seen);
2339   }
2340 
2341   static ClientLbPickArgsTest* current_test_instance_;
2342   grpc_core::Mutex mu_;
2343   std::vector<grpc_core::PickArgsSeen> args_seen_list_;
2344 };
2345 
2346 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
2347 
// Sends one RPC through the test pick-args LB policy and checks the path
// and metadata that the picker was shown.
TEST_F(ClientLbPickArgsTest, Basic) {
  using ::testing::AllOf;
  using ::testing::ElementsAre;
  using ::testing::Field;
  using ::testing::Pair;
  using ::testing::UnorderedElementsAre;
  // Single backend.
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("test_pick_args_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // Connect up front so the LB policy is always connected before it sees
  // the pick.  Otherwise the pick could be recorded either twice (once in
  // CONNECTING, once in READY) or once (READY only), making the test flaky.
  ASSERT_TRUE(channel->WaitForConnected(gpr_inf_future(GPR_CLOCK_MONOTONIC)));
  // The channel should report the LB policy under test.
  EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
  // Send a single RPC and verify what the picker saw.
  CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
  const auto seen = args_seen_list();
  EXPECT_THAT(
      seen,
      ElementsAre(AllOf(
          Field(&grpc_core::PickArgsSeen::path,
                "/grpc.testing.EchoTestService/Echo"),
          Field(&grpc_core::PickArgsSeen::metadata,
                UnorderedElementsAre(Pair("foo", "1"), Pair("bar", "2"),
                                     Pair("baz", "3"))))))
      << ArgsSeenListString(seen);
}
2376 
2377 //
2378 // tests that LB policies can get the call's trailing metadata
2379 //
2380 
2381 class OrcaLoadReportBuilder {
2382  public:
2383   OrcaLoadReportBuilder() = default;
OrcaLoadReportBuilder(const OrcaLoadReport & report)2384   explicit OrcaLoadReportBuilder(const OrcaLoadReport& report)
2385       : report_(report) {}
SetApplicationUtilization(double v)2386   OrcaLoadReportBuilder& SetApplicationUtilization(double v) {
2387     report_.set_application_utilization(v);
2388     return *this;
2389   }
SetCpuUtilization(double v)2390   OrcaLoadReportBuilder& SetCpuUtilization(double v) {
2391     report_.set_cpu_utilization(v);
2392     return *this;
2393   }
SetMemUtilization(double v)2394   OrcaLoadReportBuilder& SetMemUtilization(double v) {
2395     report_.set_mem_utilization(v);
2396     return *this;
2397   }
SetQps(double v)2398   OrcaLoadReportBuilder& SetQps(double v) {
2399     report_.set_rps_fractional(v);
2400     return *this;
2401   }
SetEps(double v)2402   OrcaLoadReportBuilder& SetEps(double v) {
2403     report_.set_eps(v);
2404     return *this;
2405   }
SetRequestCost(absl::string_view n,double v)2406   OrcaLoadReportBuilder& SetRequestCost(absl::string_view n, double v) {
2407     (*report_.mutable_request_cost())[n] = v;
2408     return *this;
2409   }
SetUtilization(absl::string_view n,double v)2410   OrcaLoadReportBuilder& SetUtilization(absl::string_view n, double v) {
2411     (*report_.mutable_utilization())[n] = v;
2412     return *this;
2413   }
SetNamedMetrics(absl::string_view n,double v)2414   OrcaLoadReportBuilder& SetNamedMetrics(absl::string_view n, double v) {
2415     (*report_.mutable_named_metrics())[n] = v;
2416     return *this;
2417   }
Build()2418   OrcaLoadReport Build() { return std::move(report_); }
2419 
2420  private:
2421   OrcaLoadReport report_;
2422 };
2423 
BackendMetricDataToOrcaLoadReport(const grpc_core::BackendMetricData & backend_metric_data)2424 OrcaLoadReport BackendMetricDataToOrcaLoadReport(
2425     const grpc_core::BackendMetricData& backend_metric_data) {
2426   auto builder = OrcaLoadReportBuilder()
2427                      .SetApplicationUtilization(
2428                          backend_metric_data.application_utilization)
2429                      .SetCpuUtilization(backend_metric_data.cpu_utilization)
2430                      .SetMemUtilization(backend_metric_data.mem_utilization)
2431                      .SetQps(backend_metric_data.qps)
2432                      .SetEps(backend_metric_data.eps);
2433   for (const auto& p : backend_metric_data.request_cost) {
2434     builder.SetRequestCost(std::string(p.first), p.second);
2435   }
2436   for (const auto& p : backend_metric_data.utilization) {
2437     builder.SetUtilization(std::string(p.first), p.second);
2438   }
2439   for (const auto& p : backend_metric_data.named_metrics) {
2440     builder.SetNamedMetrics(std::string(p.first), p.second);
2441   }
2442   return builder.Build();
2443 }
2444 
2445 // TODO(roth): Change this to use EqualsProto() once that becomes available in
2446 // OSS.
CheckLoadReportAsExpected(const OrcaLoadReport & actual,const OrcaLoadReport & expected)2447 void CheckLoadReportAsExpected(const OrcaLoadReport& actual,
2448                                const OrcaLoadReport& expected) {
2449   EXPECT_EQ(actual.application_utilization(),
2450             expected.application_utilization());
2451   EXPECT_EQ(actual.cpu_utilization(), expected.cpu_utilization());
2452   EXPECT_EQ(actual.mem_utilization(), expected.mem_utilization());
2453   EXPECT_EQ(actual.rps_fractional(), expected.rps_fractional());
2454   EXPECT_EQ(actual.eps(), expected.eps());
2455   EXPECT_EQ(actual.request_cost().size(), expected.request_cost().size());
2456   for (const auto& p : actual.request_cost()) {
2457     auto it = expected.request_cost().find(p.first);
2458     ASSERT_NE(it, expected.request_cost().end());
2459     EXPECT_EQ(it->second, p.second);
2460   }
2461   EXPECT_EQ(actual.utilization().size(), expected.utilization().size());
2462   for (const auto& p : actual.utilization()) {
2463     auto it = expected.utilization().find(p.first);
2464     ASSERT_NE(it, expected.utilization().end());
2465     EXPECT_EQ(it->second, p.second);
2466   }
2467   EXPECT_EQ(actual.named_metrics().size(), expected.named_metrics().size());
2468   for (const auto& p : actual.named_metrics()) {
2469     auto it = expected.named_metrics().find(p.first);
2470     ASSERT_NE(it, expected.named_metrics().end());
2471     EXPECT_EQ(it->second, p.second);
2472   }
2473 }
2474 
2475 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
2476  protected:
SetUp()2477   void SetUp() override {
2478     ClientLbEnd2endTest::SetUp();
2479     current_test_instance_ = this;
2480   }
2481 
SetUpTestSuite()2482   static void SetUpTestSuite() {
2483     grpc_core::CoreConfiguration::Reset();
2484     grpc_core::CoreConfiguration::RegisterBuilder(
2485         [](grpc_core::CoreConfiguration::Builder* builder) {
2486           grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
2487               builder, ReportTrailerIntercepted);
2488         });
2489     grpc_init();
2490   }
2491 
TearDownTestSuite()2492   static void TearDownTestSuite() {
2493     grpc_shutdown();
2494     grpc_core::CoreConfiguration::Reset();
2495   }
2496 
num_trailers_intercepted()2497   int num_trailers_intercepted() {
2498     grpc_core::MutexLock lock(&mu_);
2499     return num_trailers_intercepted_;
2500   }
2501 
last_status()2502   absl::Status last_status() {
2503     grpc_core::MutexLock lock(&mu_);
2504     return last_status_;
2505   }
2506 
trailing_metadata()2507   grpc_core::MetadataVector trailing_metadata() {
2508     grpc_core::MutexLock lock(&mu_);
2509     return std::move(trailing_metadata_);
2510   }
2511 
backend_load_report()2512   absl::optional<OrcaLoadReport> backend_load_report() {
2513     grpc_core::MutexLock lock(&mu_);
2514     return std::move(load_report_);
2515   }
2516 
2517   // Returns true if received callback within deadline.
WaitForLbCallback()2518   bool WaitForLbCallback() {
2519     grpc_core::MutexLock lock(&mu_);
2520     while (!trailer_intercepted_) {
2521       if (cond_.WaitWithTimeout(&mu_, absl::Seconds(3))) return false;
2522     }
2523     trailer_intercepted_ = false;
2524     return true;
2525   }
2526 
RunPerRpcMetricReportingTest(const OrcaLoadReport & reported,const OrcaLoadReport & expected)2527   void RunPerRpcMetricReportingTest(const OrcaLoadReport& reported,
2528                                     const OrcaLoadReport& expected) {
2529     const int kNumServers = 1;
2530     const int kNumRpcs = 10;
2531     StartServers(kNumServers);
2532     FakeResolverResponseGeneratorWrapper response_generator;
2533     auto channel =
2534         BuildChannel("intercept_trailing_metadata_lb", response_generator);
2535     auto stub = BuildStub(channel);
2536     response_generator.SetNextResolution(GetServersPorts());
2537     for (size_t i = 0; i < kNumRpcs; ++i) {
2538       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &reported);
2539       auto actual = backend_load_report();
2540       ASSERT_TRUE(actual.has_value());
2541       CheckLoadReportAsExpected(*actual, expected);
2542     }
2543     // Check LB policy name for the channel.
2544     EXPECT_EQ("intercept_trailing_metadata_lb",
2545               channel->GetLoadBalancingPolicyName());
2546     EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
2547   }
2548 
2549  private:
ReportTrailerIntercepted(const grpc_core::TrailingMetadataArgsSeen & args_seen)2550   static void ReportTrailerIntercepted(
2551       const grpc_core::TrailingMetadataArgsSeen& args_seen) {
2552     const auto* backend_metric_data = args_seen.backend_metric_data;
2553     ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
2554     grpc_core::MutexLock lock(&self->mu_);
2555     self->last_status_ = args_seen.status;
2556     self->num_trailers_intercepted_++;
2557     self->trailer_intercepted_ = true;
2558     self->trailing_metadata_ = args_seen.metadata;
2559     if (backend_metric_data != nullptr) {
2560       self->load_report_ =
2561           BackendMetricDataToOrcaLoadReport(*backend_metric_data);
2562     }
2563     self->cond_.Signal();
2564   }
2565 
2566   static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
2567   int num_trailers_intercepted_ = 0;
2568   bool trailer_intercepted_ = false;
2569   grpc_core::Mutex mu_;
2570   grpc_core::CondVar cond_;
2571   absl::Status last_status_;
2572   grpc_core::MetadataVector trailing_metadata_;
2573   absl::optional<OrcaLoadReport> load_report_;
2574 };
2575 
2576 ClientLbInterceptTrailingMetadataTest*
2577     ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr;
2578 
// A successful RPC should be reported to the LB policy exactly once, with
// an OK status.
TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // One RPC that is expected to succeed.
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  // The channel should report the LB policy under test.
  EXPECT_EQ("intercept_trailing_metadata_lb",
            channel->GetLoadBalancingPolicyName());
  // The policy saw a single call, and it finished OK.
  EXPECT_EQ(1, num_trailers_intercepted());
  EXPECT_EQ(absl::OkStatus(), last_status());
}
2594 
// An RPC that fails with a specific status should be reported to the LB
// policy with that same code and message.
TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) {
  constexpr char kErrorMessage[] = "bummer, man";
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // Put the error that the RPC is expected to fail with into the request.
  EchoRequest request;
  auto& error_spec = *request.mutable_param()->mutable_expected_error();
  error_spec.set_code(GRPC_STATUS_PERMISSION_DENIED);
  error_spec.set_error_message(kErrorMessage);
  Status rpc_status = SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
                              /*wait_for_ready=*/false, &request);
  // What the application sees.
  EXPECT_EQ(rpc_status.error_code(), StatusCode::PERMISSION_DENIED);
  EXPECT_EQ(rpc_status.error_message(), kErrorMessage);
  // What the LB policy saw.
  absl::Status lb_status = last_status();
  EXPECT_EQ(lb_status.code(), absl::StatusCode::kPermissionDenied);
  EXPECT_EQ(lb_status.message(), kErrorMessage);
}
2614 
// A stream that is cancelled without Finish() ever being called should
// still be reported to the LB policy, with a CANCELLED status.
TEST_F(ClientLbInterceptTrailingMetadataTest,
       StatusCancelledWithoutStartingRecvTrailingMetadata) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  resolver.SetNextResolution(GetServersPorts());
  auto stub = BuildStub(channel);
  {
    // Open a stream (which sends initial metadata), cancel it, and let both
    // the stream and its context go out of scope without calling Finish().
    ClientContext context;
    auto stream = stub->BidiStream(&context);
    context.TryCancel();
  }
  // Block until the LB policy has been told about the cancelled call.
  ASSERT_TRUE(WaitForLbCallback());
  // Exactly one call was reported, and it was reported as cancelled.
  EXPECT_EQ(1, num_trailers_intercepted());
  absl::Status lb_status = last_status();
  EXPECT_EQ(lb_status.code(), absl::StatusCode::kCancelled);
  EXPECT_EQ(lb_status.message(), "call cancelled");
}
2638 
// With retries disabled on the channel, every RPC must be intercepted once
// and the LB policy must see the expected trailing metadata, with no backend
// load report attached.
TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
  using ::testing::Pair;
  const int kNumServers = 1;
  const int kNumRpcs = 10;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  ChannelArguments no_retry_args;
  no_retry_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
  auto channel =
      BuildChannel("intercept_trailing_metadata_lb", resolver, no_retry_args);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  for (int rpc = 0; rpc < kNumRpcs; ++rpc) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // The channel must actually be using the intercepting policy.
  EXPECT_EQ("intercept_trailing_metadata_lb",
            channel->GetLoadBalancingPolicyName());
  EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
  EXPECT_THAT(trailing_metadata(),
              ::testing::UnorderedElementsAre(
                  // TODO(roth): Should grpc-status be visible here?
                  Pair("grpc-status", "0"), Pair("user-agent", ::testing::_),
                  Pair("foo", "1"), Pair("bar", "2"), Pair("baz", "3")));
  // No per-call load report was requested, so none should be recorded.
  EXPECT_FALSE(backend_load_report().has_value());
}
2666 
// Same checks as the retries-disabled variant, but with a retry policy
// configured for the echo service via the service config channel arg.
TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
  using ::testing::Pair;
  const int kNumServers = 1;
  const int kNumRpcs = 10;
  StartServers(kNumServers);
  // Service config enabling retries (on ABORTED) for EchoTestService.
  ChannelArguments retry_args;
  retry_args.SetServiceConfigJSON(R"({
  "methodConfig": [ {
    "name": [
      { "service": "grpc.testing.EchoTestService" }
    ],
    "retryPolicy": {
      "maxAttempts": 3,
      "initialBackoff": "1s",
      "maxBackoff": "120s",
      "backoffMultiplier": 1.6,
      "retryableStatusCodes": [ "ABORTED" ]
    }
  } ]
})");
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel =
      BuildChannel("intercept_trailing_metadata_lb", resolver, retry_args);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  for (int rpc = 0; rpc < kNumRpcs; ++rpc) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // The channel must actually be using the intercepting policy.
  EXPECT_EQ("intercept_trailing_metadata_lb",
            channel->GetLoadBalancingPolicyName());
  EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
  EXPECT_THAT(trailing_metadata(),
              ::testing::UnorderedElementsAre(
                  // TODO(roth): Should grpc-status be visible here?
                  Pair("grpc-status", "0"), Pair("user-agent", ::testing::_),
                  Pair("foo", "1"), Pair("bar", "2"), Pair("baz", "3")));
  // No per-call load report was requested, so none should be recorded.
  EXPECT_FALSE(backend_load_report().has_value());
}
2708 
// A load report whose fields are all accepted: the report expected on the
// client side is identical to the one reported, so the same report is passed
// for both arguments.
TEST_F(ClientLbInterceptTrailingMetadataTest, Valid) {
  const OrcaLoadReport report = OrcaLoadReportBuilder()
                                    .SetApplicationUtilization(0.25)
                                    .SetCpuUtilization(0.5)
                                    .SetMemUtilization(0.75)
                                    .SetQps(0.25)
                                    .SetEps(0.1)
                                    .SetRequestCost("foo", -0.8)
                                    .SetRequestCost("bar", 1.4)
                                    .SetUtilization("baz", 1.0)
                                    .SetUtilization("quux", 0.9)
                                    .SetNamedMetrics("metric0", 3.0)
                                    .SetNamedMetrics("metric1", -1.0)
                                    .Build();
  RunPerRpcMetricReportingTest(report, report);
}
2737 
// Reports negative values in every field. The expected report keeps only the
// request cost and the named metric; the utilization, QPS and EPS fields are
// expected to be absent.
TEST_F(ClientLbInterceptTrailingMetadataTest, NegativeValues) {
  const OrcaLoadReport reported = OrcaLoadReportBuilder()
                                      .SetApplicationUtilization(-0.3)
                                      .SetCpuUtilization(-0.1)
                                      .SetMemUtilization(-0.2)
                                      .SetQps(-3)
                                      .SetEps(-4)
                                      .SetRequestCost("foo", -5)
                                      .SetUtilization("bar", -0.6)
                                      .SetNamedMetrics("baz", -0.7)
                                      .Build();
  const OrcaLoadReport expected = OrcaLoadReportBuilder()
                                      .SetRequestCost("foo", -5)
                                      .SetNamedMetrics("baz", -0.7)
                                      .Build();
  RunPerRpcMetricReportingTest(reported, expected);
}
2754 
// Reports utilization values greater than 1. The expected report keeps the
// application and CPU utilization along with QPS and EPS, while the memory
// utilization and the named utilization entry are expected to be absent.
TEST_F(ClientLbInterceptTrailingMetadataTest, AboveOneUtilization) {
  const OrcaLoadReport reported = OrcaLoadReportBuilder()
                                      .SetApplicationUtilization(1.9)
                                      .SetCpuUtilization(1.1)
                                      .SetMemUtilization(2)
                                      .SetQps(3)
                                      .SetEps(4)
                                      .SetUtilization("foo", 5)
                                      .Build();
  const OrcaLoadReport expected = OrcaLoadReportBuilder()
                                      .SetApplicationUtilization(1.9)
                                      .SetCpuUtilization(1.1)
                                      .SetQps(3)
                                      .SetEps(4)
                                      .Build();
  RunPerRpcMetricReportingTest(reported, expected);
}
2771 
TEST_F(ClientLbInterceptTrailingMetadataTest,BackendMetricDataMerge)2772 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricDataMerge) {
2773   const int kNumServers = 1;
2774   const int kNumRpcs = 10;
2775   StartServers(kNumServers);
2776   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.99);
2777   servers_[0]->server_metric_recorder_->SetCpuUtilization(0.99);
2778   servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.99);
2779   servers_[0]->server_metric_recorder_->SetQps(0.99);
2780   servers_[0]->server_metric_recorder_->SetEps(0.99);
2781   servers_[0]->server_metric_recorder_->SetNamedUtilization("foo", 0.99);
2782   servers_[0]->server_metric_recorder_->SetNamedUtilization("bar", 0.1);
2783   OrcaLoadReport per_server_load = OrcaLoadReportBuilder()
2784                                        .SetApplicationUtilization(0.99)
2785                                        .SetCpuUtilization(0.99)
2786                                        .SetMemUtilization(0.99)
2787                                        .SetQps(0.99)
2788                                        .SetEps(0.99)
2789                                        .SetUtilization("foo", 0.99)
2790                                        .SetUtilization("bar", 0.1)
2791                                        .Build();
2792   FakeResolverResponseGeneratorWrapper response_generator;
2793   auto channel =
2794       BuildChannel("intercept_trailing_metadata_lb", response_generator);
2795   auto stub = BuildStub(channel);
2796   response_generator.SetNextResolution(GetServersPorts());
2797   size_t total_num_rpcs = 0;
2798   {
2799     OrcaLoadReport load_report =
2800         OrcaLoadReportBuilder().SetApplicationUtilization(0.5).Build();
2801     OrcaLoadReport expected = OrcaLoadReportBuilder(per_server_load)
2802                                   .SetApplicationUtilization(0.5)
2803                                   .Build();
2804     for (size_t i = 0; i < kNumRpcs; ++i) {
2805       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2806       auto actual = backend_load_report();
2807       ASSERT_TRUE(actual.has_value());
2808       CheckLoadReportAsExpected(*actual, expected);
2809       ++total_num_rpcs;
2810     }
2811   }
2812   {
2813     OrcaLoadReport load_report =
2814         OrcaLoadReportBuilder().SetMemUtilization(0.5).Build();
2815     OrcaLoadReport expected =
2816         OrcaLoadReportBuilder(per_server_load).SetMemUtilization(0.5).Build();
2817     for (size_t i = 0; i < kNumRpcs; ++i) {
2818       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2819       auto actual = backend_load_report();
2820       ASSERT_TRUE(actual.has_value());
2821       CheckLoadReportAsExpected(*actual, expected);
2822       ++total_num_rpcs;
2823     }
2824   }
2825   {
2826     OrcaLoadReport load_report = OrcaLoadReportBuilder().SetQps(0.5).Build();
2827     OrcaLoadReport expected =
2828         OrcaLoadReportBuilder(per_server_load).SetQps(0.5).Build();
2829     for (size_t i = 0; i < kNumRpcs; ++i) {
2830       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2831       auto actual = backend_load_report();
2832       ASSERT_TRUE(actual.has_value());
2833       CheckLoadReportAsExpected(*actual, expected);
2834       ++total_num_rpcs;
2835     }
2836   }
2837   {
2838     OrcaLoadReport load_report = OrcaLoadReportBuilder().SetEps(0.5).Build();
2839     OrcaLoadReport expected =
2840         OrcaLoadReportBuilder(per_server_load).SetEps(0.5).Build();
2841     for (size_t i = 0; i < kNumRpcs; ++i) {
2842       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2843       auto actual = backend_load_report();
2844       ASSERT_TRUE(actual.has_value());
2845       CheckLoadReportAsExpected(*actual, expected);
2846       ++total_num_rpcs;
2847     }
2848   }
2849   {
2850     OrcaLoadReport load_report =
2851         OrcaLoadReportBuilder()
2852             .SetUtilization("foo", 0.5)
2853             .SetUtilization("bar", 1.1)  // Out of range.
2854             .SetUtilization("baz", 1.0)
2855             .Build();
2856     auto expected = OrcaLoadReportBuilder(per_server_load)
2857                         .SetUtilization("foo", 0.5)
2858                         .SetUtilization("baz", 1.0)
2859                         .Build();
2860     for (size_t i = 0; i < kNumRpcs; ++i) {
2861       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2862       auto actual = backend_load_report();
2863       ASSERT_TRUE(actual.has_value());
2864       CheckLoadReportAsExpected(*actual, expected);
2865       ++total_num_rpcs;
2866     }
2867   }
2868   // Check LB policy name for the channel.
2869   EXPECT_EQ("intercept_trailing_metadata_lb",
2870             channel->GetLoadBalancingPolicyName());
2871   EXPECT_EQ(total_num_rpcs, num_trailers_intercepted());
2872 }
2873 
2874 //
2875 // tests that per-address args from the resolver are visible to the LB policy
2876 //
2877 
2878 class ClientLbAddressTest : public ClientLbEnd2endTest {
2879  protected:
SetUp()2880   void SetUp() override {
2881     ClientLbEnd2endTest::SetUp();
2882     current_test_instance_ = this;
2883   }
2884 
SetUpTestSuite()2885   static void SetUpTestSuite() {
2886     grpc_core::CoreConfiguration::Reset();
2887     grpc_core::CoreConfiguration::RegisterBuilder(
2888         [](grpc_core::CoreConfiguration::Builder* builder) {
2889           grpc_core::RegisterAddressTestLoadBalancingPolicy(builder,
2890                                                             SaveAddress);
2891         });
2892     grpc_init();
2893   }
2894 
TearDownTestSuite()2895   static void TearDownTestSuite() {
2896     grpc_shutdown();
2897     grpc_core::CoreConfiguration::Reset();
2898   }
2899 
addresses_seen()2900   std::vector<std::string> addresses_seen() {
2901     grpc_core::MutexLock lock(&mu_);
2902     return addresses_seen_;
2903   }
2904 
2905  private:
SaveAddress(const grpc_core::EndpointAddresses & address)2906   static void SaveAddress(const grpc_core::EndpointAddresses& address) {
2907     ClientLbAddressTest* self = current_test_instance_;
2908     grpc_core::MutexLock lock(&self->mu_);
2909     self->addresses_seen_.emplace_back(address.ToString());
2910   }
2911 
2912   static ClientLbAddressTest* current_test_instance_;
2913   grpc_core::Mutex mu_;
2914   std::vector<std::string> addresses_seen_ ABSL_GUARDED_BY(&mu_);
2915 };
2916 
2917 ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
2918 
// Per-address channel args attached by the resolver must be visible in the
// addresses handed to the LB policy's callback.
TEST_F(ClientLbAddressTest, Basic) {
  const int kNumServers = 1;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("address_test_lb", resolver);
  auto stub = BuildStub(channel);
  // Every address in the resolver result carries test_key=test_value.
  const grpc_core::ChannelArgs per_address_args =
      grpc_core::ChannelArgs().Set("test_key", "test_value");
  resolver.SetNextResolution(GetServersPorts(), nullptr, per_address_args);
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  // The channel must actually be using the address-test policy.
  EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
  // Build the string form expected for each backend address and compare it
  // against what the callback recorded.
  std::vector<std::string> expected_addresses;
  for (const int port : GetServersPorts()) {
    expected_addresses.push_back(
        absl::StrCat("addrs=[", grpc_core::LocalIp(), ":", port,
                     "] args={test_key=test_value}"));
  }
  EXPECT_EQ(addresses_seen(), expected_addresses);
}
2940 
2941 //
2942 // tests OOB backend metric API
2943 //
2944 
2945 class OobBackendMetricTest : public ClientLbEnd2endTest {
2946  protected:
2947   using BackendMetricReport = std::pair<int /*port*/, OrcaLoadReport>;
2948 
SetUp()2949   void SetUp() override {
2950     ClientLbEnd2endTest::SetUp();
2951     current_test_instance_ = this;
2952   }
2953 
SetUpTestSuite()2954   static void SetUpTestSuite() {
2955     grpc_core::CoreConfiguration::Reset();
2956     grpc_core::CoreConfiguration::RegisterBuilder(
2957         [](grpc_core::CoreConfiguration::Builder* builder) {
2958           grpc_core::RegisterOobBackendMetricTestLoadBalancingPolicy(
2959               builder, BackendMetricCallback);
2960         });
2961     grpc_init();
2962   }
2963 
TearDownTestSuite()2964   static void TearDownTestSuite() {
2965     grpc_shutdown();
2966     grpc_core::CoreConfiguration::Reset();
2967   }
2968 
GetBackendMetricReport()2969   absl::optional<BackendMetricReport> GetBackendMetricReport() {
2970     grpc_core::MutexLock lock(&mu_);
2971     if (backend_metric_reports_.empty()) return absl::nullopt;
2972     auto result = std::move(backend_metric_reports_.front());
2973     backend_metric_reports_.pop_front();
2974     return result;
2975   }
2976 
2977  private:
BackendMetricCallback(const grpc_core::EndpointAddresses & address,const grpc_core::BackendMetricData & backend_metric_data)2978   static void BackendMetricCallback(
2979       const grpc_core::EndpointAddresses& address,
2980       const grpc_core::BackendMetricData& backend_metric_data) {
2981     auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data);
2982     int port = grpc_sockaddr_get_port(&address.address());
2983     grpc_core::MutexLock lock(&current_test_instance_->mu_);
2984     current_test_instance_->backend_metric_reports_.push_back(
2985         {port, std::move(load_report)});
2986   }
2987 
2988   static OobBackendMetricTest* current_test_instance_;
2989   grpc_core::Mutex mu_;
2990   std::deque<BackendMetricReport> backend_metric_reports_ ABSL_GUARDED_BY(&mu_);
2991 };
2992 
2993 OobBackendMetricTest* OobBackendMetricTest::current_test_instance_ = nullptr;
2994 
TEST_F(OobBackendMetricTest,Basic)2995 TEST_F(OobBackendMetricTest, Basic) {
2996   StartServers(1);
2997   // Set initial backend metric data on server.
2998   constexpr char kMetricName[] = "foo";
2999   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.5);
3000   servers_[0]->server_metric_recorder_->SetCpuUtilization(0.1);
3001   servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.2);
3002   servers_[0]->server_metric_recorder_->SetEps(0.3);
3003   servers_[0]->server_metric_recorder_->SetQps(0.4);
3004   servers_[0]->server_metric_recorder_->SetNamedUtilization(kMetricName, 0.4);
3005   // Start client.
3006   FakeResolverResponseGeneratorWrapper response_generator;
3007   auto channel = BuildChannel("oob_backend_metric_test_lb", response_generator);
3008   auto stub = BuildStub(channel);
3009   response_generator.SetNextResolution(GetServersPorts());
3010   // Send an OK RPC.
3011   CheckRpcSendOk(DEBUG_LOCATION, stub);
3012   // Check LB policy name for the channel.
3013   EXPECT_EQ("oob_backend_metric_test_lb",
3014             channel->GetLoadBalancingPolicyName());
3015   // Check report seen by client.
3016   bool report_seen = false;
3017   for (size_t i = 0; i < 5; ++i) {
3018     auto report = GetBackendMetricReport();
3019     if (report.has_value()) {
3020       EXPECT_EQ(report->first, servers_[0]->port_);
3021       EXPECT_EQ(report->second.application_utilization(), 0.5);
3022       EXPECT_EQ(report->second.cpu_utilization(), 0.1);
3023       EXPECT_EQ(report->second.mem_utilization(), 0.2);
3024       EXPECT_EQ(report->second.eps(), 0.3);
3025       EXPECT_EQ(report->second.rps_fractional(), 0.4);
3026       EXPECT_THAT(
3027           report->second.utilization(),
3028           ::testing::UnorderedElementsAre(::testing::Pair(kMetricName, 0.4)));
3029       report_seen = true;
3030       break;
3031     }
3032     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
3033   }
3034   ASSERT_TRUE(report_seen);
3035   // Now update the utilization data on the server.
3036   // Note that the server may send a new report while we're updating these,
3037   // so we set them in reverse order, so that we know we'll get all new
3038   // data once we see a report with the new app utilization value.
3039   servers_[0]->server_metric_recorder_->SetNamedUtilization(kMetricName, 0.7);
3040   servers_[0]->server_metric_recorder_->SetQps(0.8);
3041   servers_[0]->server_metric_recorder_->SetEps(0.6);
3042   servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.5);
3043   servers_[0]->server_metric_recorder_->SetCpuUtilization(2.4);
3044   servers_[0]->server_metric_recorder_->SetApplicationUtilization(1.2);
3045   // Wait for client to see new report.
3046   report_seen = false;
3047   for (size_t i = 0; i < 5; ++i) {
3048     auto report = GetBackendMetricReport();
3049     if (report.has_value()) {
3050       EXPECT_EQ(report->first, servers_[0]->port_);
3051       if (report->second.application_utilization() != 0.5) {
3052         EXPECT_EQ(report->second.application_utilization(), 1.2);
3053         EXPECT_EQ(report->second.cpu_utilization(), 2.4);
3054         EXPECT_EQ(report->second.mem_utilization(), 0.5);
3055         EXPECT_EQ(report->second.eps(), 0.6);
3056         EXPECT_EQ(report->second.rps_fractional(), 0.8);
3057         EXPECT_THAT(
3058             report->second.utilization(),
3059             ::testing::UnorderedElementsAre(::testing::Pair(kMetricName, 0.7)));
3060         report_seen = true;
3061         break;
3062       }
3063     }
3064     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
3065   }
3066   ASSERT_TRUE(report_seen);
3067 }
3068 
3069 //
3070 // tests rewriting of control plane status codes
3071 //
3072 
3073 class ControlPlaneStatusRewritingTest : public ClientLbEnd2endTest {
3074  protected:
SetUpTestSuite()3075   static void SetUpTestSuite() {
3076     grpc_core::CoreConfiguration::Reset();
3077     grpc_core::CoreConfiguration::RegisterBuilder(
3078         [](grpc_core::CoreConfiguration::Builder* builder) {
3079           grpc_core::RegisterFailLoadBalancingPolicy(
3080               builder, absl::AbortedError("nope"));
3081         });
3082     grpc_init();
3083   }
3084 
TearDownTestSuite()3085   static void TearDownTestSuite() {
3086     grpc_shutdown();
3087     grpc_core::CoreConfiguration::Reset();
3088   }
3089 };
3090 
// An ABORTED status produced by the LB policy must reach the application as
// INTERNAL, with the original status quoted in the message.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromLb) {
  constexpr char kRewrittenMessage[] =
      "Illegal status code from LB pick; original status: ABORTED: nope";
  // Start the client on the failing LB policy.
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("fail_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // The RPC must fail with the rewritten status.
  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::INTERNAL,
                      kRewrittenMessage);
}
3102 
// An ABORTED status returned by the resolver as its service config must
// reach the application as INTERNAL, with the original status quoted in the
// message.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromResolver) {
  constexpr char kRewrittenMessage[] =
      "Illegal status code from resolver; original status: ABORTED: nope";
  // Start the client.
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("pick_first", resolver);
  auto stub = BuildStub(channel);
  // Resolver result: an error in place of the service config, plus an empty
  // address list.
  grpc_core::Resolver::Result resolver_result;
  resolver_result.service_config = absl::AbortedError("nope");
  resolver_result.addresses.emplace();
  resolver.SetResponse(std::move(resolver_result));
  // The RPC must fail with the rewritten status.
  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::INTERNAL,
                      kRewrittenMessage);
}
3117 
// An ABORTED status returned by a ConfigSelector must reach the application
// as INTERNAL, with the original status quoted in the message.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromConfigSelector) {
  // ConfigSelector whose GetCallConfig() always returns a fixed status.
  class FailConfigSelector : public grpc_core::ConfigSelector {
   public:
    explicit FailConfigSelector(absl::Status status)
        : failure_(std::move(status)) {}

    grpc_core::UniqueTypeName name() const override {
      static grpc_core::UniqueTypeName::Factory kFactory("FailConfigSelector");
      return kFactory.Create();
    }

    // Two selectors are equal when they return the same status.  `other` is
    // cast to FailConfigSelector without a type check.
    bool Equals(const ConfigSelector* other) const override {
      const auto* that = static_cast<const FailConfigSelector*>(other);
      return failure_ == that->failure_;
    }

    absl::Status GetCallConfig(GetCallConfigArgs /*args*/) override {
      return failure_;
    }

   private:
    absl::Status failure_;
  };
  // Start the client.
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("pick_first", resolver);
  auto stub = BuildStub(channel);
  auto config_selector =
      grpc_core::MakeRefCounted<FailConfigSelector>(absl::AbortedError("nope"));
  // Resolver result: an empty address list, an empty (valid) service config,
  // and the failing selector attached through the channel args.
  grpc_core::Resolver::Result resolver_result;
  resolver_result.addresses.emplace();
  resolver_result.service_config =
      grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), "{}");
  ASSERT_TRUE(resolver_result.service_config.ok())
      << resolver_result.service_config.status();
  resolver_result.args = grpc_core::ChannelArgs().SetObject(config_selector);
  resolver.SetResponse(std::move(resolver_result));
  // The RPC must fail with the rewritten status.
  CheckRpcSendFailure(
      DEBUG_LOCATION, stub, StatusCode::INTERNAL,
      "Illegal status code from ConfigSelector; original status: "
      "ABORTED: nope");
}
3156 
3157 //
3158 // WeightedRoundRobinTest
3159 //
3160 
// Service config selecting weighted_round_robin with no blackout period and
// a 0.1s weight update period (OOB load reporting is not enabled).
const char kServiceConfigPerCall[] = R"({
  "loadBalancingConfig": [
    {"weighted_round_robin": {
      "blackoutPeriod": "0s",
      "weightUpdatePeriod": "0.1s"
    }}
  ]
})";
3170 
// Service config selecting weighted_round_robin with no blackout period, a
// 0.1s weight update period, and OOB load reporting enabled.
const char kServiceConfigOob[] = R"({
  "loadBalancingConfig": [
    {"weighted_round_robin": {
      "blackoutPeriod": "0s",
      "weightUpdatePeriod": "0.1s",
      "enableOobLoadReport": true
    }}
  ]
})";
3181 
// Format string for a service config that wraps weighted_round_robin in the
// outlier_detection_experimental policy.  The single %d placeholder is the
// WRR blackout period in seconds.
const char kServiceConfigWithOutlierDetection[] = R"({
  "loadBalancingConfig": [
    {"outlier_detection_experimental": {
      "childPolicy": [
        {"weighted_round_robin": {
          "blackoutPeriod": "%ds",
          "weightUpdatePeriod": "0.1s"
        }}
      ]
    }}
  ]
})";
3195 
3196 class WeightedRoundRobinTest : public ClientLbEnd2endTest {
3197  protected:
ExpectWeightedRoundRobinPicks(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,const std::vector<size_t> & expected_weights,size_t total_passes=3,EchoRequest * request_ptr=nullptr,int timeout_ms=15000)3198   void ExpectWeightedRoundRobinPicks(
3199       const grpc_core::DebugLocation& location,
3200       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
3201       const std::vector<size_t>& expected_weights, size_t total_passes = 3,
3202       EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
3203     CHECK_EQ(expected_weights.size(), servers_.size());
3204     size_t total_picks_per_pass = 0;
3205     for (size_t picks : expected_weights) {
3206       total_picks_per_pass += picks;
3207     }
3208     size_t num_picks = 0;
3209     size_t num_passes = 0;
3210     SendRpcsUntil(
3211         location, stub,
3212         [&](const Status&) {
3213           if (++num_picks == total_picks_per_pass) {
3214             bool match = true;
3215             for (size_t i = 0; i < expected_weights.size(); ++i) {
3216               if (servers_[i]->service_.request_count() !=
3217                   expected_weights[i]) {
3218                 match = false;
3219                 break;
3220               }
3221             }
3222             if (match) {
3223               if (++num_passes == total_passes) return false;
3224             } else {
3225               num_passes = 0;
3226             }
3227             num_picks = 0;
3228             ResetCounters();
3229           }
3230           return true;
3231         },
3232         request_ptr, timeout_ms);
3233   }
3234 };
3235 
// WRR weights must follow per-call metrics while they are being reported,
// and fall back to the server-recorded metrics once they are not.
TEST_F(WeightedRoundRobinTest, CallAndServerMetric) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  // Report server metrics that should give 6:4:3 WRR picks.
  // weights = qps / (util + (eps/qps)) =
  //   1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
  // where util is app_util if set, or cpu_util.
  struct ServerMetrics {
    double app_utilization;
    double eps;
    double qps;
  };
  const ServerMetrics kServerMetrics[] = {
      {0.2, 20.0, 100.0},
      {0.3, 30.0, 100.0},
      {1.5, 20.0, 200.0},
  };
  size_t server_index = 0;
  for (const ServerMetrics& metrics : kServerMetrics) {
    auto& recorder = *servers_[server_index++]->server_metric_recorder_;
    recorder.SetApplicationUtilization(metrics.app_utilization);
    recorder.SetEps(metrics.eps);
    recorder.SetQps(metrics.qps);
  }
  // Create channel.
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts(), kServiceConfigPerCall);
  // Send requests with per-call reported EPS/QPS set to 0/100.
  // This should give 1/2:1/3:1/15 = 15:10:2 WRR picks.
  EchoRequest per_call_request;
  auto* per_call_metrics =
      per_call_request.mutable_param()->mutable_backend_metrics();
  // We cannot override with 0 with proto3, so setting it to almost 0.
  per_call_metrics->set_eps(std::numeric_limits<double>::min());
  per_call_metrics->set_rps_fractional(100);
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
                                /*expected_weights=*/{15, 10, 2},
                                /*total_passes=*/3, &per_call_request);
  // Now send requests without per-call reported QPS.
  // This should change WRR picks back to 6:4:3.
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
                                /*expected_weights=*/{6, 4, 3});
  // The channel must have ended up on weighted_round_robin.
  EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
}
3275 
3276 // This tests a bug seen in production where the outlier_detection
3277 // policy would incorrectly generate a duplicate READY notification on
3278 // all of its subchannels every time it saw an update, thus causing the
3279 // WRR policy to re-enter the blackout period for that address.
TEST_F(WeightedRoundRobinTest,WithOutlierDetection)3280 TEST_F(WeightedRoundRobinTest, WithOutlierDetection) {
3281   const int kBlackoutPeriodSeconds = 10;
3282   const int kNumServers = 3;
3283   StartServers(kNumServers);
3284   // Report server metrics that should give 6:4:3 WRR picks.
3285   // weights = qps / (util + (eps/qps)) =
3286   //   1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
3287   // where util is app_util if set, or cpu_util.
3288   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.2);
3289   servers_[0]->server_metric_recorder_->SetEps(20);
3290   servers_[0]->server_metric_recorder_->SetQps(100);
3291   servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.3);
3292   servers_[1]->server_metric_recorder_->SetEps(30);
3293   servers_[1]->server_metric_recorder_->SetQps(100);
3294   servers_[2]->server_metric_recorder_->SetApplicationUtilization(1.5);
3295   servers_[2]->server_metric_recorder_->SetEps(20);
3296   servers_[2]->server_metric_recorder_->SetQps(200);
3297   // Create channel.
3298   // Initial blackout period is 0, so that we start seeing traffic in
3299   // the right proportions right away.
3300   FakeResolverResponseGeneratorWrapper response_generator;
3301   auto channel = BuildChannel("", response_generator);
3302   auto stub = BuildStub(channel);
3303   response_generator.SetNextResolution(
3304       GetServersPorts(),
3305       absl::StrFormat(kServiceConfigWithOutlierDetection, 0).c_str());
3306   // Send requests with per-call reported EPS/QPS set to 0/100.
3307   // This should give 1/2:1/3:1/15 = 15:10:2 WRR picks.
3308   // Keep sending RPCs long enough to go past the new blackout period
3309   // that we're going to add later.
3310   absl::Time deadline =
3311       absl::Now() +
3312       absl::Seconds(kBlackoutPeriodSeconds * grpc_test_slowdown_factor());
3313   EchoRequest request;
3314   // We cannot override with 0 with proto3, so setting it to almost 0.
3315   request.mutable_param()->mutable_backend_metrics()->set_eps(
3316       std::numeric_limits<double>::min());
3317   request.mutable_param()->mutable_backend_metrics()->set_rps_fractional(100);
3318   do {
3319     ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
3320                                   /*expected_weights=*/{15, 10, 2},
3321                                   /*total_passes=*/3, &request);
3322   } while (absl::Now() < deadline);
3323   // Send a new resolver response that increases blackout period.
3324   response_generator.SetNextResolution(
3325       GetServersPorts(),
3326       absl::StrFormat(kServiceConfigWithOutlierDetection,
3327                       kBlackoutPeriodSeconds * grpc_test_slowdown_factor())
3328           .c_str());
3329   // Weights should be the same before the blackout period expires.
3330   ExpectWeightedRoundRobinPicks(
3331       DEBUG_LOCATION, stub, /*expected_weights=*/{15, 10, 2},
3332       /*total_passes=*/3, &request,
3333       /*timeout_ms=*/(kBlackoutPeriodSeconds - 1) * 1000);
3334 }
3335 
3336 class WeightedRoundRobinParamTest
3337     : public WeightedRoundRobinTest,
3338       public ::testing::WithParamInterface<const char*> {};
3339 
3340 INSTANTIATE_TEST_SUITE_P(WeightedRoundRobin, WeightedRoundRobinParamTest,
3341                          ::testing::Values(kServiceConfigPerCall,
3342                                            kServiceConfigOob));
3343 
// Verifies that, for each parameterized service config, the channel uses
// weighted_round_robin and distributes picks across three backends in
// proportion to the weights derived from their reported load.
TEST_P(WeightedRoundRobinParamTest, Basic) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  // Report server metrics that should give 1:2:4 WRR picks.
  // weights = qps / (util + (eps/qps)) =
  //   1/(0.4+0.4) : 1/(0.2+0.2) : 2/(0.3+0.1) = 1:2:4
  // where util is app_util if set, or cpu_util.
  servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.4);
  servers_[0]->server_metric_recorder_->SetEps(40);
  servers_[0]->server_metric_recorder_->SetQps(100);
  servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.2);
  servers_[1]->server_metric_recorder_->SetEps(20);
  servers_[1]->server_metric_recorder_->SetQps(100);
  servers_[2]->server_metric_recorder_->SetApplicationUtilization(0.3);
  // 20 errors/sec at 200 QPS gives the eps/qps term of 0.1 used in the
  // weight computation above.
  servers_[2]->server_metric_recorder_->SetEps(20);
  servers_[2]->server_metric_recorder_->SetQps(200);
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  // The test parameter is the service config to resolve with.
  response_generator.SetNextResolution(GetServersPorts(), GetParam());
  // Wait for the right set of WRR picks.
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
                                /*expected_weights=*/{1, 2, 4});
  // Check LB policy name for the channel.
  EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
}
3371 
3372 }  // namespace
3373 }  // namespace testing
3374 }  // namespace grpc
3375 
main(int argc,char ** argv)3376 int main(int argc, char** argv) {
3377   ::testing::InitGoogleTest(&argc, argv);
3378   grpc::testing::TestEnvironment env(&argc, argv);
3379   // Make the backup poller poll very frequently in order to pick up
3380   // updates from all the subchannels's FDs.
3381   grpc_core::ConfigVars::Overrides overrides;
3382   overrides.client_channel_backup_poll_interval_ms = 1;
3383   grpc_core::ConfigVars::SetOverrides(overrides);
3384 #if TARGET_OS_IPHONE
3385   // Workaround Apple CFStream bug
3386   grpc_core::SetEnv("grpc_cfstream", "0");
3387 #endif
3388   grpc_init();
3389   grpc::testing::ConnectionAttemptInjector::Init();
3390   const auto result = RUN_ALL_TESTS();
3391   grpc_shutdown();
3392   return result;
3393 }
3394