1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <fcntl.h>
20 #include <gmock/gmock.h>
21 #include <grpc/credentials.h>
22 #include <grpc/grpc.h>
23 #include <grpc/grpc_security.h>
24 #include <grpc/slice.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/impl/service_type.h>
30 #include <grpcpp/server_builder.h>
31 #include <netinet/in.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/stat.h>
36 #include <sys/types.h>
37 #include <unistd.h>
38
39 #include <functional>
40 #include <set>
41 #include <thread>
42
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/memory/memory.h"
46 #include "absl/strings/str_cat.h"
47 #include "src/core/lib/iomgr/error.h"
48 #include "src/core/lib/security/credentials/alts/alts_credentials.h"
49 #include "src/core/lib/security/credentials/credentials.h"
50 #include "src/core/lib/security/security_connector/alts/alts_security_connector.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/util/crash.h"
53 #include "src/core/util/host_port.h"
54 #include "src/core/util/thd.h"
55 #include "src/core/util/useful.h"
56 #include "test/core/end2end/cq_verifier.h"
57 #include "test/core/test_util/build.h"
58 #include "test/core/test_util/fake_udp_and_tcp_server.h"
59 #include "test/core/test_util/port.h"
60 #include "test/core/test_util/test_config.h"
61 #include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h"
62
63 namespace {
64
drain_cq(grpc_completion_queue * cq)65 void drain_cq(grpc_completion_queue* cq) {
66 grpc_event ev;
67 do {
68 ev = grpc_completion_queue_next(
69 cq, grpc_timeout_milliseconds_to_deadline(5000), nullptr);
70 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
71 }
72
create_secure_channel_for_test(const char * server_addr,const char * fake_handshake_server_addr,int reconnect_backoff_ms)73 grpc_channel* create_secure_channel_for_test(
74 const char* server_addr, const char* fake_handshake_server_addr,
75 int reconnect_backoff_ms) {
76 grpc_alts_credentials_options* alts_options =
77 grpc_alts_credentials_client_options_create();
78 grpc_channel_credentials* channel_creds =
79 grpc_alts_credentials_create_customized(alts_options,
80 fake_handshake_server_addr,
81 true /* enable_untrusted_alts */);
82 grpc_alts_credentials_options_destroy(alts_options);
83 // The main goal of these tests are to stress concurrent ALTS handshakes,
84 // so we prevent subchnannel sharing.
85 std::vector<grpc_arg> new_args;
86 new_args.push_back(grpc_channel_arg_integer_create(
87 const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
88 if (reconnect_backoff_ms != 0) {
89 new_args.push_back(grpc_channel_arg_integer_create(
90 const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms"),
91 reconnect_backoff_ms));
92 }
93 grpc_channel_args* channel_args =
94 grpc_channel_args_copy_and_add(nullptr, new_args.data(), new_args.size());
95 grpc_channel* channel =
96 grpc_channel_create(server_addr, channel_creds, channel_args);
97 grpc_channel_args_destroy(channel_args);
98 grpc_channel_credentials_release(channel_creds);
99 return channel;
100 }
101
102 class FakeHandshakeServer {
103 public:
FakeHandshakeServer()104 FakeHandshakeServer() {
105 int port = grpc_pick_unused_port_or_die();
106 address_ = grpc_core::JoinHostPort("localhost", port);
107 service_ = grpc::gcp::CreateFakeHandshakerService("peer_identity");
108 grpc::ServerBuilder builder;
109 builder.AddListeningPort(address_, grpc::InsecureServerCredentials());
110 builder.RegisterService(service_.get());
111 // TODO(apolcyn): when removing the global concurrent handshake limiting
112 // queue, set MAX_CONCURRENT_STREAMS on this server.
113 server_ = builder.BuildAndStart();
114 LOG(INFO) << "Fake handshaker server listening on " << address_;
115 }
116
~FakeHandshakeServer()117 ~FakeHandshakeServer() {
118 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
119 }
120
address()121 const char* address() { return address_.c_str(); }
122
123 private:
124 std::string address_;
125 std::unique_ptr<grpc::Service> service_;
126 std::unique_ptr<grpc::Server> server_;
127 };
128
129 class TestServer {
130 public:
TestServer()131 TestServer() {
132 grpc_alts_credentials_options* alts_options =
133 grpc_alts_credentials_server_options_create();
134 grpc_server_credentials* server_creds =
135 grpc_alts_server_credentials_create_customized(
136 alts_options, fake_handshake_server_.address(),
137 true /* enable_untrusted_alts */);
138 grpc_alts_credentials_options_destroy(alts_options);
139 server_ = grpc_server_create(nullptr, nullptr);
140 server_cq_ = grpc_completion_queue_create_for_next(nullptr);
141 grpc_server_register_completion_queue(server_, server_cq_, nullptr);
142 int port = grpc_pick_unused_port_or_die();
143 server_addr_ = grpc_core::JoinHostPort("localhost", port);
144 CHECK(grpc_server_add_http2_port(server_, server_addr_.c_str(),
145 server_creds));
146 grpc_server_credentials_release(server_creds);
147 grpc_server_start(server_);
148 VLOG(2) << "Start TestServer " << this << ". listen on " << server_addr_;
149 server_thd_ = std::make_unique<std::thread>(PollUntilShutdown, this);
150 }
151
~TestServer()152 ~TestServer() {
153 VLOG(2) << "Begin dtor of TestServer " << this;
154 grpc_server_shutdown_and_notify(server_, server_cq_, this);
155 server_thd_->join();
156 grpc_server_destroy(server_);
157 grpc_completion_queue_shutdown(server_cq_);
158 drain_cq(server_cq_);
159 grpc_completion_queue_destroy(server_cq_);
160 }
161
address()162 const char* address() { return server_addr_.c_str(); }
163
PollUntilShutdown(const TestServer * self)164 static void PollUntilShutdown(const TestServer* self) {
165 grpc_event ev = grpc_completion_queue_next(
166 self->server_cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
167 CHECK(ev.type == GRPC_OP_COMPLETE);
168 CHECK(ev.tag == self);
169 VLOG(2) << "TestServer " << self << " stop polling";
170 }
171
172 private:
173 grpc_server* server_;
174 grpc_completion_queue* server_cq_;
175 std::unique_ptr<std::thread> server_thd_;
176 std::string server_addr_;
177 // Give this test server its own ALTS handshake server
178 // so that we avoid competing for ALTS handshake server resources (e.g.
179 // available HTTP2 streams on a globally shared handshaker subchannel)
180 // with clients that are trying to do mutual ALTS handshakes
181 // with this server (which could "deadlock" mutual handshakes).
182 // TODO(apolcyn): remove this workaround from this test and have
183 // clients/servers share a single fake handshake server if
184 // the underlying issue needs to be fixed.
185 FakeHandshakeServer fake_handshake_server_;
186 };
187
188 class ConnectLoopRunner {
189 public:
ConnectLoopRunner(const char * server_address,const char * fake_handshake_server_addr,int per_connect_deadline_seconds,size_t loops,grpc_connectivity_state expected_connectivity_states,int reconnect_backoff_ms)190 explicit ConnectLoopRunner(
191 const char* server_address, const char* fake_handshake_server_addr,
192 int per_connect_deadline_seconds, size_t loops,
193 grpc_connectivity_state expected_connectivity_states,
194 int reconnect_backoff_ms)
195 : server_address_(grpc_core::UniquePtr<char>(gpr_strdup(server_address))),
196 fake_handshake_server_addr_(
197 grpc_core::UniquePtr<char>(gpr_strdup(fake_handshake_server_addr))),
198 per_connect_deadline_seconds_(per_connect_deadline_seconds),
199 loops_(loops),
200 expected_connectivity_states_(expected_connectivity_states),
201 reconnect_backoff_ms_(reconnect_backoff_ms) {
202 thd_ = std::make_unique<std::thread>(ConnectLoop, this);
203 }
204
~ConnectLoopRunner()205 ~ConnectLoopRunner() { thd_->join(); }
206
ConnectLoop(const ConnectLoopRunner * self)207 static void ConnectLoop(const ConnectLoopRunner* self) {
208 for (size_t i = 0; i < self->loops_; i++) {
209 VLOG(2) << "runner:" << self << " connect_loop begin loop " << i;
210 grpc_completion_queue* cq =
211 grpc_completion_queue_create_for_next(nullptr);
212 grpc_channel* channel = create_secure_channel_for_test(
213 self->server_address_.get(), self->fake_handshake_server_addr_.get(),
214 self->reconnect_backoff_ms_);
215 // Connect, forcing an ALTS handshake
216 grpc_connectivity_state state =
217 grpc_channel_check_connectivity_state(channel, 1);
218 ASSERT_EQ(state, GRPC_CHANNEL_IDLE);
219 while (state != self->expected_connectivity_states_) {
220 if (self->expected_connectivity_states_ ==
221 GRPC_CHANNEL_TRANSIENT_FAILURE) {
222 ASSERT_NE(state, GRPC_CHANNEL_READY); // sanity check
223 } else {
224 ASSERT_EQ(self->expected_connectivity_states_, GRPC_CHANNEL_READY);
225 }
226 grpc_channel_watch_connectivity_state(
227 channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, nullptr);
228 grpc_event ev =
229 grpc_completion_queue_next(cq,
230 grpc_timeout_seconds_to_deadline(
231 self->per_connect_deadline_seconds_),
232 nullptr);
233 ASSERT_EQ(ev.type, GRPC_OP_COMPLETE)
234 << "connect_loop runner:" << std::hex << self
235 << " got ev.type:" << ev.type << " i:" << i;
236 ASSERT_TRUE(ev.success);
237 grpc_connectivity_state prev_state = state;
238 state = grpc_channel_check_connectivity_state(channel, 1);
239 if (self->expected_connectivity_states_ ==
240 GRPC_CHANNEL_TRANSIENT_FAILURE &&
241 prev_state == GRPC_CHANNEL_CONNECTING &&
242 state == GRPC_CHANNEL_CONNECTING) {
243 // Detect a race in state checking: if the watch_connectivity_state
244 // completed from prior state "connecting", this could be because the
245 // channel momentarily entered state "transient failure", which is
246 // what we want. However, if the channel immediately re-enters
247 // "connecting" state, then the new state check might still result in
248 // "connecting". A continuous repeat of this can cause this loop to
249 // never terminate in time. So take this scenario to indicate that the
250 // channel momentarily entered transient failure.
251 break;
252 }
253 }
254 grpc_channel_destroy(channel);
255 grpc_completion_queue_shutdown(cq);
256 drain_cq(cq);
257 grpc_completion_queue_destroy(cq);
258 VLOG(2) << "runner:" << self << " connect_loop finished loop " << i;
259 }
260 }
261
262 private:
263 grpc_core::UniquePtr<char> server_address_;
264 grpc_core::UniquePtr<char> fake_handshake_server_addr_;
265 int per_connect_deadline_seconds_;
266 size_t loops_;
267 grpc_connectivity_state expected_connectivity_states_;
268 std::unique_ptr<std::thread> thd_;
269 int reconnect_backoff_ms_;
270 };
271
272 // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS
273 // handshake server).
TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
  FakeHandshakeServer fake_handshake_server;
  TestServer test_server;
  const auto per_connect_deadline_seconds = 10 * grpc_test_slowdown_factor();
  constexpr size_t kLoops = 10;
  // A single runner performs the connects one after another on its own
  // thread. It is declared last, so it is destroyed (and its thread joined)
  // before the two servers above go away.
  ConnectLoopRunner runner(
      test_server.address(), fake_handshake_server.address(),
      per_connect_deadline_seconds, kLoops,
      GRPC_CHANNEL_READY /* expected connectivity states */,
      0 /* reconnect_backoff_ms unset */);
}
285
286 // Run a bunch of concurrent ALTS handshakes on concurrent channels
287 // (using the fake, in-process handshake server).
TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
  FakeHandshakeServer fake_handshake_server;
  // Test
  {
    TestServer test_server;
    // Fewer concurrent runners are used when built under MSAN.
    const size_t num_concurrent_connects = BuiltUnderMsan() ? 25 : 50;
    const auto per_connect_deadline_seconds = 15 * grpc_test_slowdown_factor();
    constexpr size_t kLoops = 5;
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    VLOG(2) << "start performing concurrent expected-to-succeed connects";
    for (size_t started = 0; started < num_concurrent_connects; ++started) {
      runners.emplace_back(std::make_unique<ConnectLoopRunner>(
          test_server.address(), fake_handshake_server.address(),
          per_connect_deadline_seconds, kLoops,
          GRPC_CHANNEL_READY /* expected connectivity states */,
          0 /* reconnect_backoff_ms unset */));
    }
    // Destroying the runners joins every connect thread.
    runners.clear();
    VLOG(2) << "done performing concurrent expected-to-succeed connects";
  }
}
310
// This test is intended to make sure that ALTS handshakes correctly
312 // fail fast when the security handshaker gets an error while reading
313 // from the remote peer, after having earlier sent the first bytes of the
314 // ALTS handshake to the peer, i.e. after getting into the middle of a
315 // handshake.
TEST(AltsConcurrentConnectivityTest,
     TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting) {
  using grpc_core::testing::FakeUdpAndTcpServer;
  // The number of concurrent RPCs at the fake handshake server is not
  // enforced in this test, because handshake RPCs are going to be cancelled
  // here. There is no explicit synchronization between an ALTS handshake
  // client's RECV_STATUS op completing after call cancellation and the
  // corresponding fake handshake server's sync method handler returning, so
  // enforcing a limit on active RPCs at the fake handshake server would be
  // inherently racy.
  FakeHandshakeServer fake_handshake_server;
  // fake_backend_server stands in for a secure (ALTS based) gRPC backend, so
  // it waits for the client to send the first bytes.
  FakeUdpAndTcpServer fake_backend_server(
      FakeUdpAndTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
      FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer);
  {
    constexpr size_t kNumConcurrentConnects = 100;
    constexpr size_t kLoops = 3;
    const auto per_connect_deadline_seconds = 10 * grpc_test_slowdown_factor();
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    VLOG(2) << "start performing concurrent expected-to-fail connects";
    for (size_t started = 0; started < kNumConcurrentConnects; ++started) {
      runners.emplace_back(std::make_unique<ConnectLoopRunner>(
          fake_backend_server.address(), fake_handshake_server.address(),
          per_connect_deadline_seconds, kLoops,
          GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
          0 /* reconnect_backoff_ms unset */));
    }
    // Destroying the runners joins every connect thread.
    runners.clear();
    VLOG(2) << "done performing concurrent expected-to-fail connects";
  }
}
349
350 // This test is intended to make sure that ALTS handshakes correctly
351 // fail fast when the ALTS handshake server fails incoming handshakes fast.
TEST(AltsConcurrentConnectivityTest,
     TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
  using grpc_core::testing::FakeUdpAndTcpServer;
  // fake_handshake_server plays a broken ALTS handshaker. A handshaker is an
  // insecure server, so it sends settings to the client eagerly.
  FakeUdpAndTcpServer fake_handshake_server(
      FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings,
      FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer);
  // fake_backend_server plays a secure (ALTS based) server, so it waits for
  // the client to send the first bytes.
  FakeUdpAndTcpServer fake_backend_server(
      FakeUdpAndTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
      FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
  {
    constexpr size_t kNumConcurrentConnects = 100;
    constexpr size_t kLoops = 2;
    const auto per_connect_deadline_seconds = 20 * grpc_test_slowdown_factor();
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    VLOG(2) << "start performing concurrent expected-to-fail connects";
    for (size_t started = 0; started < kNumConcurrentConnects; ++started) {
      runners.emplace_back(std::make_unique<ConnectLoopRunner>(
          fake_backend_server.address(), fake_handshake_server.address(),
          per_connect_deadline_seconds, kLoops,
          GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
          0 /* reconnect_backoff_ms unset */));
    }
    // Destroying the runners joins every connect thread.
    runners.clear();
    VLOG(2) << "done performing concurrent expected-to-fail connects";
  }
}
382
383 // This test is intended to make sure that ALTS handshakes correctly
384 // fail fast when the ALTS handshake server is non-responsive, in which case
385 // the overall connection deadline kicks in.
TEST(AltsConcurrentConnectivityTest,
     TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
  using grpc_core::testing::FakeUdpAndTcpServer;
  // fake_handshake_server plays an insecure server, so it sends settings
  // first. For the rest of the connection, though, it stays unresponsive.
  FakeUdpAndTcpServer fake_handshake_server(
      FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings,
      FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
  // fake_backend_server plays an ALTS based server, so it waits for the
  // client to send the first bytes.
  FakeUdpAndTcpServer fake_backend_server(
      FakeUdpAndTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
      FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
  {
    constexpr size_t kNumConcurrentConnects = 100;
    constexpr size_t kLoops = 2;
    const auto per_connect_deadline_seconds = 10 * grpc_test_slowdown_factor();
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    VLOG(2) << "start performing concurrent expected-to-fail connects";
    for (size_t started = 0; started < kNumConcurrentConnects; ++started) {
      runners.emplace_back(std::make_unique<ConnectLoopRunner>(
          fake_backend_server.address(), fake_handshake_server.address(),
          per_connect_deadline_seconds, kLoops,
          GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
          100 /* reconnect_backoff_ms */));
    }
    // Destroying the runners joins every connect thread.
    runners.clear();
    VLOG(2) << "done performing concurrent expected-to-fail connects";
  }
}
415
416 } // namespace
417
main(int argc,char ** argv)418 int main(int argc, char** argv) {
419 ::testing::InitGoogleTest(&argc, argv);
420 grpc::testing::TestEnvironment env(&argc, argv);
421 grpc_init();
422 auto result = RUN_ALL_TESTS();
423 grpc_shutdown();
424 return result;
425 }
426