• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <gtest/gtest.h>
20 
21 #include "src/core/lib/address_utils/parse_address.h"
22 #include "src/core/lib/channel/channel_args.h"
23 #include "src/core/lib/iomgr/port.h"
24 #include "src/core/util/time.h"
25 #include "test/core/test_util/port.h"
26 #include "test/core/test_util/test_config.h"
27 
28 // This test won't work except with posix sockets enabled
29 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
30 
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/time.h>
36 #include <netinet/in.h>
37 #include <poll.h>
38 #include <string.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41 
42 #include "absl/log/log.h"
43 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
44 #include "src/core/lib/iomgr/iomgr.h"
45 #include "src/core/lib/iomgr/pollset_set.h"
46 #include "src/core/lib/iomgr/socket_utils_posix.h"
47 #include "src/core/lib/iomgr/tcp_client.h"
48 #include "src/core/lib/iomgr/timer.h"
49 #include "src/core/lib/resource_quota/api.h"
50 #include "src/core/util/crash.h"
51 
52 static grpc_pollset_set* g_pollset_set;
53 static gpr_mu* g_mu;
54 static grpc_pollset* g_pollset;
55 static int g_connections_complete = 0;
56 static grpc_endpoint* g_connecting = nullptr;
57 
test_deadline(void)58 static grpc_core::Timestamp test_deadline(void) {
59   return grpc_core::Timestamp::FromTimespecRoundUp(
60       grpc_timeout_seconds_to_deadline(10));
61 }
62 
finish_connection()63 static void finish_connection() {
64   gpr_mu_lock(g_mu);
65   g_connections_complete++;
66   grpc_core::ExecCtx exec_ctx;
67   ASSERT_TRUE(
68       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
69 
70   gpr_mu_unlock(g_mu);
71 }
72 
must_succeed(void *,grpc_error_handle error)73 static void must_succeed(void* /*arg*/, grpc_error_handle error) {
74   ASSERT_NE(g_connecting, nullptr);
75   ASSERT_TRUE(error.ok());
76   grpc_endpoint_destroy(g_connecting);
77   g_connecting = nullptr;
78   finish_connection();
79 }
80 
must_fail(void *,grpc_error_handle error)81 static void must_fail(void* /*arg*/, grpc_error_handle error) {
82   ASSERT_EQ(g_connecting, nullptr);
83   ASSERT_FALSE(error.ok());
84   finish_connection();
85 }
86 
test_succeeds(void)87 void test_succeeds(void) {
88   LOG(ERROR) << "---- starting test_succeeds() ----";
89   grpc_resolved_address resolved_addr;
90   struct sockaddr_in* addr =
91       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
92   int svr_fd;
93   int r;
94   int connections_complete_before;
95   grpc_closure done;
96   grpc_core::ExecCtx exec_ctx;
97 
98   memset(&resolved_addr, 0, sizeof(resolved_addr));
99   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
100   addr->sin_family = AF_INET;
101 
102   // create a phony server
103   svr_fd = socket(AF_INET, SOCK_STREAM, 0);
104   ASSERT_GE(svr_fd, 0);
105   ASSERT_EQ(bind(svr_fd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len),
106             0);
107   ASSERT_EQ(listen(svr_fd, 1), 0);
108 
109   gpr_mu_lock(g_mu);
110   connections_complete_before = g_connections_complete;
111   gpr_mu_unlock(g_mu);
112 
113   // connect to it
114   ASSERT_EQ(getsockname(svr_fd, (struct sockaddr*)addr,
115                         (socklen_t*)&resolved_addr.len),
116             0);
117   GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
118   grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
119                                     .channel_args_preconditioning()
120                                     .PreconditionChannelArgs(nullptr);
121   int64_t connection_handle = grpc_tcp_client_connect(
122       &done, &g_connecting, g_pollset_set,
123       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
124       &resolved_addr, grpc_core::Timestamp::InfFuture());
125   // await the connection
126   do {
127     resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
128     r = accept(svr_fd, reinterpret_cast<struct sockaddr*>(addr),
129                reinterpret_cast<socklen_t*>(&resolved_addr.len));
130   } while (r == -1 && errno == EINTR);
131   ASSERT_GE(r, 0);
132   close(r);
133 
134   gpr_mu_lock(g_mu);
135 
136   while (g_connections_complete == connections_complete_before) {
137     grpc_pollset_worker* worker = nullptr;
138     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
139         "pollset_work",
140         grpc_pollset_work(g_pollset, &worker,
141                           grpc_core::Timestamp::FromTimespecRoundUp(
142                               grpc_timeout_seconds_to_deadline(5)))));
143     gpr_mu_unlock(g_mu);
144     grpc_core::ExecCtx::Get()->Flush();
145     gpr_mu_lock(g_mu);
146   }
147 
148   gpr_mu_unlock(g_mu);
149 
150   // A cancellation attempt should fail because connect already succeeded.
151   ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
152 
153   LOG(ERROR) << "---- finished test_succeeds() ----";
154 }
155 
test_fails(void)156 void test_fails(void) {
157   LOG(ERROR) << "---- starting test_fails() ----";
158   grpc_resolved_address resolved_addr;
159   struct sockaddr_in* addr =
160       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
161   int connections_complete_before;
162   grpc_closure done;
163   grpc_core::ExecCtx exec_ctx;
164 
165   memset(&resolved_addr, 0, sizeof(resolved_addr));
166   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
167   addr->sin_family = AF_INET;
168 
169   gpr_mu_lock(g_mu);
170   connections_complete_before = g_connections_complete;
171   gpr_mu_unlock(g_mu);
172 
173   // connect to a broken address
174   GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
175   int64_t connection_handle = grpc_tcp_client_connect(
176       &done, &g_connecting, g_pollset_set,
177       grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
178       &resolved_addr, grpc_core::Timestamp::InfFuture());
179   gpr_mu_lock(g_mu);
180 
181   // wait for the connection callback to finish
182   while (g_connections_complete == connections_complete_before) {
183     grpc_pollset_worker* worker = nullptr;
184     grpc_core::Timestamp polling_deadline = test_deadline();
185     switch (grpc_timer_check(&polling_deadline)) {
186       case GRPC_TIMERS_FIRED:
187         break;
188       case GRPC_TIMERS_NOT_CHECKED:
189         polling_deadline = grpc_core::Timestamp::ProcessEpoch();
190         ABSL_FALLTHROUGH_INTENDED;
191       case GRPC_TIMERS_CHECKED_AND_EMPTY:
192         ASSERT_TRUE(GRPC_LOG_IF_ERROR(
193             "pollset_work",
194             grpc_pollset_work(g_pollset, &worker, polling_deadline)));
195         break;
196     }
197     gpr_mu_unlock(g_mu);
198     grpc_core::ExecCtx::Get()->Flush();
199     gpr_mu_lock(g_mu);
200   }
201 
202   gpr_mu_unlock(g_mu);
203 
204   // A cancellation attempt should fail because connect already failed.
205   ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
206 
207   LOG(ERROR) << "---- finished test_fails() ----";
208 }
209 
test_connect_cancellation_succeeds(void)210 void test_connect_cancellation_succeeds(void) {
211   LOG(ERROR) << "---- starting test_connect_cancellation_succeeds() ----";
212   auto target_ipv6_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
213       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
214   auto target_ipv4_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
215       "ipv4:127.0.0.1:", std::to_string(grpc_pick_unused_port_or_die())));
216   grpc_resolved_address resolved_addr;
217   int svr_fd;
218   grpc_closure done;
219   grpc_core::ExecCtx exec_ctx;
220   bool tried_ipv4 = false;
221   ASSERT_TRUE(grpc_parse_uri(target_ipv6_addr_uri, &resolved_addr));
222   auto try_bind = [&](int sock) {
223     return (sock >= 0 &&
224             bind(sock, reinterpret_cast<sockaddr*>(resolved_addr.addr),
225                  resolved_addr.len) == 0);
226   };
227   // create a phony server
228   svr_fd = socket(AF_INET6, SOCK_STREAM, 0);
229   // Try ipv6
230   if (!try_bind(svr_fd)) {
231     if (svr_fd >= 0) {
232       close(svr_fd);
233     }
234     // Failed to bind ipv6. Try ipv4
235     ASSERT_TRUE(grpc_parse_uri(target_ipv4_addr_uri, &resolved_addr));
236     svr_fd = socket(AF_INET, SOCK_STREAM, 0);
237     tried_ipv4 = true;
238     if (!try_bind(svr_fd)) {
239       if (svr_fd >= 0) {
240         close(svr_fd);
241       }
242       LOG(ERROR) << "Skipping test. Failed to create a phony server bound to "
243                     "ipv6 or ipv4 address";
244       return;
245     }
246   }
247 
248   ASSERT_EQ(listen(svr_fd, 1), 0);
249 
250   std::vector<int> client_sockets;
251   bool create_more_client_connections = true;
252   // Create and connect client sockets until the connection attempt times out.
253   // Even if the backlog specified to listen is 1, the kernel continues to
254   // accept a certain number of SYN packets before dropping them. This loop
255   // attempts to identify the number of new connection attempts that will
256   // be allowed by the kernel before any subsequent connection attempts
257   // become pending indefinitely.
258   while (create_more_client_connections) {
259     const int kOne = 1;
260     int client_socket = socket(tried_ipv4 ? AF_INET : AF_INET6, SOCK_STREAM, 0);
261     ASSERT_GE(client_socket, 0);
262     setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &kOne, sizeof(kOne));
263     // Make fd non-blocking.
264     int flags = fcntl(client_socket, F_GETFL, 0);
265     ASSERT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
266 
267     if (connect(client_socket, reinterpret_cast<sockaddr*>(resolved_addr.addr),
268                 resolved_addr.len) == -1) {
269       if (errno == EINPROGRESS) {
270         struct pollfd pfd;
271         pfd.fd = client_socket;
272         pfd.events = POLLOUT;
273         pfd.revents = 0;
274         int ret = poll(&pfd, 1, 1000);
275         if (ret == -1) {
276           FAIL() << "poll() failed during connect; errno=" << errno;
277         } else if (ret == 0) {
278           // current connection attempt timed out. It indicates that the
279           // kernel will cause any subsequent connection attempts to
280           // become pending indefinitely.
281           create_more_client_connections = false;
282         }
283       } else {
284         FAIL() << "Failed to connect to the server. errno=%d" << errno;
285       }
286     }
287     client_sockets.push_back(client_socket);
288   }
289 
290   // connect to it. accept() is not called on the bind socket. So the connection
291   // should appear to be stuck giving ample time to try to cancel it.
292   ASSERT_EQ(getsockname(svr_fd, reinterpret_cast<sockaddr*>(resolved_addr.addr),
293                         (socklen_t*)&resolved_addr.len),
294             0);
295   GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
296   grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
297                                     .channel_args_preconditioning()
298                                     .PreconditionChannelArgs(nullptr);
299   int64_t connection_handle = grpc_tcp_client_connect(
300       &done, &g_connecting, g_pollset_set,
301       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
302       &resolved_addr, grpc_core::Timestamp::InfFuture());
303   ASSERT_GT(connection_handle, 0);
304   ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), true);
305   for (auto sock : client_sockets) {
306     close(sock);
307   }
308   close(svr_fd);
309   LOG(ERROR) << "---- finished test_connect_cancellation_succeeds() ----";
310 }
311 
test_fails_bad_addr_no_leak(void)312 void test_fails_bad_addr_no_leak(void) {
313   LOG(ERROR) << "---- starting test_fails_bad_addr_no_leak() ----";
314   grpc_resolved_address resolved_addr;
315   struct sockaddr_in* addr =
316       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
317   int connections_complete_before;
318   grpc_closure done;
319   grpc_core::ExecCtx exec_ctx;
320   memset(&resolved_addr, 0, sizeof(resolved_addr));
321   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
322   // force `grpc_tcp_client_prepare_fd` to fail. contrived, but effective.
323   addr->sin_family = AF_IPX;
324   gpr_mu_lock(g_mu);
325   connections_complete_before = g_connections_complete;
326   gpr_mu_unlock(g_mu);
327   // connect to an invalid address.
328   GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
329   grpc_tcp_client_connect(
330       &done, &g_connecting, g_pollset_set,
331       grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
332       &resolved_addr, grpc_core::Timestamp::InfFuture());
333   gpr_mu_lock(g_mu);
334   while (g_connections_complete == connections_complete_before) {
335     grpc_pollset_worker* worker = nullptr;
336     grpc_core::Timestamp polling_deadline = test_deadline();
337     switch (grpc_timer_check(&polling_deadline)) {
338       case GRPC_TIMERS_FIRED:
339         break;
340       case GRPC_TIMERS_NOT_CHECKED:
341         polling_deadline = grpc_core::Timestamp::ProcessEpoch();
342         ABSL_FALLTHROUGH_INTENDED;
343       case GRPC_TIMERS_CHECKED_AND_EMPTY:
344         ASSERT_TRUE(GRPC_LOG_IF_ERROR(
345             "pollset_work",
346             grpc_pollset_work(g_pollset, &worker, polling_deadline)));
347         break;
348     }
349     gpr_mu_unlock(g_mu);
350     grpc_core::ExecCtx::Get()->Flush();
351     gpr_mu_lock(g_mu);
352   }
353   gpr_mu_unlock(g_mu);
354   LOG(ERROR) << "---- finished test_fails_bad_addr_no_leak() ----";
355 }
356 
destroy_pollset(void * p,grpc_error_handle)357 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
358   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
359 }
360 
TEST(TcpClientPosixTest,MainTest)361 TEST(TcpClientPosixTest, MainTest) {
362   grpc_closure destroyed;
363   grpc_init();
364 
365   {
366     grpc_core::ExecCtx exec_ctx;
367     g_pollset_set = grpc_pollset_set_create();
368     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
369     grpc_pollset_init(g_pollset, &g_mu);
370     grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);
371 
372     test_succeeds();
373     test_connect_cancellation_succeeds();
374     test_fails();
375     test_fails_bad_addr_no_leak();
376     grpc_pollset_set_destroy(g_pollset_set);
377     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
378                       grpc_schedule_on_exec_ctx);
379     grpc_pollset_shutdown(g_pollset, &destroyed);
380   }
381 
382   grpc_shutdown();
383   gpr_free(g_pollset);
384 }
385 
386 #endif  // GRPC_POSIX_SOCKET_CLIENT
387 
main(int argc,char ** argv)388 int main(int argc, char** argv) {
389   grpc::testing::TestEnvironment env(&argc, argv);
390   ::testing::InitGoogleTest(&argc, argv);
391   return RUN_ALL_TESTS();
392 }
393