1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20 #include <mutex>
21 #include <sstream>
22 #include <thread>
23
24 #include <grpc/grpc.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/channel.h>
30 #include <grpcpp/client_context.h>
31 #include <grpcpp/create_channel.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34
35 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
36 #include "src/core/lib/gpr/env.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
40 #include "src/cpp/server/secure_server_credentials.h"
41
42 #include "src/cpp/client/secure_credentials.h"
43
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46 #include "test/cpp/end2end/test_service_impl.h"
47
48 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
49 #include "src/proto/grpc/testing/echo.grpc.pb.h"
50
51 #include <gmock/gmock.h>
52 #include <gtest/gtest.h>
53
54 // TODO(dgq): Other scenarios in need of testing:
55 // - Send a serverlist with faulty ip:port addresses (port > 2^16, etc).
56 // - Test reception of invalid serverlist
57 // - Test pinging
58 // - Test against a non-LB server.
59 // - Random LB server closing the stream unexpectedly.
60 // - Test using DNS-resolvable names (localhost?)
61 // - Test handling of creation of faulty RR instance by having the LB return a
62 // serverlist with non-existent backends after having initially returned a
63 // valid one.
64 //
65 // Findings from end to end testing to be covered here:
66 // - Handling of LB servers restart, including reconnection after backing-off
67 // retries.
68 // - Destruction of load balanced channel (and therefore of grpclb instance)
69 // while:
70 // 1) the internal LB call is still active. This should work by virtue
71 // of the weak reference the LB call holds. The call should be terminated as
72 // part of the grpclb shutdown process.
73 // 2) the retry timer is active. Again, the weak reference it holds should
74 // prevent a premature call to \a glb_destroy.
75 // - Restart of backend servers with no changes to serverlist. This exercises
76 // the RR handover mechanism.
77
78 using std::chrono::system_clock;
79
80 using grpc::lb::v1::LoadBalanceRequest;
81 using grpc::lb::v1::LoadBalanceResponse;
82 using grpc::lb::v1::LoadBalancer;
83
84 namespace grpc {
85 namespace testing {
86 namespace {
87
// Mixin that layers a pair of mutex-protected counters (requests seen,
// responses sent) on top of \a ServiceType. Every accessor and mutator takes
// \a mu_ for the duration of the call.
template <typename ServiceType>
class CountedService : public ServiceType {
 public:
  // Number of requests recorded since construction or the last reset.
  size_t request_count() {
    std::lock_guard<std::mutex> guard(mu_);
    return request_count_;
  }

  // Number of responses recorded since construction or the last reset.
  size_t response_count() {
    std::lock_guard<std::mutex> guard(mu_);
    return response_count_;
  }

  // Records one more response.
  void IncreaseResponseCount() {
    std::lock_guard<std::mutex> guard(mu_);
    response_count_ += 1;
  }

  // Records one more request.
  void IncreaseRequestCount() {
    std::lock_guard<std::mutex> guard(mu_);
    request_count_ += 1;
  }

  // Sets both counters back to zero.
  void ResetCounters() {
    std::lock_guard<std::mutex> guard(mu_);
    response_count_ = 0;
    request_count_ = 0;
  }

 protected:
  // Guards the counters below; visible to subclasses.
  std::mutex mu_;

 private:
  size_t request_count_ = 0;
  size_t response_count_ = 0;
};
123
// TestServiceImpl (the Echo test service) instrumented with the
// request/response counters of CountedService.
using BackendService = CountedService<TestServiceImpl>;
// The generated LoadBalancer service base instrumented with the same
// counters.
using BalancerService = CountedService<LoadBalancer::Service>;
126
// Metadata key/value pair attached to RPCs through the channel's call
// credentials. Backends check that they receive it; balancers check that
// they do not.
constexpr char g_kCallCredsMdKey[] = "Balancer should not ...";
constexpr char g_kCallCredsMdValue[] = "... receive me";
129
130 class BackendServiceImpl : public BackendService {
131 public:
BackendServiceImpl()132 BackendServiceImpl() {}
133
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)134 Status Echo(ServerContext* context, const EchoRequest* request,
135 EchoResponse* response) override {
136 // Backend should receive the call credentials metadata.
137 auto call_credentials_entry =
138 context->client_metadata().find(g_kCallCredsMdKey);
139 EXPECT_NE(call_credentials_entry, context->client_metadata().end());
140 if (call_credentials_entry != context->client_metadata().end()) {
141 EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
142 }
143 IncreaseRequestCount();
144 const auto status = TestServiceImpl::Echo(context, request, response);
145 IncreaseResponseCount();
146 return status;
147 }
148
149 // Returns true on its first invocation, false otherwise.
Shutdown()150 bool Shutdown() {
151 std::unique_lock<std::mutex> lock(mu_);
152 const bool prev = !shutdown_;
153 shutdown_ = true;
154 gpr_log(GPR_INFO, "Backend: shut down");
155 return prev;
156 }
157
158 private:
159 std::mutex mu_;
160 bool shutdown_ = false;
161 };
162
Ip4ToPackedString(const char * ip_str)163 grpc::string Ip4ToPackedString(const char* ip_str) {
164 struct in_addr ip4;
165 GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
166 return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
167 }
168
169 struct ClientStats {
170 size_t num_calls_started = 0;
171 size_t num_calls_finished = 0;
172 size_t num_calls_finished_with_client_failed_to_send = 0;
173 size_t num_calls_finished_known_received = 0;
174 std::map<grpc::string, size_t> drop_token_counts;
175
operator +=grpc::testing::__anonf0b0436a0111::ClientStats176 ClientStats& operator+=(const ClientStats& other) {
177 num_calls_started += other.num_calls_started;
178 num_calls_finished += other.num_calls_finished;
179 num_calls_finished_with_client_failed_to_send +=
180 other.num_calls_finished_with_client_failed_to_send;
181 num_calls_finished_known_received +=
182 other.num_calls_finished_known_received;
183 for (const auto& p : other.drop_token_counts) {
184 drop_token_counts[p.first] += p.second;
185 }
186 return *this;
187 }
188 };
189
190 class BalancerServiceImpl : public BalancerService {
191 public:
192 using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
193 using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
194
BalancerServiceImpl(int client_load_reporting_interval_seconds)195 explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
196 : client_load_reporting_interval_seconds_(
197 client_load_reporting_interval_seconds),
198 shutdown_(false) {}
199
BalanceLoad(ServerContext * context,Stream * stream)200 Status BalanceLoad(ServerContext* context, Stream* stream) override {
201 // Balancer shouldn't receive the call credentials metadata.
202 EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
203 context->client_metadata().end());
204 gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
205 LoadBalanceRequest request;
206 std::vector<ResponseDelayPair> responses_and_delays;
207
208 if (!stream->Read(&request)) {
209 goto done;
210 }
211 IncreaseRequestCount();
212 gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
213 request.DebugString().c_str());
214
215 // TODO(juanlishen): Initial response should always be the first response.
216 if (client_load_reporting_interval_seconds_ > 0) {
217 LoadBalanceResponse initial_response;
218 initial_response.mutable_initial_response()
219 ->mutable_client_stats_report_interval()
220 ->set_seconds(client_load_reporting_interval_seconds_);
221 stream->Write(initial_response);
222 }
223
224 {
225 std::unique_lock<std::mutex> lock(mu_);
226 responses_and_delays = responses_and_delays_;
227 }
228 for (const auto& response_and_delay : responses_and_delays) {
229 {
230 std::unique_lock<std::mutex> lock(mu_);
231 if (shutdown_) goto done;
232 }
233 SendResponse(stream, response_and_delay.first, response_and_delay.second);
234 }
235 {
236 std::unique_lock<std::mutex> lock(mu_);
237 if (shutdown_) goto done;
238 serverlist_cond_.wait(lock, [this] { return serverlist_ready_; });
239 }
240
241 if (client_load_reporting_interval_seconds_ > 0) {
242 request.Clear();
243 if (stream->Read(&request)) {
244 gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
245 this, request.DebugString().c_str());
246 GPR_ASSERT(request.has_client_stats());
247 // We need to acquire the lock here in order to prevent the notify_one
248 // below from firing before its corresponding wait is executed.
249 std::lock_guard<std::mutex> lock(mu_);
250 client_stats_.num_calls_started +=
251 request.client_stats().num_calls_started();
252 client_stats_.num_calls_finished +=
253 request.client_stats().num_calls_finished();
254 client_stats_.num_calls_finished_with_client_failed_to_send +=
255 request.client_stats()
256 .num_calls_finished_with_client_failed_to_send();
257 client_stats_.num_calls_finished_known_received +=
258 request.client_stats().num_calls_finished_known_received();
259 for (const auto& drop_token_count :
260 request.client_stats().calls_finished_with_drop()) {
261 client_stats_
262 .drop_token_counts[drop_token_count.load_balance_token()] +=
263 drop_token_count.num_calls();
264 }
265 load_report_ready_ = true;
266 load_report_cond_.notify_one();
267 }
268 }
269 done:
270 gpr_log(GPR_INFO, "LB[%p]: done", this);
271 return Status::OK;
272 }
273
add_response(const LoadBalanceResponse & response,int send_after_ms)274 void add_response(const LoadBalanceResponse& response, int send_after_ms) {
275 std::unique_lock<std::mutex> lock(mu_);
276 responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
277 }
278
279 // Returns true on its first invocation, false otherwise.
Shutdown()280 bool Shutdown() {
281 NotifyDoneWithServerlists();
282 std::unique_lock<std::mutex> lock(mu_);
283 const bool prev = !shutdown_;
284 shutdown_ = true;
285 gpr_log(GPR_INFO, "LB[%p]: shut down", this);
286 return prev;
287 }
288
BuildResponseForBackends(const std::vector<int> & backend_ports,const std::map<grpc::string,size_t> & drop_token_counts)289 static LoadBalanceResponse BuildResponseForBackends(
290 const std::vector<int>& backend_ports,
291 const std::map<grpc::string, size_t>& drop_token_counts) {
292 LoadBalanceResponse response;
293 for (const auto& drop_token_count : drop_token_counts) {
294 for (size_t i = 0; i < drop_token_count.second; ++i) {
295 auto* server = response.mutable_server_list()->add_servers();
296 server->set_drop(true);
297 server->set_load_balance_token(drop_token_count.first);
298 }
299 }
300 for (const int& backend_port : backend_ports) {
301 auto* server = response.mutable_server_list()->add_servers();
302 server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
303 server->set_port(backend_port);
304 }
305 return response;
306 }
307
WaitForLoadReport()308 const ClientStats& WaitForLoadReport() {
309 std::unique_lock<std::mutex> lock(mu_);
310 load_report_cond_.wait(lock, [this] { return load_report_ready_; });
311 load_report_ready_ = false;
312 return client_stats_;
313 }
314
NotifyDoneWithServerlists()315 void NotifyDoneWithServerlists() {
316 std::lock_guard<std::mutex> lock(mu_);
317 serverlist_ready_ = true;
318 serverlist_cond_.notify_all();
319 }
320
321 private:
SendResponse(Stream * stream,const LoadBalanceResponse & response,int delay_ms)322 void SendResponse(Stream* stream, const LoadBalanceResponse& response,
323 int delay_ms) {
324 gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
325 if (delay_ms > 0) {
326 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
327 }
328 gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
329 response.DebugString().c_str());
330 IncreaseResponseCount();
331 stream->Write(response);
332 }
333
334 const int client_load_reporting_interval_seconds_;
335 std::vector<ResponseDelayPair> responses_and_delays_;
336 std::mutex mu_;
337 std::condition_variable load_report_cond_;
338 bool load_report_ready_ = false;
339 std::condition_variable serverlist_cond_;
340 bool serverlist_ready_ = false;
341 ClientStats client_stats_;
342 bool shutdown_;
343 };
344
345 class GrpclbEnd2endTest : public ::testing::Test {
346 protected:
GrpclbEnd2endTest(int num_backends,int num_balancers,int client_load_reporting_interval_seconds)347 GrpclbEnd2endTest(int num_backends, int num_balancers,
348 int client_load_reporting_interval_seconds)
349 : server_host_("localhost"),
350 num_backends_(num_backends),
351 num_balancers_(num_balancers),
352 client_load_reporting_interval_seconds_(
353 client_load_reporting_interval_seconds) {
354 // Make the backup poller poll very frequently in order to pick up
355 // updates from all the subchannels's FDs.
356 gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
357 }
358
SetUp()359 void SetUp() override {
360 response_generator_ =
361 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
362 // Start the backends.
363 for (size_t i = 0; i < num_backends_; ++i) {
364 backends_.emplace_back(new BackendServiceImpl());
365 backend_servers_.emplace_back(ServerThread<BackendService>(
366 "backend", server_host_, backends_.back().get()));
367 }
368 // Start the load balancers.
369 for (size_t i = 0; i < num_balancers_; ++i) {
370 balancers_.emplace_back(
371 new BalancerServiceImpl(client_load_reporting_interval_seconds_));
372 balancer_servers_.emplace_back(ServerThread<BalancerService>(
373 "balancer", server_host_, balancers_.back().get()));
374 }
375 ResetStub();
376 }
377
TearDown()378 void TearDown() override {
379 for (size_t i = 0; i < backends_.size(); ++i) {
380 if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
381 }
382 for (size_t i = 0; i < balancers_.size(); ++i) {
383 if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
384 }
385 }
386
SetNextResolutionAllBalancers()387 void SetNextResolutionAllBalancers() {
388 std::vector<AddressData> addresses;
389 for (size_t i = 0; i < balancer_servers_.size(); ++i) {
390 addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
391 }
392 SetNextResolution(addresses);
393 }
394
ResetStub(int fallback_timeout=0,const grpc::string & expected_targets="")395 void ResetStub(int fallback_timeout = 0,
396 const grpc::string& expected_targets = "") {
397 ChannelArguments args;
398 args.SetGrpclbFallbackTimeout(fallback_timeout);
399 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
400 response_generator_.get());
401 if (!expected_targets.empty()) {
402 args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
403 }
404 std::ostringstream uri;
405 uri << "fake:///" << kApplicationTargetName_;
406 // TODO(dgq): templatize tests to run everything using both secure and
407 // insecure channel credentials.
408 grpc_channel_credentials* channel_creds =
409 grpc_fake_transport_security_credentials_create();
410 grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
411 g_kCallCredsMdKey, g_kCallCredsMdValue, false);
412 std::shared_ptr<ChannelCredentials> creds(
413 new SecureChannelCredentials(grpc_composite_channel_credentials_create(
414 channel_creds, call_creds, nullptr)));
415 grpc_call_credentials_unref(call_creds);
416 grpc_channel_credentials_unref(channel_creds);
417 channel_ = CreateCustomChannel(uri.str(), creds, args);
418 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
419 }
420
ResetBackendCounters()421 void ResetBackendCounters() {
422 for (const auto& backend : backends_) backend->ResetCounters();
423 }
424
WaitForLoadReports()425 ClientStats WaitForLoadReports() {
426 ClientStats client_stats;
427 for (const auto& balancer : balancers_) {
428 client_stats += balancer->WaitForLoadReport();
429 }
430 return client_stats;
431 }
432
SeenAllBackends()433 bool SeenAllBackends() {
434 for (const auto& backend : backends_) {
435 if (backend->request_count() == 0) return false;
436 }
437 return true;
438 }
439
SendRpcAndCount(int * num_total,int * num_ok,int * num_failure,int * num_drops)440 void SendRpcAndCount(int* num_total, int* num_ok, int* num_failure,
441 int* num_drops) {
442 const Status status = SendRpc();
443 if (status.ok()) {
444 ++*num_ok;
445 } else {
446 if (status.error_message() == "Call dropped by load balancing policy") {
447 ++*num_drops;
448 } else {
449 ++*num_failure;
450 }
451 }
452 ++*num_total;
453 }
454
WaitForAllBackends(int num_requests_multiple_of=1)455 std::tuple<int, int, int> WaitForAllBackends(
456 int num_requests_multiple_of = 1) {
457 int num_ok = 0;
458 int num_failure = 0;
459 int num_drops = 0;
460 int num_total = 0;
461 while (!SeenAllBackends()) {
462 SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
463 }
464 while (num_total % num_requests_multiple_of != 0) {
465 SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
466 }
467 ResetBackendCounters();
468 gpr_log(GPR_INFO,
469 "Performed %d warm up requests (a multiple of %d) against the "
470 "backends. %d succeeded, %d failed, %d dropped.",
471 num_total, num_requests_multiple_of, num_ok, num_failure,
472 num_drops);
473 return std::make_tuple(num_ok, num_failure, num_drops);
474 }
475
WaitForBackend(size_t backend_idx)476 void WaitForBackend(size_t backend_idx) {
477 do {
478 (void)SendRpc();
479 } while (backends_[backend_idx]->request_count() == 0);
480 ResetBackendCounters();
481 }
482
483 struct AddressData {
484 int port;
485 bool is_balancer;
486 grpc::string balancer_name;
487 };
488
CreateLbAddressesFromAddressDataList(const std::vector<AddressData> & address_data)489 grpc_lb_addresses* CreateLbAddressesFromAddressDataList(
490 const std::vector<AddressData>& address_data) {
491 grpc_lb_addresses* addresses =
492 grpc_lb_addresses_create(address_data.size(), nullptr);
493 for (size_t i = 0; i < address_data.size(); ++i) {
494 char* lb_uri_str;
495 gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port);
496 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
497 GPR_ASSERT(lb_uri != nullptr);
498 grpc_lb_addresses_set_address_from_uri(
499 addresses, i, lb_uri, address_data[i].is_balancer,
500 address_data[i].balancer_name.c_str(), nullptr);
501 grpc_uri_destroy(lb_uri);
502 gpr_free(lb_uri_str);
503 }
504 return addresses;
505 }
506
SetNextResolution(const std::vector<AddressData> & address_data)507 void SetNextResolution(const std::vector<AddressData>& address_data) {
508 grpc_core::ExecCtx exec_ctx;
509 grpc_lb_addresses* addresses =
510 CreateLbAddressesFromAddressDataList(address_data);
511 grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
512 grpc_channel_args fake_result = {1, &fake_addresses};
513 response_generator_->SetResponse(&fake_result);
514 grpc_lb_addresses_destroy(addresses);
515 }
516
SetNextReresolutionResponse(const std::vector<AddressData> & address_data)517 void SetNextReresolutionResponse(
518 const std::vector<AddressData>& address_data) {
519 grpc_core::ExecCtx exec_ctx;
520 grpc_lb_addresses* addresses =
521 CreateLbAddressesFromAddressDataList(address_data);
522 grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
523 grpc_channel_args fake_result = {1, &fake_addresses};
524 response_generator_->SetReresolutionResponse(&fake_result);
525 grpc_lb_addresses_destroy(addresses);
526 }
527
GetBackendPorts(const size_t start_index=0) const528 const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
529 std::vector<int> backend_ports;
530 for (size_t i = start_index; i < backend_servers_.size(); ++i) {
531 backend_ports.push_back(backend_servers_[i].port_);
532 }
533 return backend_ports;
534 }
535
ScheduleResponseForBalancer(size_t i,const LoadBalanceResponse & response,int delay_ms)536 void ScheduleResponseForBalancer(size_t i,
537 const LoadBalanceResponse& response,
538 int delay_ms) {
539 balancers_.at(i)->add_response(response, delay_ms);
540 }
541
SendRpc(EchoResponse * response=nullptr,int timeout_ms=1000)542 Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
543 const bool local_response = (response == nullptr);
544 if (local_response) response = new EchoResponse;
545 EchoRequest request;
546 request.set_message(kRequestMessage_);
547 ClientContext context;
548 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
549 Status status = stub_->Echo(&context, request, response);
550 if (local_response) delete response;
551 return status;
552 }
553
CheckRpcSendOk(const size_t times=1,const int timeout_ms=1000)554 void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000) {
555 for (size_t i = 0; i < times; ++i) {
556 EchoResponse response;
557 const Status status = SendRpc(&response, timeout_ms);
558 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
559 << " message=" << status.error_message();
560 EXPECT_EQ(response.message(), kRequestMessage_);
561 }
562 }
563
CheckRpcSendFailure()564 void CheckRpcSendFailure() {
565 const Status status = SendRpc();
566 EXPECT_FALSE(status.ok());
567 }
568
569 template <typename T>
570 struct ServerThread {
ServerThreadgrpc::testing::__anonf0b0436a0111::GrpclbEnd2endTest::ServerThread571 explicit ServerThread(const grpc::string& type,
572 const grpc::string& server_host, T* service)
573 : type_(type), service_(service) {
574 std::mutex mu;
575 // We need to acquire the lock here in order to prevent the notify_one
576 // by ServerThread::Start from firing before the wait below is hit.
577 std::unique_lock<std::mutex> lock(mu);
578 port_ = grpc_pick_unused_port_or_die();
579 gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
580 std::condition_variable cond;
581 thread_.reset(new std::thread(
582 std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
583 cond.wait(lock);
584 gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
585 }
586
Startgrpc::testing::__anonf0b0436a0111::GrpclbEnd2endTest::ServerThread587 void Start(const grpc::string& server_host, std::mutex* mu,
588 std::condition_variable* cond) {
589 // We need to acquire the lock here in order to prevent the notify_one
590 // below from firing before its corresponding wait is executed.
591 std::lock_guard<std::mutex> lock(*mu);
592 std::ostringstream server_address;
593 server_address << server_host << ":" << port_;
594 ServerBuilder builder;
595 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
596 grpc_fake_transport_security_server_credentials_create()));
597 builder.AddListeningPort(server_address.str(), creds);
598 builder.RegisterService(service_);
599 server_ = builder.BuildAndStart();
600 cond->notify_one();
601 }
602
Shutdowngrpc::testing::__anonf0b0436a0111::GrpclbEnd2endTest::ServerThread603 void Shutdown() {
604 gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
605 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
606 thread_->join();
607 gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
608 }
609
610 int port_;
611 grpc::string type_;
612 std::unique_ptr<Server> server_;
613 T* service_;
614 std::unique_ptr<std::thread> thread_;
615 };
616
617 const grpc::string server_host_;
618 const size_t num_backends_;
619 const size_t num_balancers_;
620 const int client_load_reporting_interval_seconds_;
621 std::shared_ptr<Channel> channel_;
622 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
623 std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
624 std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
625 std::vector<ServerThread<BackendService>> backend_servers_;
626 std::vector<ServerThread<BalancerService>> balancer_servers_;
627 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
628 response_generator_;
629 const grpc::string kRequestMessage_ = "Live long and prosper.";
630 const grpc::string kApplicationTargetName_ = "application_target_name";
631 };
632
633 class SingleBalancerTest : public GrpclbEnd2endTest {
634 public:
SingleBalancerTest()635 SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
636 };
637
// Baseline scenario: the balancer immediately returns every backend and the
// RPCs end up evenly spread across them.
TEST_F(SingleBalancerTest, Vanilla) {
  const size_t kNumRpcsPerAddress = 100;
  SetNextResolutionAllBalancers();
  // Serverlist with all the backends and no drop entries, sent right away.
  const LoadBalanceResponse all_backends_serverlist =
      BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {});
  ScheduleResponseForBalancer(0, all_backends_serverlist, 0);
  // Make sure that trying to connect works without a call.
  channel_->GetState(true /* try_to_connect */);
  // We need to wait for all backends to come online.
  WaitForAllBackends();
  // Send kNumRpcsPerAddress RPCs per server.
  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);

  // Each backend should have gotten 100 requests.
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend_servers_[i].service_->request_count());
  }
  // Let the balancer stream move past its serverlist phase.
  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request.
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());

  // Check LB policy name for the channel.
  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
