1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gtest/gtest.h>
20
21 #include "src/core/lib/address_utils/parse_address.h"
22 #include "src/core/lib/channel/channel_args.h"
23 #include "src/core/lib/iomgr/port.h"
24 #include "src/core/util/time.h"
25 #include "test/core/test_util/port.h"
26 #include "test/core/test_util/test_config.h"
27
28 // This test won't work except with posix sockets enabled
29 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
30
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/time.h>
36 #include <netinet/in.h>
37 #include <poll.h>
38 #include <string.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 #include "absl/log/log.h"
43 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
44 #include "src/core/lib/iomgr/iomgr.h"
45 #include "src/core/lib/iomgr/pollset_set.h"
46 #include "src/core/lib/iomgr/socket_utils_posix.h"
47 #include "src/core/lib/iomgr/tcp_client.h"
48 #include "src/core/lib/iomgr/timer.h"
49 #include "src/core/lib/resource_quota/api.h"
50 #include "src/core/util/crash.h"
51
52 static grpc_pollset_set* g_pollset_set;
53 static gpr_mu* g_mu;
54 static grpc_pollset* g_pollset;
55 static int g_connections_complete = 0;
56 static grpc_endpoint* g_connecting = nullptr;
57
test_deadline(void)58 static grpc_core::Timestamp test_deadline(void) {
59 return grpc_core::Timestamp::FromTimespecRoundUp(
60 grpc_timeout_seconds_to_deadline(10));
61 }
62
finish_connection()63 static void finish_connection() {
64 gpr_mu_lock(g_mu);
65 g_connections_complete++;
66 grpc_core::ExecCtx exec_ctx;
67 ASSERT_TRUE(
68 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
69
70 gpr_mu_unlock(g_mu);
71 }
72
must_succeed(void *,grpc_error_handle error)73 static void must_succeed(void* /*arg*/, grpc_error_handle error) {
74 ASSERT_NE(g_connecting, nullptr);
75 ASSERT_TRUE(error.ok());
76 grpc_endpoint_destroy(g_connecting);
77 g_connecting = nullptr;
78 finish_connection();
79 }
80
must_fail(void *,grpc_error_handle error)81 static void must_fail(void* /*arg*/, grpc_error_handle error) {
82 ASSERT_EQ(g_connecting, nullptr);
83 ASSERT_FALSE(error.ok());
84 finish_connection();
85 }
86
test_succeeds(void)87 void test_succeeds(void) {
88 LOG(ERROR) << "---- starting test_succeeds() ----";
89 grpc_resolved_address resolved_addr;
90 struct sockaddr_in* addr =
91 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
92 int svr_fd;
93 int r;
94 int connections_complete_before;
95 grpc_closure done;
96 grpc_core::ExecCtx exec_ctx;
97
98 memset(&resolved_addr, 0, sizeof(resolved_addr));
99 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
100 addr->sin_family = AF_INET;
101
102 // create a phony server
103 svr_fd = socket(AF_INET, SOCK_STREAM, 0);
104 ASSERT_GE(svr_fd, 0);
105 ASSERT_EQ(bind(svr_fd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len),
106 0);
107 ASSERT_EQ(listen(svr_fd, 1), 0);
108
109 gpr_mu_lock(g_mu);
110 connections_complete_before = g_connections_complete;
111 gpr_mu_unlock(g_mu);
112
113 // connect to it
114 ASSERT_EQ(getsockname(svr_fd, (struct sockaddr*)addr,
115 (socklen_t*)&resolved_addr.len),
116 0);
117 GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
118 grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
119 .channel_args_preconditioning()
120 .PreconditionChannelArgs(nullptr);
121 int64_t connection_handle = grpc_tcp_client_connect(
122 &done, &g_connecting, g_pollset_set,
123 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
124 &resolved_addr, grpc_core::Timestamp::InfFuture());
125 // await the connection
126 do {
127 resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
128 r = accept(svr_fd, reinterpret_cast<struct sockaddr*>(addr),
129 reinterpret_cast<socklen_t*>(&resolved_addr.len));
130 } while (r == -1 && errno == EINTR);
131 ASSERT_GE(r, 0);
132 close(r);
133
134 gpr_mu_lock(g_mu);
135
136 while (g_connections_complete == connections_complete_before) {
137 grpc_pollset_worker* worker = nullptr;
138 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
139 "pollset_work",
140 grpc_pollset_work(g_pollset, &worker,
141 grpc_core::Timestamp::FromTimespecRoundUp(
142 grpc_timeout_seconds_to_deadline(5)))));
143 gpr_mu_unlock(g_mu);
144 grpc_core::ExecCtx::Get()->Flush();
145 gpr_mu_lock(g_mu);
146 }
147
148 gpr_mu_unlock(g_mu);
149
150 // A cancellation attempt should fail because connect already succeeded.
151 ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
152
153 LOG(ERROR) << "---- finished test_succeeds() ----";
154 }
155
test_fails(void)156 void test_fails(void) {
157 LOG(ERROR) << "---- starting test_fails() ----";
158 grpc_resolved_address resolved_addr;
159 struct sockaddr_in* addr =
160 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
161 int connections_complete_before;
162 grpc_closure done;
163 grpc_core::ExecCtx exec_ctx;
164
165 memset(&resolved_addr, 0, sizeof(resolved_addr));
166 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
167 addr->sin_family = AF_INET;
168
169 gpr_mu_lock(g_mu);
170 connections_complete_before = g_connections_complete;
171 gpr_mu_unlock(g_mu);
172
173 // connect to a broken address
174 GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
175 int64_t connection_handle = grpc_tcp_client_connect(
176 &done, &g_connecting, g_pollset_set,
177 grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
178 &resolved_addr, grpc_core::Timestamp::InfFuture());
179 gpr_mu_lock(g_mu);
180
181 // wait for the connection callback to finish
182 while (g_connections_complete == connections_complete_before) {
183 grpc_pollset_worker* worker = nullptr;
184 grpc_core::Timestamp polling_deadline = test_deadline();
185 switch (grpc_timer_check(&polling_deadline)) {
186 case GRPC_TIMERS_FIRED:
187 break;
188 case GRPC_TIMERS_NOT_CHECKED:
189 polling_deadline = grpc_core::Timestamp::ProcessEpoch();
190 ABSL_FALLTHROUGH_INTENDED;
191 case GRPC_TIMERS_CHECKED_AND_EMPTY:
192 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
193 "pollset_work",
194 grpc_pollset_work(g_pollset, &worker, polling_deadline)));
195 break;
196 }
197 gpr_mu_unlock(g_mu);
198 grpc_core::ExecCtx::Get()->Flush();
199 gpr_mu_lock(g_mu);
200 }
201
202 gpr_mu_unlock(g_mu);
203
204 // A cancellation attempt should fail because connect already failed.
205 ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
206
207 LOG(ERROR) << "---- finished test_fails() ----";
208 }
209
test_connect_cancellation_succeeds(void)210 void test_connect_cancellation_succeeds(void) {
211 LOG(ERROR) << "---- starting test_connect_cancellation_succeeds() ----";
212 auto target_ipv6_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
213 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
214 auto target_ipv4_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
215 "ipv4:127.0.0.1:", std::to_string(grpc_pick_unused_port_or_die())));
216 grpc_resolved_address resolved_addr;
217 int svr_fd;
218 grpc_closure done;
219 grpc_core::ExecCtx exec_ctx;
220 bool tried_ipv4 = false;
221 ASSERT_TRUE(grpc_parse_uri(target_ipv6_addr_uri, &resolved_addr));
222 auto try_bind = [&](int sock) {
223 return (sock >= 0 &&
224 bind(sock, reinterpret_cast<sockaddr*>(resolved_addr.addr),
225 resolved_addr.len) == 0);
226 };
227 // create a phony server
228 svr_fd = socket(AF_INET6, SOCK_STREAM, 0);
229 // Try ipv6
230 if (!try_bind(svr_fd)) {
231 if (svr_fd >= 0) {
232 close(svr_fd);
233 }
234 // Failed to bind ipv6. Try ipv4
235 ASSERT_TRUE(grpc_parse_uri(target_ipv4_addr_uri, &resolved_addr));
236 svr_fd = socket(AF_INET, SOCK_STREAM, 0);
237 tried_ipv4 = true;
238 if (!try_bind(svr_fd)) {
239 if (svr_fd >= 0) {
240 close(svr_fd);
241 }
242 LOG(ERROR) << "Skipping test. Failed to create a phony server bound to "
243 "ipv6 or ipv4 address";
244 return;
245 }
246 }
247
248 ASSERT_EQ(listen(svr_fd, 1), 0);
249
250 std::vector<int> client_sockets;
251 bool create_more_client_connections = true;
252 // Create and connect client sockets until the connection attempt times out.
253 // Even if the backlog specified to listen is 1, the kernel continues to
254 // accept a certain number of SYN packets before dropping them. This loop
255 // attempts to identify the number of new connection attempts that will
256 // be allowed by the kernel before any subsequent connection attempts
257 // become pending indefinitely.
258 while (create_more_client_connections) {
259 const int kOne = 1;
260 int client_socket = socket(tried_ipv4 ? AF_INET : AF_INET6, SOCK_STREAM, 0);
261 ASSERT_GE(client_socket, 0);
262 setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &kOne, sizeof(kOne));
263 // Make fd non-blocking.
264 int flags = fcntl(client_socket, F_GETFL, 0);
265 ASSERT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
266
267 if (connect(client_socket, reinterpret_cast<sockaddr*>(resolved_addr.addr),
268 resolved_addr.len) == -1) {
269 if (errno == EINPROGRESS) {
270 struct pollfd pfd;
271 pfd.fd = client_socket;
272 pfd.events = POLLOUT;
273 pfd.revents = 0;
274 int ret = poll(&pfd, 1, 1000);
275 if (ret == -1) {
276 FAIL() << "poll() failed during connect; errno=" << errno;
277 } else if (ret == 0) {
278 // current connection attempt timed out. It indicates that the
279 // kernel will cause any subsequent connection attempts to
280 // become pending indefinitely.
281 create_more_client_connections = false;
282 }
283 } else {
284 FAIL() << "Failed to connect to the server. errno=%d" << errno;
285 }
286 }
287 client_sockets.push_back(client_socket);
288 }
289
290 // connect to it. accept() is not called on the bind socket. So the connection
291 // should appear to be stuck giving ample time to try to cancel it.
292 ASSERT_EQ(getsockname(svr_fd, reinterpret_cast<sockaddr*>(resolved_addr.addr),
293 (socklen_t*)&resolved_addr.len),
294 0);
295 GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
296 grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
297 .channel_args_preconditioning()
298 .PreconditionChannelArgs(nullptr);
299 int64_t connection_handle = grpc_tcp_client_connect(
300 &done, &g_connecting, g_pollset_set,
301 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
302 &resolved_addr, grpc_core::Timestamp::InfFuture());
303 ASSERT_GT(connection_handle, 0);
304 ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), true);
305 for (auto sock : client_sockets) {
306 close(sock);
307 }
308 close(svr_fd);
309 LOG(ERROR) << "---- finished test_connect_cancellation_succeeds() ----";
310 }
311
test_fails_bad_addr_no_leak(void)312 void test_fails_bad_addr_no_leak(void) {
313 LOG(ERROR) << "---- starting test_fails_bad_addr_no_leak() ----";
314 grpc_resolved_address resolved_addr;
315 struct sockaddr_in* addr =
316 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
317 int connections_complete_before;
318 grpc_closure done;
319 grpc_core::ExecCtx exec_ctx;
320 memset(&resolved_addr, 0, sizeof(resolved_addr));
321 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
322 // force `grpc_tcp_client_prepare_fd` to fail. contrived, but effective.
323 addr->sin_family = AF_IPX;
324 gpr_mu_lock(g_mu);
325 connections_complete_before = g_connections_complete;
326 gpr_mu_unlock(g_mu);
327 // connect to an invalid address.
328 GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
329 grpc_tcp_client_connect(
330 &done, &g_connecting, g_pollset_set,
331 grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
332 &resolved_addr, grpc_core::Timestamp::InfFuture());
333 gpr_mu_lock(g_mu);
334 while (g_connections_complete == connections_complete_before) {
335 grpc_pollset_worker* worker = nullptr;
336 grpc_core::Timestamp polling_deadline = test_deadline();
337 switch (grpc_timer_check(&polling_deadline)) {
338 case GRPC_TIMERS_FIRED:
339 break;
340 case GRPC_TIMERS_NOT_CHECKED:
341 polling_deadline = grpc_core::Timestamp::ProcessEpoch();
342 ABSL_FALLTHROUGH_INTENDED;
343 case GRPC_TIMERS_CHECKED_AND_EMPTY:
344 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
345 "pollset_work",
346 grpc_pollset_work(g_pollset, &worker, polling_deadline)));
347 break;
348 }
349 gpr_mu_unlock(g_mu);
350 grpc_core::ExecCtx::Get()->Flush();
351 gpr_mu_lock(g_mu);
352 }
353 gpr_mu_unlock(g_mu);
354 LOG(ERROR) << "---- finished test_fails_bad_addr_no_leak() ----";
355 }
356
destroy_pollset(void * p,grpc_error_handle)357 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
358 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
359 }
360
TEST(TcpClientPosixTest,MainTest)361 TEST(TcpClientPosixTest, MainTest) {
362 grpc_closure destroyed;
363 grpc_init();
364
365 {
366 grpc_core::ExecCtx exec_ctx;
367 g_pollset_set = grpc_pollset_set_create();
368 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
369 grpc_pollset_init(g_pollset, &g_mu);
370 grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);
371
372 test_succeeds();
373 test_connect_cancellation_succeeds();
374 test_fails();
375 test_fails_bad_addr_no_leak();
376 grpc_pollset_set_destroy(g_pollset_set);
377 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
378 grpc_schedule_on_exec_ctx);
379 grpc_pollset_shutdown(g_pollset, &destroyed);
380 }
381
382 grpc_shutdown();
383 gpr_free(g_pollset);
384 }
385
386 #endif // GRPC_POSIX_SOCKET_CLIENT
387
main(int argc,char ** argv)388 int main(int argc, char** argv) {
389 grpc::testing::TestEnvironment env(&argc, argv);
390 ::testing::InitGoogleTest(&argc, argv);
391 return RUN_ALL_TESTS();
392 }
393