• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <fcntl.h>
22 #include <gmock/gmock.h>
23 #include <netinet/in.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <functional>
31 #include <set>
32 #include <thread>
33 
34 #include "absl/strings/str_cat.h"
35 
36 #include <grpc/grpc.h>
37 #include <grpc/grpc_security.h>
38 #include <grpc/slice.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/string_util.h>
42 #include <grpc/support/time.h>
43 
44 #include <grpcpp/impl/codegen/service_type.h>
45 #include <grpcpp/server_builder.h>
46 
47 #include "src/core/lib/gpr/useful.h"
48 #include "src/core/lib/gprpp/host_port.h"
49 #include "src/core/lib/gprpp/thd.h"
50 #include "src/core/lib/iomgr/error.h"
51 #include "src/core/lib/security/credentials/alts/alts_credentials.h"
52 #include "src/core/lib/security/credentials/credentials.h"
53 #include "src/core/lib/security/security_connector/alts/alts_security_connector.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 
56 #include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h"
57 #include "test/core/util/memory_counters.h"
58 #include "test/core/util/port.h"
59 #include "test/core/util/test_config.h"
60 
61 #include "test/core/end2end/cq_verifier.h"
62 
63 namespace {
64 
65 const int kFakeHandshakeServerMaxConcurrentStreams = 40;
66 
drain_cq(grpc_completion_queue * cq)67 void drain_cq(grpc_completion_queue* cq) {
68   grpc_event ev;
69   do {
70     ev = grpc_completion_queue_next(
71         cq, grpc_timeout_milliseconds_to_deadline(5000), nullptr);
72   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
73 }
74 
create_secure_channel_for_test(const char * server_addr,const char * fake_handshake_server_addr,int reconnect_backoff_ms)75 grpc_channel* create_secure_channel_for_test(
76     const char* server_addr, const char* fake_handshake_server_addr,
77     int reconnect_backoff_ms) {
78   grpc_alts_credentials_options* alts_options =
79       grpc_alts_credentials_client_options_create();
80   grpc_channel_credentials* channel_creds =
81       grpc_alts_credentials_create_customized(alts_options,
82                                               fake_handshake_server_addr,
83                                               true /* enable_untrusted_alts */);
84   grpc_alts_credentials_options_destroy(alts_options);
85   // The main goal of these tests are to stress concurrent ALTS handshakes,
86   // so we prevent subchnannel sharing.
87   std::vector<grpc_arg> new_args;
88   new_args.push_back(grpc_channel_arg_integer_create(
89       const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
90   if (reconnect_backoff_ms != 0) {
91     new_args.push_back(grpc_channel_arg_integer_create(
92         const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms"),
93         reconnect_backoff_ms));
94   }
95   grpc_channel_args* channel_args =
96       grpc_channel_args_copy_and_add(nullptr, new_args.data(), new_args.size());
97   grpc_channel* channel = grpc_secure_channel_create(channel_creds, server_addr,
98                                                      channel_args, nullptr);
99   grpc_channel_args_destroy(channel_args);
100   grpc_channel_credentials_release(channel_creds);
101   return channel;
102 }
103 
104 class FakeHandshakeServer {
105  public:
FakeHandshakeServer(bool check_num_concurrent_rpcs)106   FakeHandshakeServer(bool check_num_concurrent_rpcs) {
107     int port = grpc_pick_unused_port_or_die();
108     address_ = grpc_core::JoinHostPort("localhost", port);
109     if (check_num_concurrent_rpcs) {
110       service_ = grpc::gcp::
111           CreateFakeHandshakerService(kFakeHandshakeServerMaxConcurrentStreams /* expected max concurrent rpcs */);
112     } else {
113       service_ = grpc::gcp::CreateFakeHandshakerService(
114           0 /* expected max concurrent rpcs unset */);
115     }
116     grpc::ServerBuilder builder;
117     builder.AddListeningPort(address_.c_str(),
118                              grpc::InsecureServerCredentials());
119     builder.RegisterService(service_.get());
120     // TODO(apolcyn): when removing the global concurrent handshake limiting
121     // queue, set MAX_CONCURRENT_STREAMS on this server.
122     server_ = builder.BuildAndStart();
123     gpr_log(GPR_INFO, "Fake handshaker server listening on %s",
124             address_.c_str());
125   }
126 
~FakeHandshakeServer()127   ~FakeHandshakeServer() {
128     server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
129   }
130 
address()131   const char* address() { return address_.c_str(); }
132 
133  private:
134   std::string address_;
135   std::unique_ptr<grpc::Service> service_;
136   std::unique_ptr<grpc::Server> server_;
137 };
138 
139 class TestServer {
140  public:
TestServer()141   explicit TestServer()
142       : fake_handshake_server_(true /* check num concurrent rpcs */) {
143     grpc_alts_credentials_options* alts_options =
144         grpc_alts_credentials_server_options_create();
145     grpc_server_credentials* server_creds =
146         grpc_alts_server_credentials_create_customized(
147             alts_options, fake_handshake_server_.address(),
148             true /* enable_untrusted_alts */);
149     grpc_alts_credentials_options_destroy(alts_options);
150     server_ = grpc_server_create(nullptr, nullptr);
151     server_cq_ = grpc_completion_queue_create_for_next(nullptr);
152     grpc_server_register_completion_queue(server_, server_cq_, nullptr);
153     int port = grpc_pick_unused_port_or_die();
154     server_addr_ = grpc_core::JoinHostPort("localhost", port);
155     GPR_ASSERT(grpc_server_add_secure_http2_port(server_, server_addr_.c_str(),
156                                                  server_creds));
157     grpc_server_credentials_release(server_creds);
158     grpc_server_start(server_);
159     gpr_log(GPR_DEBUG, "Start TestServer %p. listen on %s", this,
160             server_addr_.c_str());
161     server_thd_ =
162         std::unique_ptr<std::thread>(new std::thread(PollUntilShutdown, this));
163   }
164 
~TestServer()165   ~TestServer() {
166     gpr_log(GPR_DEBUG, "Begin dtor of TestServer %p", this);
167     grpc_server_shutdown_and_notify(server_, server_cq_, this);
168     server_thd_->join();
169     grpc_server_destroy(server_);
170     grpc_completion_queue_shutdown(server_cq_);
171     drain_cq(server_cq_);
172     grpc_completion_queue_destroy(server_cq_);
173   }
174 
address()175   const char* address() { return server_addr_.c_str(); }
176 
PollUntilShutdown(const TestServer * self)177   static void PollUntilShutdown(const TestServer* self) {
178     grpc_event ev = grpc_completion_queue_next(
179         self->server_cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
180     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
181     GPR_ASSERT(ev.tag == self);
182     gpr_log(GPR_DEBUG, "TestServer %p stop polling", self);
183   }
184 
185  private:
186   grpc_server* server_;
187   grpc_completion_queue* server_cq_;
188   std::unique_ptr<std::thread> server_thd_;
189   std::string server_addr_;
190   // Give this test server its own ALTS handshake server
191   // so that we avoid competing for ALTS handshake server resources (e.g.
192   // available HTTP2 streams on a globally shared handshaker subchannel)
193   // with clients that are trying to do mutual ALTS handshakes
194   // with this server (which could "deadlock" mutual handshakes).
195   // TODO(apolcyn): remove this workaround from this test and have
196   // clients/servers share a single fake handshake server if
197   // the underlying issue needs to be fixed.
198   FakeHandshakeServer fake_handshake_server_;
199 };
200 
201 class ConnectLoopRunner {
202  public:
ConnectLoopRunner(const char * server_address,const char * fake_handshake_server_addr,int per_connect_deadline_seconds,size_t loops,grpc_connectivity_state expected_connectivity_states,int reconnect_backoff_ms)203   explicit ConnectLoopRunner(
204       const char* server_address, const char* fake_handshake_server_addr,
205       int per_connect_deadline_seconds, size_t loops,
206       grpc_connectivity_state expected_connectivity_states,
207       int reconnect_backoff_ms)
208       : server_address_(grpc_core::UniquePtr<char>(gpr_strdup(server_address))),
209         fake_handshake_server_addr_(
210             grpc_core::UniquePtr<char>(gpr_strdup(fake_handshake_server_addr))),
211         per_connect_deadline_seconds_(per_connect_deadline_seconds),
212         loops_(loops),
213         expected_connectivity_states_(expected_connectivity_states),
214         reconnect_backoff_ms_(reconnect_backoff_ms) {
215     thd_ = std::unique_ptr<std::thread>(new std::thread(ConnectLoop, this));
216   }
217 
~ConnectLoopRunner()218   ~ConnectLoopRunner() { thd_->join(); }
219 
ConnectLoop(const ConnectLoopRunner * self)220   static void ConnectLoop(const ConnectLoopRunner* self) {
221     for (size_t i = 0; i < self->loops_; i++) {
222       gpr_log(GPR_DEBUG, "runner:%p connect_loop begin loop %ld", self, i);
223       grpc_completion_queue* cq =
224           grpc_completion_queue_create_for_next(nullptr);
225       grpc_channel* channel = create_secure_channel_for_test(
226           self->server_address_.get(), self->fake_handshake_server_addr_.get(),
227           self->reconnect_backoff_ms_);
228       // Connect, forcing an ALTS handshake
229       gpr_timespec connect_deadline =
230           grpc_timeout_seconds_to_deadline(self->per_connect_deadline_seconds_);
231       grpc_connectivity_state state =
232           grpc_channel_check_connectivity_state(channel, 1);
233       ASSERT_EQ(state, GRPC_CHANNEL_IDLE);
234       while (state != self->expected_connectivity_states_) {
235         if (self->expected_connectivity_states_ ==
236             GRPC_CHANNEL_TRANSIENT_FAILURE) {
237           ASSERT_NE(state, GRPC_CHANNEL_READY);  // sanity check
238         } else {
239           ASSERT_EQ(self->expected_connectivity_states_, GRPC_CHANNEL_READY);
240         }
241         grpc_channel_watch_connectivity_state(
242             channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, nullptr);
243         grpc_event ev =
244             grpc_completion_queue_next(cq, connect_deadline, nullptr);
245         ASSERT_EQ(ev.type, GRPC_OP_COMPLETE)
246             << "connect_loop runner:" << std::hex << self
247             << " got ev.type:" << ev.type << " i:" << i;
248         ASSERT_TRUE(ev.success);
249         grpc_connectivity_state prev_state = state;
250         state = grpc_channel_check_connectivity_state(channel, 1);
251         if (self->expected_connectivity_states_ ==
252                 GRPC_CHANNEL_TRANSIENT_FAILURE &&
253             prev_state == GRPC_CHANNEL_CONNECTING &&
254             state == GRPC_CHANNEL_CONNECTING) {
255           // Detect a race in state checking: if the watch_connectivity_state
256           // completed from prior state "connecting", this could be because the
257           // channel momentarily entered state "transient failure", which is
258           // what we want. However, if the channel immediately re-enters
259           // "connecting" state, then the new state check might still result in
260           // "connecting". A continuous repeat of this can cause this loop to
261           // never terminate in time. So take this scenario to indicate that the
262           // channel momentarily entered transient failure.
263           break;
264         }
265       }
266       grpc_channel_destroy(channel);
267       grpc_completion_queue_shutdown(cq);
268       drain_cq(cq);
269       grpc_completion_queue_destroy(cq);
270       gpr_log(GPR_DEBUG, "runner:%p connect_loop finished loop %ld", self, i);
271     }
272   }
273 
274  private:
275   grpc_core::UniquePtr<char> server_address_;
276   grpc_core::UniquePtr<char> fake_handshake_server_addr_;
277   int per_connect_deadline_seconds_;
278   size_t loops_;
279   grpc_connectivity_state expected_connectivity_states_;
280   std::unique_ptr<std::thread> thd_;
281   int reconnect_backoff_ms_;
282 };
283 
284 // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS
285 // handshake server).
TEST(AltsConcurrentConnectivityTest,TestBasicClientServerHandshakes)286 TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
287   FakeHandshakeServer fake_handshake_server(
288       true /* check num concurrent rpcs */);
289   TestServer test_server;
290   {
291     ConnectLoopRunner runner(
292         test_server.address(), fake_handshake_server.address(),
293         5 /* per connect deadline seconds */, 10 /* loops */,
294         GRPC_CHANNEL_READY /* expected connectivity states */,
295         0 /* reconnect_backoff_ms unset */);
296   }
297 }
298 
299 /* Run a bunch of concurrent ALTS handshakes on concurrent channels
300  * (using the fake, in-process handshake server). */
TEST(AltsConcurrentConnectivityTest,TestConcurrentClientServerHandshakes)301 TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
302   FakeHandshakeServer fake_handshake_server(
303       true /* check num concurrent rpcs */);
304   // Test
305   {
306     TestServer test_server;
307     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
308     size_t num_concurrent_connects = 50;
309     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
310     gpr_log(GPR_DEBUG,
311             "start performing concurrent expected-to-succeed connects");
312     for (size_t i = 0; i < num_concurrent_connects; i++) {
313       connect_loop_runners.push_back(
314           std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner(
315               test_server.address(), fake_handshake_server.address(),
316               15 /* per connect deadline seconds */, 5 /* loops */,
317               GRPC_CHANNEL_READY /* expected connectivity states */,
318               0 /* reconnect_backoff_ms unset */)));
319     }
320     connect_loop_runners.clear();
321     gpr_log(GPR_DEBUG,
322             "done performing concurrent expected-to-succeed connects");
323     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
324       gpr_log(GPR_DEBUG, "Test took longer than expected.");
325       abort();
326     }
327   }
328 }
329 
330 class FakeTcpServer {
331  public:
332   enum ProcessReadResult {
333     CONTINUE_READING,
334     CLOSE_SOCKET,
335   };
336 
FakeTcpServer(const std::function<ProcessReadResult (int,int,int)> & process_read_cb)337   FakeTcpServer(
338       const std::function<ProcessReadResult(int, int, int)>& process_read_cb)
339       : process_read_cb_(process_read_cb) {
340     port_ = grpc_pick_unused_port_or_die();
341     accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
342     address_ = absl::StrCat("[::]:", port_);
343     GPR_ASSERT(accept_socket_ != -1);
344     if (accept_socket_ == -1) {
345       gpr_log(GPR_ERROR, "Failed to create socket: %d", errno);
346       abort();
347     }
348     int val = 1;
349     if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val,
350                    sizeof(val)) != 0) {
351       gpr_log(GPR_ERROR,
352               "Failed to set SO_REUSEADDR on socket bound to [::1]:%d : %d",
353               port_, errno);
354       abort();
355     }
356     if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
357       gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno);
358       abort();
359     }
360     sockaddr_in6 addr;
361     memset(&addr, 0, sizeof(addr));
362     addr.sin6_family = AF_INET6;
363     addr.sin6_port = htons(port_);
364     ((char*)&addr.sin6_addr)[15] = 1;
365     if (bind(accept_socket_, (const sockaddr*)&addr, sizeof(addr)) != 0) {
366       gpr_log(GPR_ERROR, "Failed to bind socket to [::1]:%d : %d", port_,
367               errno);
368       abort();
369     }
370     if (listen(accept_socket_, 100)) {
371       gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
372               port_, errno);
373       abort();
374     }
375     gpr_event_init(&stop_ev_);
376     run_server_loop_thd_ =
377         std::unique_ptr<std::thread>(new std::thread(RunServerLoop, this));
378   }
379 
~FakeTcpServer()380   ~FakeTcpServer() {
381     gpr_log(GPR_DEBUG,
382             "FakeTcpServer stop and "
383             "join server thread");
384     gpr_event_set(&stop_ev_, (void*)1);
385     run_server_loop_thd_->join();
386     gpr_log(GPR_DEBUG,
387             "FakeTcpServer join server "
388             "thread complete");
389   }
390 
address()391   const char* address() { return address_.c_str(); }
392 
CloseSocketUponReceivingBytesFromPeer(int bytes_received_size,int read_error,int s)393   static ProcessReadResult CloseSocketUponReceivingBytesFromPeer(
394       int bytes_received_size, int read_error, int s) {
395     if (bytes_received_size < 0 && read_error != EAGAIN &&
396         read_error != EWOULDBLOCK) {
397       gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
398               errno);
399       abort();
400     }
401     if (bytes_received_size >= 0) {
402       gpr_log(GPR_DEBUG,
403               "Fake TCP server received %d bytes from peer socket: %d. Close "
404               "the "
405               "connection.",
406               bytes_received_size, s);
407       return CLOSE_SOCKET;
408     }
409     return CONTINUE_READING;
410   }
411 
CloseSocketUponCloseFromPeer(int bytes_received_size,int read_error,int s)412   static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size,
413                                                         int read_error, int s) {
414     if (bytes_received_size < 0 && read_error != EAGAIN &&
415         read_error != EWOULDBLOCK) {
416       gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
417               errno);
418       abort();
419     }
420     if (bytes_received_size == 0) {
421       // The peer has shut down the connection.
422       gpr_log(GPR_DEBUG,
423               "Fake TCP server received 0 bytes from peer socket: %d. Close "
424               "the "
425               "connection.",
426               s);
427       return CLOSE_SOCKET;
428     }
429     return CONTINUE_READING;
430   }
431 
432   // Run a loop that periodically, every 10 ms:
433   //   1) Checks if there are any new TCP connections to accept.
434   //   2) Checks if any data has arrived yet on established connections,
435   //      and reads from them if so, processing the sockets as configured.
RunServerLoop(FakeTcpServer * self)436   static void RunServerLoop(FakeTcpServer* self) {
437     std::set<int> peers;
438     while (!gpr_event_get(&self->stop_ev_)) {
439       int p = accept(self->accept_socket_, nullptr, nullptr);
440       if (p == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
441         gpr_log(GPR_ERROR, "Failed to accept connection: %d", errno);
442         abort();
443       }
444       if (p != -1) {
445         gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
446         if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
447           gpr_log(GPR_ERROR,
448                   "Failed to set O_NONBLOCK on peer socket:%d errno:%d", p,
449                   errno);
450           abort();
451         }
452         peers.insert(p);
453       }
454       auto it = peers.begin();
455       while (it != peers.end()) {
456         int p = *it;
457         char buf[100];
458         int bytes_received_size = recv(p, buf, 100, 0);
459         ProcessReadResult r =
460             self->process_read_cb_(bytes_received_size, errno, p);
461         if (r == CLOSE_SOCKET) {
462           close(p);
463           it = peers.erase(it);
464         } else {
465           GPR_ASSERT(r == CONTINUE_READING);
466           it++;
467         }
468       }
469       gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
470                                    gpr_time_from_millis(10, GPR_TIMESPAN)));
471     }
472     for (auto it = peers.begin(); it != peers.end(); it++) {
473       close(*it);
474     }
475     close(self->accept_socket_);
476   }
477 
478  private:
479   int accept_socket_;
480   int port_;
481   gpr_event stop_ev_;
482   std::string address_;
483   std::unique_ptr<std::thread> run_server_loop_thd_;
484   std::function<ProcessReadResult(int, int, int)> process_read_cb_;
485 };
486 
487 /* This test is intended to make sure that ALTS handshakes we correctly
488  * fail fast when the security handshaker gets an error while reading
489  * from the remote peer, after having earlier sent the first bytes of the
490  * ALTS handshake to the peer, i.e. after getting into the middle of a
491  * handshake. */
TEST(AltsConcurrentConnectivityTest,TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting)492 TEST(AltsConcurrentConnectivityTest,
493      TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting) {
494   // Don't enforce the number of concurrent rpcs for the fake handshake
495   // server in this test, because this test will involve handshake RPCs
496   // getting cancelled. Because there isn't explicit synchronization between
497   // an ALTS handshake client's RECV_STATUS op completing after call
498   // cancellation, and the corresponding fake handshake server's sync
499   // method handler returning, enforcing a limit on the number of active
500   // RPCs at the fake handshake server would be inherently racey.
501   FakeHandshakeServer fake_handshake_server(
502       false /* check num concurrent rpcs */);
503   FakeTcpServer fake_tcp_server(
504       FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
505   {
506     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
507     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
508     size_t num_concurrent_connects = 100;
509     gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
510     for (size_t i = 0; i < num_concurrent_connects; i++) {
511       connect_loop_runners.push_back(
512           std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner(
513               fake_tcp_server.address(), fake_handshake_server.address(),
514               10 /* per connect deadline seconds */, 3 /* loops */,
515               GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
516               0 /* reconnect_backoff_ms unset */)));
517     }
518     connect_loop_runners.clear();
519     gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
520     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
521       gpr_log(GPR_ERROR,
522               "Exceeded test deadline. ALTS handshakes might not be failing "
523               "fast when the peer endpoint closes the connection abruptly");
524       abort();
525     }
526   }
527 }
528 
529 /* This test is intended to make sure that ALTS handshakes correctly
530  * fail fast when the ALTS handshake server fails incoming handshakes fast. */
TEST(AltsConcurrentConnectivityTest,TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting)531 TEST(AltsConcurrentConnectivityTest,
532      TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
533   FakeTcpServer fake_handshake_server(
534       FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
535   FakeTcpServer fake_tcp_server(FakeTcpServer::CloseSocketUponCloseFromPeer);
536   {
537     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
538     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
539     size_t num_concurrent_connects = 100;
540     gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
541     for (size_t i = 0; i < num_concurrent_connects; i++) {
542       connect_loop_runners.push_back(
543           std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner(
544               fake_tcp_server.address(), fake_handshake_server.address(),
545               10 /* per connect deadline seconds */, 2 /* loops */,
546               GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
547               0 /* reconnect_backoff_ms unset */)));
548     }
549     connect_loop_runners.clear();
550     gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
551     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
552       gpr_log(GPR_ERROR,
553               "Exceeded test deadline. ALTS handshakes might not be failing "
554               "fast when the handshake server closes new connections");
555       abort();
556     }
557   }
558 }
559 
560 /* This test is intended to make sure that ALTS handshakes correctly
561  * fail fast when the ALTS handshake server is non-responsive, in which case
562  * the overall connection deadline kicks in. */
TEST(AltsConcurrentConnectivityTest,TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting)563 TEST(AltsConcurrentConnectivityTest,
564      TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
565   FakeTcpServer fake_handshake_server(
566       FakeTcpServer::CloseSocketUponCloseFromPeer);
567   FakeTcpServer fake_tcp_server(FakeTcpServer::CloseSocketUponCloseFromPeer);
568   {
569     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
570     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
571     size_t num_concurrent_connects = 100;
572     gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
573     for (size_t i = 0; i < num_concurrent_connects; i++) {
574       connect_loop_runners.push_back(
575           std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner(
576               fake_tcp_server.address(), fake_handshake_server.address(),
577               10 /* per connect deadline seconds */, 2 /* loops */,
578               GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
579               100 /* reconnect_backoff_ms */)));
580     }
581     connect_loop_runners.clear();
582     gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
583     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
584       gpr_log(GPR_ERROR,
585               "Exceeded test deadline. ALTS handshakes might not be failing "
586               "fast when the handshake server is non-response timeout occurs");
587       abort();
588     }
589   }
590 }
591 
592 }  // namespace
593 
main(int argc,char ** argv)594 int main(int argc, char** argv) {
595   ::testing::InitGoogleTest(&argc, argv);
596   grpc::testing::TestEnvironment env(argc, argv);
597   grpc_init();
598   auto result = RUN_ALL_TESTS();
599   grpc_shutdown();
600   return result;
601 }
602