665
// Same flow as the baseline scenario, but the channel is told which target
// names to expect and the balancer address is resolved with the matching
// balancer name "lb".
TEST_F(SingleBalancerTest, SecureNaming) {
  const size_t kNumRpcsPerAddress = 100;
  ResetStub(0, kApplicationTargetName_ + ";lb");
  const std::vector<AddressData> balancer_address = {
      AddressData{balancer_servers_[0].port_, true, "lb"}};
  SetNextResolution(balancer_address);
  // Serverlist with all the backends and no drop entries, sent right away.
  const LoadBalanceResponse all_backends_serverlist =
      BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {});
  ScheduleResponseForBalancer(0, all_backends_serverlist, 0);
  // Make sure that trying to connect works without a call.
  channel_->GetState(true /* try_to_connect */);
  // We need to wait for all backends to come online.
  WaitForAllBackends();
  // Send kNumRpcsPerAddress RPCs per server.
  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);

  // Each backend should have gotten 100 requests.
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend_servers_[i].service_->request_count());
  }
  // Let the balancer stream move past its serverlist phase.
  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request.
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
  // Check LB policy name for the channel.
  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
693
// The channel expects the balancer to be named "lb" but the resolver reports
// it as "woops"; connecting is expected to kill the process.
TEST_F(SingleBalancerTest, SecureNamingDeathTest) {
  // Servers are already running on other threads, so use the death test
  // style that re-executes the test binary.
  ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
  // Make sure that we blow up (via abort() from the security connector) when
  // the name from the balancer doesn't match expectations.
  ASSERT_DEATH(
      {
        ResetStub(0, kApplicationTargetName_ + ";lb");
        SetNextResolution(
            {AddressData{balancer_servers_[0].port_, true, "woops"}});
        channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
      },
      "");
}
707
// The balancer first sends an empty serverlist and only later a populated
// one; an RPC issued in between must block until the second list arrives and
// then succeed.
TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
  SetNextResolutionAllBalancers();
  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
  const int kCallDeadlineMs = kServerlistDelayMs * 2;
  // First response is an empty serverlist, sent right away.
  ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
  // Send non-empty serverlist only after kServerlistDelayMs
  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
      kServerlistDelayMs);

  // Measure with a monotonic clock: a wall-clock adjustment during the test
  // must not be able to shrink (or inflate) the observed interval.
  const auto t0 = std::chrono::steady_clock::now();
  // Client will block: LB will initially send empty serverlist.
  CheckRpcSendOk(1, kCallDeadlineMs);
  const auto elapsed_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(
          std::chrono::steady_clock::now() - t0);
  // but eventually, the LB sends a serverlist update that allows the call to
  // proceed. The call must have taken longer than the delay before the
  // populated serverlist was sent. Staying under kCallDeadlineMs is enforced
  // by the RPC itself: CheckRpcSendOk fails if the deadline expires.
  EXPECT_GT(elapsed_ms.count(), kServerlistDelayMs);
  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request.
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // and sent two responses.
  EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
}
736
// The balancer returns only ports on which no test server was started; the
// RPC must fail with UNAVAILABLE.
TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
  SetNextResolutionAllBalancers();
  const size_t kNumUnreachableServers = 5;
  // Freshly picked unused ports: no test server is started on them.
  std::vector<int> ports;
  while (ports.size() < kNumUnreachableServers) {
    ports.push_back(grpc_pick_unused_port_or_die());
  }
  const LoadBalanceResponse unreachable_serverlist =
      BalancerServiceImpl::BuildResponseForBackends(ports, {});
  ScheduleResponseForBalancer(0, unreachable_serverlist, 0);
  const Status status = SendRpc();
  // The error shouldn't be DEADLINE_EXCEEDED.
  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request.
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
755
// The resolver returns the balancer plus the first half of the backends. The
// balancer's serverlist (the second half of the backends) is delayed past the
// fallback timeout, so traffic first goes to the resolver-provided backends
// and moves to the serverlist backends once the list arrives.
TEST_F(SingleBalancerTest, Fallback) {
  SetNextResolutionAllBalancers();
  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
  const size_t kNumBackendInResolution = backends_.size() / 2;

  ResetStub(kFallbackTimeoutMs);
  // Resolution result: the balancer followed by the fallback backends.
  std::vector<AddressData> addresses;
  addresses.push_back(AddressData{balancer_servers_[0].port_, true, ""});
  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
    addresses.push_back(AddressData{backend_servers_[i].port_, false, ""});
  }
  SetNextResolution(addresses);

  // Send non-empty serverlist only after kServerlistDelayMs.
  const LoadBalanceResponse delayed_serverlist =
      BalancerServiceImpl::BuildResponseForBackends(
          GetBackendPorts(kNumBackendInResolution /* start_index */), {});
  ScheduleResponseForBalancer(0, delayed_serverlist, kServerlistDelayMs);

  // Wait until all the fallback backends are reachable.
  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
    WaitForBackend(i);
  }

  // The first request.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(kNumBackendInResolution);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");

  // Fallback is used: each backend returned by the resolver should have
  // gotten one request.
  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
  }
  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
  }

  // Wait until the serverlist reception has been processed and all backends
  // in the serverlist are reachable.
  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
    WaitForBackend(i);
  }

  // Send out the second request.
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(backends_.size() - kNumBackendInResolution);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");

  // Serverlist is used: each backend returned by the balancer should
  // have gotten one request.
  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
  }
  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
  }

  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request.
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
822
// Verifies that a resolver update received while grpclb is still in fallback
// mode replaces the fallback backends, and that the balancer's (delayed)
// serverlist later takes over from fallback.
TEST_F(SingleBalancerTest, FallbackUpdate) {
  SetNextResolutionAllBalancers();
  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
  // The backends are split into three consecutive index ranges:
  //   [0, kInitialEnd)          fallback backends in the first resolution,
  //   [kInitialEnd, kUpdateEnd) fallback backends in the resolution update,
  //   [kUpdateEnd, kAllEnd)     backends handed out by the balancer.
  const size_t kAllEnd = backends_.size();
  const size_t kInitialEnd = backends_.size() / 3;
  const size_t kUpdateEnd = kInitialEnd + backends_.size() / 3;

  // Publishes a resolution containing balancer 0 plus the backends in
  // [first, last) as non-balancer (fallback) addresses.
  const auto publish_fallback_backends = [this](size_t first, size_t last) {
    std::vector<AddressData> resolved;
    resolved.push_back(AddressData{balancer_servers_[0].port_, true, ""});
    for (size_t idx = first; idx < last; ++idx) {
      resolved.push_back(AddressData{backend_servers_[idx].port_, false, ""});
    }
    SetNextResolution(resolved);
  };
  // Waits for every backend in [first, last), in index order.
  const auto wait_for_backends = [this](size_t first, size_t last) {
    for (size_t idx = first; idx < last; ++idx) {
      WaitForBackend(idx);
    }
  };
  // Expects every backend in [first, last) to report `expected` requests.
  const auto expect_request_counts = [this](size_t first, size_t last,
                                            size_t expected) {
    for (size_t idx = first; idx < last; ++idx) {
      EXPECT_EQ(expected, backend_servers_[idx].service_->request_count());
    }
  };

  ResetStub(kFallbackTimeoutMs);
  publish_fallback_backends(0, kInitialEnd);

  // The non-empty serverlist is only sent after kServerlistDelayMs.
  ScheduleResponseForBalancer(
      0,
      BalancerServiceImpl::BuildResponseForBackends(
          GetBackendPorts(kUpdateEnd /* start_index */), {}),
      kServerlistDelayMs);

  // Wait until all the initial fallback backends are reachable.
  wait_for_backends(0, kInitialEnd);

  // First batch: one RPC per initial fallback backend.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(kInitialEnd);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");

  // Fallback is in use: only the backends from the first resolution were hit,
  // once each.
  expect_request_counts(0, kInitialEnd, 1U);
  expect_request_counts(kInitialEnd, kAllEnd, 0U);

  // Replace the fallback backends through a resolution update and wait until
  // the new ones are reachable.
  publish_fallback_backends(kInitialEnd, kUpdateEnd);
  wait_for_backends(kInitialEnd, kUpdateEnd);

  // Second batch: one RPC per backend in the resolution update.
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(kUpdateEnd - kInitialEnd);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");

  // The resolution update is in use: only its backends were hit, once each.
  expect_request_counts(0, kInitialEnd, 0U);
  expect_request_counts(kInitialEnd, kUpdateEnd, 1U);
  expect_request_counts(kUpdateEnd, kAllEnd, 0U);

  // Wait until the serverlist has been processed and all of its backends are
  // reachable.
  wait_for_backends(kUpdateEnd, kAllEnd);

  // Third batch: one RPC per backend in the serverlist.
  gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
  CheckRpcSendOk(kAllEnd - kUpdateEnd);
  gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");

  // The serverlist is in use: only the balancer-provided backends were hit,
  // once each.
  expect_request_counts(0, kUpdateEnd, 0U);
  expect_request_counts(kUpdateEnd, kAllEnd, 1U);

  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request...
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // ...and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
930
// Shuts down every backend after a successful batch of RPCs and checks that
// RPCs fail afterwards, both before and after fresh backend servers have been
// started.
TEST_F(SingleBalancerTest, BackendsRestart) {
  SetNextResolutionAllBalancers();
  const size_t kNumRpcsPerAddress = 100;
  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
      0);
  // Connecting must work even when no call has been started yet.
  channel_->GetState(true /* try_to_connect */);
  // kNumRpcsPerAddress RPCs per server.
  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
  balancers_[0]->NotifyDoneWithServerlists();
  auto& balancer = balancer_servers_[0];
  // The balancer got a single request...
  EXPECT_EQ(1U, balancer.service_->request_count());
  // ...and sent a single response.
  EXPECT_EQ(1U, balancer.service_->response_count());
  // Take down every backend; the server thread is only shut down when the
  // service's own Shutdown() returns true.
  for (size_t idx = 0; idx < backends_.size(); ++idx) {
    if (!backends_[idx]->Shutdown()) continue;
    backend_servers_[idx].Shutdown();
  }
  CheckRpcSendFailure();
  // Start a fresh set of backends, appended after the dead ones.
  for (size_t remaining = num_backends_; remaining > 0; --remaining) {
    backends_.emplace_back(new BackendServiceImpl());
    backend_servers_.emplace_back(ServerThread<BackendService>(
        "backend", server_host_, backends_.back().get()));
  }
  // The following RPC will fail due to the backend ports having changed. It
  // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
  // having gone into shutdown.
  // TODO(dgq): implement the "backend restart" component as well. We need extra
  // machinery to either update the LB responses "on the fly" or instruct
  // backends which ports to restart on.
  CheckRpcSendFailure();
}
963
// Fixture for the tests that exercise resolver/balancer updates.
class UpdatesTest : public GrpclbEnd2endTest {
 public:
  // Forwards (4, 3, 0) to the base fixture.
  // NOTE(review): presumably 4 backends, 3 balancers and client load
  // reporting disabled (the tests below index balancers 0..2) -- confirm
  // against the GrpclbEnd2endTest constructor.
  UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
};
968
// Switches the resolution from "all balancers" to balancer 1 only and checks
// that traffic moves from the backend assigned by balancer 0 to the backend
// assigned by balancer 1.
TEST_F(UpdatesTest, UpdateBalancers) {
  SetNextResolutionAllBalancers();
  const std::vector<int> ports_from_balancer0{GetBackendPorts()[0]};
  const std::vector<int> ports_from_balancer1{GetBackendPorts()[1]};
  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(ports_from_balancer0, {}),
      0);
  ScheduleResponseForBalancer(
      1, BalancerServiceImpl::BuildResponseForBackends(ports_from_balancer1, {}),
      0);

  // Tells balancers 0, 1 and 2 (in that order) that no more serverlists
  // are coming.
  const auto notify_balancers_done = [this]() {
    for (size_t idx = 0; idx < 3; ++idx) {
      balancers_[idx]->NotifyDoneWithServerlists();
    }
  };
  // Expects balancer `idx` to have seen exactly `requests` requests and to
  // have sent exactly `responses` responses.
  const auto expect_balancer_traffic = [this](size_t idx, size_t requests,
                                              size_t responses) {
    EXPECT_EQ(requests, balancer_servers_[idx].service_->request_count());
    EXPECT_EQ(responses, balancer_servers_[idx].service_->response_count());
  };

  // Wait until the first backend is ready.
  WaitForBackend(0);

  // Send 10 requests.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");

  // All 10 requests should have gone to the first backend.
  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());

  notify_balancers_done();
  // Only balancer 0 was contacted: one request, one response.
  expect_balancer_traffic(0, 1U, 1U);
  expect_balancer_traffic(1, 0U, 0U);
  expect_balancer_traffic(2, 0U, 0U);

  // New resolution: balancer 1 only.
  std::vector<AddressData> balancer1_only;
  balancer1_only.push_back(AddressData{balancer_servers_[1].port_, true, ""});
  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
  SetNextResolution(balancer1_only);
  gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");

  // Wait until update has been processed, as signaled by the second backend
  // receiving a request.
  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
  WaitForBackend(1);

  backend_servers_[1].service_->ResetCounters();
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // All 10 requests should have gone to the second backend.
  EXPECT_EQ(10U, backend_servers_[1].service_->request_count());

  notify_balancers_done();
  // Balancers 0 and 1 have each served one request; balancer 2 none.
  expect_balancer_traffic(0, 1U, 1U);
  expect_balancer_traffic(1, 1U, 1U);
  expect_balancer_traffic(2, 0U, 0U);
}
1029
1030 // Send an update with the same set of LBs as the one in SetUp() in order to
1031 // verify that the LB channel inside grpclb keeps the initial connection (which
1032 // by definition is also present in the update).
TEST_F(UpdatesTest,UpdateBalancersRepeated)1033 TEST_F(UpdatesTest, UpdateBalancersRepeated) {
1034 SetNextResolutionAllBalancers();
1035 const std::vector<int> first_backend{GetBackendPorts()[0]};
1036 const std::vector<int> second_backend{GetBackendPorts()[0]};
1037
1038 ScheduleResponseForBalancer(
1039 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
1040 ScheduleResponseForBalancer(
1041 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
1042
1043 // Wait until the first backend is ready.
1044 WaitForBackend(0);
1045
1046 // Send 10 requests.
1047 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1048 CheckRpcSendOk(10);
1049 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1050
1051 // All 10 requests should have gone to the first backend.
1052 EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
1053
1054 balancers_[0]->NotifyDoneWithServerlists();
1055 // Balancer 0 got a single request.
1056 EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1057 // and sent a single response.
1058 EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1059 EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
1060 EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
1061 EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1062 EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1063
1064 std::vector<AddressData> addresses;
1065 addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1066 addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1067 addresses.emplace_back(AddressData{balancer_servers_[2].port_, true, ""});
1068 gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
1069 SetNextResolution(addresses);
1070 gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
1071
1072 EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1073 gpr_timespec deadline = gpr_time_add(
1074 gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
1075 // Send 10 seconds worth of RPCs
1076 do {
1077 CheckRpcSendOk();
1078 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
1079 // grpclb continued using the original LB call to the first balancer, which
1080 // doesn't assign the second backend.
1081 EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1082 balancers_[0]->NotifyDoneWithServerlists();
1083
1084 addresses.clear();
1085 addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1086 addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1087 gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
1088 SetNextResolution(addresses);
1089 gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
1090
1091 EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1092 deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1093 gpr_time_from_millis(10000, GPR_TIMESPAN));
1094 // Send 10 seconds worth of RPCs
1095 do {
1096 CheckRpcSendOk();
1097 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
1098 // grpclb continued using the original LB call to the first balancer, which
1099 // doesn't assign the second backend.
1100 EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1101 balancers_[0]->NotifyDoneWithServerlists();
1102 }
1103
// Kills the only resolved balancer, checks that the existing RR policy keeps
// serving, then publishes balancer 1 and checks that traffic moves to the
// backend it assigns.
TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
  std::vector<AddressData> balancer0_only;
  balancer0_only.push_back(AddressData{balancer_servers_[0].port_, true, ""});
  SetNextResolution(balancer0_only);
  const std::vector<int> ports_from_balancer0{GetBackendPorts()[0]};
  const std::vector<int> ports_from_balancer1{GetBackendPorts()[1]};

  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(ports_from_balancer0, {}),
      0);
  ScheduleResponseForBalancer(
      1, BalancerServiceImpl::BuildResponseForBackends(ports_from_balancer1, {}),
      0);

  // Tells balancers 0, 1 and 2 (in that order) that no more serverlists
  // are coming.
  const auto notify_balancers_done = [this]() {
    for (size_t idx = 0; idx < 3; ++idx) {
      balancers_[idx]->NotifyDoneWithServerlists();
    }
  };
  // Expects balancer `idx` to have seen exactly `requests` requests and to
  // have sent exactly `responses` responses.
  const auto expect_balancer_traffic = [this](size_t idx, size_t requests,
                                              size_t responses) {
    EXPECT_EQ(requests, balancer_servers_[idx].service_->request_count());
    EXPECT_EQ(responses, balancer_servers_[idx].service_->response_count());
  };

  // Start servers and send 10 RPCs per server.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
  // All 10 requests should have gone to the first backend.
  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());

  // Kill balancer 0.
  gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
  balancers_[0]->NotifyDoneWithServerlists();
  if (balancers_[0]->Shutdown()) {
    balancer_servers_[0].Shutdown();
  }
  gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");

  // This batch is serviced by the existing RR policy.
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // All 10 requests should again have gone to the first backend.
  EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());

  notify_balancers_done();
  // Only balancer 0 was contacted: one request, one response.
  expect_balancer_traffic(0, 1U, 1U);
  expect_balancer_traffic(1, 0U, 0U);
  expect_balancer_traffic(2, 0U, 0U);

  // Update 1: balancer 1 only.
  std::vector<AddressData> balancer1_only;
  balancer1_only.push_back(AddressData{balancer_servers_[1].port_, true, ""});
  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
  SetNextResolution(balancer1_only);
  gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");

  // Wait until update has been processed, as signaled by the second backend
  // receiving a request. In the meantime, the client continues to be serviced
  // (by the first backend) without interruption.
  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
  WaitForBackend(1);

  // This batch is serviced by the updated RR policy.
  backend_servers_[1].service_->ResetCounters();
  gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
  // All 10 requests should have gone to the second backend.
  EXPECT_EQ(10U, backend_servers_[1].service_->request_count());

  notify_balancers_done();
  expect_balancer_traffic(0, 1U, 1U);
  // The second balancer, published as part of the first update, may end up
  // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
  // firing races with the arrival of the update containing the second
  // balancer.
  EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
  EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
  EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
  EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
  expect_balancer_traffic(2, 0U, 0U);
}
1185
// With a balancer that never responds, kills the fallback backend and checks
// that the re-resolution result (a different fallback backend) is picked up.
TEST_F(UpdatesTest, ReresolveDeadBackend) {
  ResetStub(500);
  // The first resolution contains the addresses of a balancer that never
  // responds, and a fallback backend.
  std::vector<AddressData> initial_resolution;
  initial_resolution.push_back(
      AddressData{balancer_servers_[0].port_, true, ""});
  initial_resolution.push_back(
      AddressData{backend_servers_[0].port_, false, ""});
  SetNextResolution(initial_resolution);
  // The re-resolution result will contain the addresses of the same balancer
  // and a new fallback backend.
  std::vector<AddressData> reresolution;
  reresolution.push_back(AddressData{balancer_servers_[0].port_, true, ""});
  reresolution.push_back(AddressData{backend_servers_[1].port_, false, ""});
  SetNextReresolutionResponse(reresolution);

  // Start servers and send 10 RPCs per server.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
  // All 10 requests should have gone to the fallback backend.
  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());

  // Kill backend 0.
  gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
  if (backends_[0]->Shutdown()) {
    backend_servers_[0].Shutdown();
  }
  gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");

  // Wait until re-resolution has finished, as signaled by the second backend
  // receiving a request.
  WaitForBackend(1);

  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // All 10 requests should have gone to the second backend.
  EXPECT_EQ(10U, backend_servers_[1].service_->request_count());

  for (size_t idx = 0; idx < 3; ++idx) {
    balancers_[idx]->NotifyDoneWithServerlists();
  }
  // Balancer 0 received the LB request but never answered it; the other two
  // balancers were never contacted.
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  EXPECT_EQ(0U, balancer_servers_[0].service_->response_count());
  for (size_t idx = 1; idx < 3; ++idx) {
    EXPECT_EQ(0U, balancer_servers_[idx].service_->request_count());
    EXPECT_EQ(0U, balancer_servers_[idx].service_->response_count());
  }
}
1233
// TODO(juanlishen): Remove this fixture once the first response is always the
// initial response. Currently, if client load reporting is not enabled, the
// balancer doesn't send an initial response, and when the backend shuts down
// an unexpected re-resolution happens. This test configuration is a
// workaround for the ReresolveDeadBalancer test.
class UpdatesWithClientLoadReportingTest : public GrpclbEnd2endTest {
 public:
  // Forwards (4, 3, 2) to the base fixture.
  // NOTE(review): the non-zero third argument presumably enables client load
  // reporting -- confirm against the GrpclbEnd2endTest constructor.
  UpdatesWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 3, 2) {}
};
1243
// Kills first the only assigned backend and then the balancer, and checks
// that the re-resolution result (balancer 1) is used to obtain a new
// serverlist.
TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
  // Initial resolution: balancer 0 only.
  std::vector<AddressData> initial_resolution;
  initial_resolution.push_back(
      AddressData{balancer_servers_[0].port_, true, ""});
  SetNextResolution(initial_resolution);
  // Re-resolution result: balancer 1 only.
  std::vector<AddressData> reresolution;
  reresolution.push_back(AddressData{balancer_servers_[1].port_, true, ""});
  SetNextReresolutionResponse(reresolution);
  const std::vector<int> ports_from_balancer0{GetBackendPorts()[0]};
  const std::vector<int> ports_from_balancer1{GetBackendPorts()[1]};

  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(ports_from_balancer0, {}),
      0);
  ScheduleResponseForBalancer(
      1, BalancerServiceImpl::BuildResponseForBackends(ports_from_balancer1, {}),
      0);

  // Expects balancer `idx` to have seen exactly `requests` requests and to
  // have sent exactly `responses` responses.
  const auto expect_balancer_traffic = [this](size_t idx, size_t requests,
                                              size_t responses) {
    EXPECT_EQ(requests, balancer_servers_[idx].service_->request_count());
    EXPECT_EQ(responses, balancer_servers_[idx].service_->response_count());
  };

  // Start servers and send 10 RPCs per server.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
  // All 10 requests should have gone to the first backend.
  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());

  // Kill backend 0.
  gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
  if (backends_[0]->Shutdown()) {
    backend_servers_[0].Shutdown();
  }
  gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");

  CheckRpcSendFailure();

  // Only balancer 0 was contacted so far: one request, one response.
  expect_balancer_traffic(0, 1U, 1U);
  expect_balancer_traffic(1, 0U, 0U);
  expect_balancer_traffic(2, 0U, 0U);

  // Kill balancer 0.
  gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
  if (balancers_[0]->Shutdown()) {
    balancer_servers_[0].Shutdown();
  }
  gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");

  // Wait until re-resolution has finished, as signaled by the second backend
  // receiving a request.
  WaitForBackend(1);

  // This batch is serviced by the new serverlist.
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(10);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // All 10 requests should have gone to the second backend.
  EXPECT_EQ(10U, backend_servers_[1].service_->request_count());

  expect_balancer_traffic(0, 1U, 1U);
  // After balancer 0 is killed, we restart an LB call immediately (because we
  // disconnect from a previously connected balancer). Although we will cancel
  // this call when the re-resolution update is done and another LB call restart
  // is needed, this old call may still succeed in reaching the LB server if
  // re-resolution is slow. So balancer 1 may have received 2 requests and sent
  // 2 responses.
  EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
  EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
  EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
  EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
  expect_balancer_traffic(2, 0U, 0U);
}
1313
// Serves a serverlist that mixes real backends with drop entries and checks
// that exactly the expected share of RPCs is dropped while every backend
// still receives its share.
TEST_F(SingleBalancerTest, Drop) {
  SetNextResolutionAllBalancers();
  const size_t kNumRpcsPerAddress = 100;
  const int num_of_drop_by_rate_limiting_addresses = 1;
  const int num_of_drop_by_load_balancing_addresses = 2;
  const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
                                    num_of_drop_by_load_balancing_addresses;
  const int num_total_addresses = num_backends_ + num_of_drop_addresses;
  ScheduleResponseForBalancer(
      0,
      BalancerServiceImpl::BuildResponseForBackends(
          GetBackendPorts(),
          {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
           {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
      0);
  // Wait until all backends are ready.
  WaitForAllBackends();
  // Send kNumRpcsPerAddress RPCs for each server and drop address, counting
  // how many of them were dropped by the LB policy.
  const size_t kNumRpcsToSend = kNumRpcsPerAddress * num_total_addresses;
  size_t dropped_rpcs = 0;
  for (size_t sent = 0; sent < kNumRpcsToSend; ++sent) {
    EchoResponse reply;
    const Status rpc_status = SendRpc(&reply);
    const bool was_dropped =
        !rpc_status.ok() &&
        rpc_status.error_message() == "Call dropped by load balancing policy";
    if (was_dropped) {
      ++dropped_rpcs;
      continue;
    }
    // Anything that was not dropped must have succeeded and echoed the
    // request message.
    EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                                 << " message=" << rpc_status.error_message();
    EXPECT_EQ(reply.message(), kRequestMessage_);
  }
  EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, dropped_rpcs);

  // Each backend should have gotten kNumRpcsPerAddress requests.
  for (size_t idx = 0; idx < backends_.size(); ++idx) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend_servers_[idx].service_->request_count());
  }
  // The balancer got a single request...
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // ...and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
1357
// When the very first serverlist contains only drop entries, the first RPC
// must already be dropped by the LB policy.
TEST_F(SingleBalancerTest, DropAllFirst) {
  SetNextResolutionAllBalancers();
  // Every address in the serverlist is a "drop" entry; there are no backends.
  const int kRateLimitingDrops = 1;
  const int kLoadBalancingDrops = 1;
  ScheduleResponseForBalancer(
      0,
      BalancerServiceImpl::BuildResponseForBackends(
          {}, {{"rate_limiting", kRateLimitingDrops},
               {"load_balancing", kLoadBalancingDrops}}),
      0);
  const Status rpc_status = SendRpc();
  EXPECT_FALSE(rpc_status.ok());
  EXPECT_EQ(rpc_status.error_message(),
            "Call dropped by load balancing policy");
}
1373
// Starts with a serverlist containing all backends, then (after a delay of
// 1000 ms) replaces it with a serverlist made only of drop entries. RPCs must
// succeed at first and eventually be dropped by the LB policy.
TEST_F(SingleBalancerTest, DropAll) {
  SetNextResolutionAllBalancers();
  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
      0);
  const int num_of_drop_by_rate_limiting_addresses = 1;
  const int num_of_drop_by_load_balancing_addresses = 1;
  ScheduleResponseForBalancer(
      0,
      BalancerServiceImpl::BuildResponseForBackends(
          {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
               {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
      1000);

  // First call succeeds.
  CheckRpcSendOk();
  // But eventually, the update with only dropped servers is processed and calls
  // fail. The wait is bounded so that a drop update that is never applied shows
  // up as a failed expectation below instead of hanging the test forever. The
  // monotonic clock keeps the bound immune to wall-clock adjustments.
  const gpr_timespec deadline = gpr_time_add(
      gpr_now(GPR_CLOCK_MONOTONIC),
      gpr_time_from_millis(30000 * grpc_test_slowdown_factor(), GPR_TIMESPAN));
  Status status;
  do {
    status = SendRpc();
  } while (status.ok() &&
           gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
  EXPECT_FALSE(status.ok())
      << "RPCs were still succeeding when the wait for the drop-only "
         "serverlist timed out";
  EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
1399
// Fixture for the single-balancer tests that also inspect the client load
// reports obtained through WaitForLoadReports().
class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
 public:
  // Forwards (4, 1, 3) to the base fixture.
  // NOTE(review): presumably 4 backends, 1 balancer and a client load
  // reporting interval of 3 (seconds?) -- confirm against the
  // GrpclbEnd2endTest constructor.
  SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 3) {}
};
1404
// Sends a fixed number of RPCs per backend with no drops configured and
// checks that the client load report accounts for all of them, including the
// RPCs issued while waiting for the backends.
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
  SetNextResolutionAllBalancers();
  const size_t kNumRpcsPerAddress = 100;
  ScheduleResponseForBalancer(
      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
      0);
  // Wait until all backends are ready, keeping track of the outcome of the
  // RPCs issued while waiting: they show up in the load report too.
  int warmup_ok = 0;
  int warmup_failure = 0;
  int warmup_drops = 0;
  std::tie(warmup_ok, warmup_failure, warmup_drops) = WaitForAllBackends();
  // Send kNumRpcsPerAddress RPCs per server.
  const auto num_rpcs_sent = kNumRpcsPerAddress * num_backends_;
  CheckRpcSendOk(num_rpcs_sent);
  // Each backend should have gotten kNumRpcsPerAddress requests.
  for (size_t idx = 0; idx < backends_.size(); ++idx) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend_servers_[idx].service_->request_count());
  }
  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request...
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // ...and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());

  const ClientStats reported = WaitForLoadReports();
  EXPECT_EQ(num_rpcs_sent + warmup_ok, reported.num_calls_started);
  EXPECT_EQ(num_rpcs_sent + warmup_ok, reported.num_calls_finished);
  EXPECT_EQ(0U, reported.num_calls_finished_with_client_failed_to_send);
  EXPECT_EQ(num_rpcs_sent + (warmup_ok + warmup_drops),
            reported.num_calls_finished_known_received);
  // No drop tokens are expected.
  EXPECT_THAT(reported.drop_token_counts, ::testing::ElementsAre());
}
1439
// Serves a serverlist that mixes real backends with drop entries and checks
// both the observed drops and the per-token drop counts in the client load
// report, including the RPCs issued while waiting for the backends.
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
  SetNextResolutionAllBalancers();
  const size_t kNumRpcsPerAddress = 3;
  const int num_of_drop_by_rate_limiting_addresses = 2;
  const int num_of_drop_by_load_balancing_addresses = 1;
  const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
                                    num_of_drop_by_load_balancing_addresses;
  const int num_total_addresses = num_backends_ + num_of_drop_addresses;
  ScheduleResponseForBalancer(
      0,
      BalancerServiceImpl::BuildResponseForBackends(
          GetBackendPorts(),
          {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
           {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
      0);
  // Wait until all backends are ready, keeping track of the outcome of the
  // RPCs issued while waiting.
  int num_warmup_ok = 0;
  int num_warmup_failure = 0;
  int num_warmup_drops = 0;
  std::tie(num_warmup_ok, num_warmup_failure, num_warmup_drops) =
      WaitForAllBackends(num_total_addresses /* num_requests_multiple_of */);
  const int num_total_warmup_requests =
      num_warmup_ok + num_warmup_failure + num_warmup_drops;
  // Send kNumRpcsPerAddress RPCs for each server and drop address, counting
  // how many of them were dropped by the LB policy.
  const size_t kNumRpcsToSend = kNumRpcsPerAddress * num_total_addresses;
  size_t dropped_rpcs = 0;
  for (size_t sent = 0; sent < kNumRpcsToSend; ++sent) {
    EchoResponse reply;
    const Status rpc_status = SendRpc(&reply);
    const bool was_dropped =
        !rpc_status.ok() &&
        rpc_status.error_message() == "Call dropped by load balancing policy";
    if (was_dropped) {
      ++dropped_rpcs;
      continue;
    }
    // Anything that was not dropped must have succeeded and echoed the
    // request message.
    EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                                 << " message=" << rpc_status.error_message();
    EXPECT_EQ(reply.message(), kRequestMessage_);
  }
  EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, dropped_rpcs);
  // Each backend should have gotten kNumRpcsPerAddress requests.
  for (size_t idx = 0; idx < backends_.size(); ++idx) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend_servers_[idx].service_->request_count());
  }
  balancers_[0]->NotifyDoneWithServerlists();
  // The balancer got a single request...
  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
  // ...and sent a single response.
  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());

  const ClientStats reported = WaitForLoadReports();
  const auto expected_total_calls =
      kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests;
  EXPECT_EQ(expected_total_calls, reported.num_calls_started);
  EXPECT_EQ(expected_total_calls, reported.num_calls_finished);
  EXPECT_EQ(0U, reported.num_calls_finished_with_client_failed_to_send);
  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_warmup_ok,
            reported.num_calls_finished_known_received);
  // The number of warmup requests is a multiple of the number of addresses.
  // Therefore, all addresses in the scheduled balancer response are hit the
  // same number of times.
  const int num_times_drop_addresses_hit =
      num_warmup_drops / num_of_drop_addresses;
  const auto drops_per_drop_address =
      kNumRpcsPerAddress + num_times_drop_addresses_hit;
  EXPECT_THAT(
      reported.drop_token_counts,
      ::testing::ElementsAre(
          ::testing::Pair("load_balancing", drops_per_drop_address),
          ::testing::Pair("rate_limiting", drops_per_drop_address * 2)));
}
1512
1513 } // namespace
1514 } // namespace testing
1515 } // namespace grpc
1516
main(int argc,char ** argv)1517 int main(int argc, char** argv) {
1518 grpc_init();
1519 grpc_test_init(argc, argv);
1520 ::testing::InitGoogleTest(&argc, argv);
1521 const auto result = RUN_ALL_TESTS();
1522 grpc_shutdown();
1523 return result;
1524 }
1525