• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <gtest/gtest.h>
20 
21 #include "src/core/lib/event_engine/shim.h"
22 #include "src/core/lib/iomgr/port.h"
23 #include "src/core/util/time.h"
24 #include "test/core/test_util/test_config.h"
25 
26 // This test won't work except with posix sockets enabled
27 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
28 
29 #include <errno.h>
30 #include <ifaddrs.h>
31 #include <netinet/in.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37 
38 #ifdef GRPC_HAVE_UNIX_SOCKET
39 #include <sys/un.h>
40 #endif
41 
42 #include <grpc/grpc.h>
43 #include <grpc/support/alloc.h>
44 #include <grpc/support/sync.h>
45 #include <grpc/support/time.h>
46 
47 #include <memory>
48 #include <string>
49 
50 #include "absl/log/log.h"
51 #include "src/core/lib/address_utils/sockaddr_utils.h"
52 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
53 #include "src/core/lib/iomgr/error.h"
54 #include "src/core/lib/iomgr/iomgr.h"
55 #include "src/core/lib/iomgr/resolve_address.h"
56 #include "src/core/lib/iomgr/tcp_server.h"
57 #include "src/core/lib/resource_quota/api.h"
58 #include "src/core/util/crash.h"
59 #include "src/core/util/memory.h"
60 #include "src/core/util/strerror.h"
61 #include "test/core/test_util/port.h"
62 
63 #define LOG_TEST(x) LOG(INFO) << #x
64 
65 static gpr_mu* g_mu;
66 static grpc_pollset* g_pollset;
67 static int g_nconnects = 0;
68 
69 typedef struct {
70   // Owns a ref to server.
71   grpc_tcp_server* server;
72   unsigned port_index;
73   unsigned fd_index;
74   int server_fd;
75 } on_connect_result;
76 
77 typedef struct {
78   grpc_tcp_server* server;
79 
80   // arg is this server_weak_ref.
81   grpc_closure server_shutdown;
82 } server_weak_ref;
83 
84 #define MAX_URI 1024
85 typedef struct {
86   grpc_resolved_address addr;
87   char str[MAX_URI];
88 } test_addr;
89 
90 #define MAX_ADDRS 100
91 typedef struct {
92   size_t naddrs;
93   test_addr addrs[MAX_ADDRS];
94 } test_addrs;
95 
96 static on_connect_result g_result = {nullptr, 0, 0, -1};
97 
98 static char family_name_buf[1024];
sock_family_name(int family)99 static const char* sock_family_name(int family) {
100   if (family == AF_INET) {
101     return "AF_INET";
102   } else if (family == AF_INET6) {
103     return "AF_INET6";
104   } else if (family == AF_UNSPEC) {
105     return "AF_UNSPEC";
106   } else {
107     sprintf(family_name_buf, "%d", family);
108     return family_name_buf;
109   }
110 }
111 
on_connect_result_init(on_connect_result * result)112 static void on_connect_result_init(on_connect_result* result) {
113   result->server = nullptr;
114   result->port_index = 0;
115   result->fd_index = 0;
116   result->server_fd = -1;
117 }
118 
on_connect_result_set(on_connect_result * result,const grpc_tcp_server_acceptor * acceptor)119 static void on_connect_result_set(on_connect_result* result,
120                                   const grpc_tcp_server_acceptor* acceptor) {
121   result->server = grpc_tcp_server_ref(acceptor->from_server);
122   result->port_index = acceptor->port_index;
123   result->fd_index = acceptor->fd_index;
124   result->server_fd = grpc_tcp_server_port_fd(
125       result->server, acceptor->port_index, acceptor->fd_index);
126 }
127 
server_weak_ref_shutdown(void * arg,grpc_error_handle)128 static void server_weak_ref_shutdown(void* arg, grpc_error_handle /*error*/) {
129   server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
130   weak_ref->server = nullptr;
131 }
132 
server_weak_ref_init(server_weak_ref * weak_ref)133 static void server_weak_ref_init(server_weak_ref* weak_ref) {
134   weak_ref->server = nullptr;
135   GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown,
136                     weak_ref, grpc_schedule_on_exec_ctx);
137 }
138 
139 // Make weak_ref->server_shutdown a shutdown_starting cb on server.
140 // grpc_tcp_server promises that the server object will live until
141 // weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
142 // should be held until server_weak_ref_set() returns to avoid a race where the
143 // server is deleted before the shutdown_starting cb is added.
server_weak_ref_set(server_weak_ref * weak_ref,grpc_tcp_server * server)144 static void server_weak_ref_set(server_weak_ref* weak_ref,
145                                 grpc_tcp_server* server) {
146   grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
147   weak_ref->server = server;
148 }
149 
test_addr_init_str(test_addr * addr)150 static void test_addr_init_str(test_addr* addr) {
151   std::string str = grpc_sockaddr_to_string(&addr->addr, false).value();
152   size_t str_len = std::min(str.size(), sizeof(addr->str) - 1);
153   memcpy(addr->str, str.c_str(), str_len);
154   addr->str[str_len] = '\0';
155 }
156 
on_connect(void *,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)157 static void on_connect(void* /*arg*/, grpc_endpoint* tcp,
158                        grpc_pollset* /*pollset*/,
159                        grpc_tcp_server_acceptor* acceptor) {
160   grpc_endpoint_destroy(tcp);
161 
162   on_connect_result temp_result;
163   on_connect_result_set(&temp_result, acceptor);
164   gpr_free(acceptor);
165 
166   gpr_mu_lock(g_mu);
167   g_result = temp_result;
168   g_nconnects++;
169   ASSERT_TRUE(
170       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
171   gpr_mu_unlock(g_mu);
172 }
173 
test_no_op(void)174 static void test_no_op(void) {
175   grpc_core::ExecCtx exec_ctx;
176   grpc_tcp_server* s;
177   auto args = grpc_core::CoreConfiguration::Get()
178                   .channel_args_preconditioning()
179                   .PreconditionChannelArgs(nullptr);
180   ASSERT_EQ(
181       absl::OkStatus(),
182       grpc_tcp_server_create(
183           nullptr,
184           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
185           on_connect, nullptr, &s));
186   grpc_tcp_server_unref(s);
187 }
188 
test_no_op_with_start(void)189 static void test_no_op_with_start(void) {
190   grpc_core::ExecCtx exec_ctx;
191   grpc_tcp_server* s;
192   auto args = grpc_core::CoreConfiguration::Get()
193                   .channel_args_preconditioning()
194                   .PreconditionChannelArgs(nullptr);
195   ASSERT_EQ(
196       absl::OkStatus(),
197       grpc_tcp_server_create(
198           nullptr,
199           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
200           on_connect, nullptr, &s));
201   LOG_TEST("test_no_op_with_start");
202   std::vector<grpc_pollset*> empty_pollset;
203   grpc_tcp_server_start(s, &empty_pollset);
204   grpc_tcp_server_unref(s);
205 }
206 
test_no_op_with_port(void)207 static void test_no_op_with_port(void) {
208   grpc_core::ExecCtx exec_ctx;
209   grpc_resolved_address resolved_addr;
210   struct sockaddr_in* addr =
211       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
212   grpc_tcp_server* s;
213   auto args = grpc_core::CoreConfiguration::Get()
214                   .channel_args_preconditioning()
215                   .PreconditionChannelArgs(nullptr);
216   ASSERT_EQ(
217       absl::OkStatus(),
218       grpc_tcp_server_create(
219           nullptr,
220           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
221           on_connect, nullptr, &s));
222   LOG_TEST("test_no_op_with_port");
223 
224   memset(&resolved_addr, 0, sizeof(resolved_addr));
225   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
226   addr->sin_family = AF_INET;
227   int port = -1;
228   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
229             absl::OkStatus());
230   ASSERT_GT(port, 0);
231 
232   grpc_tcp_server_unref(s);
233 }
234 
test_no_op_with_port_and_start(void)235 static void test_no_op_with_port_and_start(void) {
236   grpc_core::ExecCtx exec_ctx;
237   grpc_resolved_address resolved_addr;
238   struct sockaddr_in* addr =
239       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
240   grpc_tcp_server* s;
241   auto args = grpc_core::CoreConfiguration::Get()
242                   .channel_args_preconditioning()
243                   .PreconditionChannelArgs(nullptr);
244   ASSERT_EQ(
245       absl::OkStatus(),
246       grpc_tcp_server_create(
247           nullptr,
248           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
249           on_connect, nullptr, &s));
250   LOG_TEST("test_no_op_with_port_and_start");
251   int port = -1;
252 
253   memset(&resolved_addr, 0, sizeof(resolved_addr));
254   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
255   addr->sin_family = AF_INET;
256   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
257             absl::OkStatus());
258   ASSERT_GT(port, 0);
259 
260   std::vector<grpc_pollset*> empty_pollset;
261   grpc_tcp_server_start(s, &empty_pollset);
262 
263   grpc_tcp_server_unref(s);
264 }
265 
tcp_connect(const test_addr * remote,on_connect_result * result)266 static grpc_error_handle tcp_connect(const test_addr* remote,
267                                      on_connect_result* result) {
268   grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
269       grpc_timeout_seconds_to_deadline(10));
270   int clifd;
271   int nconnects_before;
272   const struct sockaddr* remote_addr =
273       reinterpret_cast<const struct sockaddr*>(remote->addr.addr);
274 
275   LOG(INFO) << "Connecting to " << remote->str;
276   gpr_mu_lock(g_mu);
277   nconnects_before = g_nconnects;
278   on_connect_result_init(&g_result);
279   clifd = socket(remote_addr->sa_family, SOCK_STREAM, 0);
280   if (clifd < 0) {
281     gpr_mu_unlock(g_mu);
282     return GRPC_OS_ERROR(errno, "Failed to create socket");
283   }
284   VLOG(2) << "start connect to " << remote->str;
285   if (connect(clifd, remote_addr, static_cast<socklen_t>(remote->addr.len)) !=
286       0) {
287     gpr_mu_unlock(g_mu);
288     close(clifd);
289     return GRPC_OS_ERROR(errno, "connect");
290   }
291   VLOG(2) << "wait";
292   while (g_nconnects == nconnects_before &&
293          deadline > grpc_core::Timestamp::Now()) {
294     grpc_pollset_worker* worker = nullptr;
295     grpc_error_handle err;
296     if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=
297         absl::OkStatus()) {
298       gpr_mu_unlock(g_mu);
299       close(clifd);
300       return err;
301     }
302     gpr_mu_unlock(g_mu);
303 
304     gpr_mu_lock(g_mu);
305   }
306   VLOG(2) << "wait done";
307   if (g_nconnects != nconnects_before + 1) {
308     gpr_mu_unlock(g_mu);
309     close(clifd);
310     return GRPC_ERROR_CREATE("Didn't connect");
311   }
312   close(clifd);
313   *result = g_result;
314 
315   gpr_mu_unlock(g_mu);
316   LOG(INFO) << "Result (" << result->port_index << ", " << result->fd_index
317             << ") fd " << result->server_fd;
318   grpc_tcp_server_unref(result->server);
319   return absl::OkStatus();
320 }
321 
322 // Tests a tcp server on "::" listeners with multiple ports. If channel_args is
323 // non-NULL, pass them to the server. If dst_addrs is non-NULL, use valid addrs
324 // as destination addrs (port is not set). If dst_addrs is NULL, use listener
325 // addrs as destination addrs. If test_dst_addrs is true, test connectivity with
326 // each destination address, set grpc_resolved_address::len=0 for failures, but
327 // don't fail the overall unitest.
test_connect(size_t num_connects,const grpc_channel_args * channel_args,test_addrs * dst_addrs,bool test_dst_addrs)328 static void test_connect(size_t num_connects,
329                          const grpc_channel_args* channel_args,
330                          test_addrs* dst_addrs, bool test_dst_addrs) {
331   grpc_core::ExecCtx exec_ctx;
332   // Use aligned_storage to allocate grpc_resolved_address objects on stack
333   // to meet the alignment requirement of sockaddr_storage type.
334   std::aligned_storage<sizeof(grpc_resolved_address),
335                        alignof(sockaddr_storage)>::type resolved_addr_buffer;
336   std::aligned_storage<sizeof(grpc_resolved_address),
337                        alignof(sockaddr_storage)>::type resolved_addr1_buffer;
338   grpc_resolved_address& resolved_addr =
339       *reinterpret_cast<grpc_resolved_address*>(&resolved_addr_buffer);
340   grpc_resolved_address& resolved_addr1 =
341       *reinterpret_cast<grpc_resolved_address*>(&resolved_addr1_buffer);
342   struct sockaddr_storage* const addr =
343       reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
344   struct sockaddr_storage* const addr1 =
345       reinterpret_cast<struct sockaddr_storage*>(resolved_addr1.addr);
346   unsigned svr_fd_count;
347   int port;
348   int svr_port;
349   unsigned svr1_fd_count;
350   int svr1_port;
351   grpc_tcp_server* s;
352   const unsigned num_ports = 2;
353   auto new_channel_args = grpc_core::CoreConfiguration::Get()
354                               .channel_args_preconditioning()
355                               .PreconditionChannelArgs(channel_args);
356   ASSERT_EQ(absl::OkStatus(),
357             grpc_tcp_server_create(
358                 nullptr,
359                 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
360                     new_channel_args),
361                 on_connect, nullptr, &s));
362   unsigned port_num;
363   server_weak_ref weak_ref;
364   server_weak_ref_init(&weak_ref);
365   server_weak_ref_set(&weak_ref, s);
366   LOG_TEST("test_connect");
367   LOG(INFO) << "clients=" << num_connects << ", num chan args="
368             << (channel_args != nullptr ? channel_args->num_args : 0)
369             << ", remote IP=" << (dst_addrs != nullptr ? "<specific>" : "::")
370             << ", test_dst_addrs=" << test_dst_addrs;
371   memset(&resolved_addr, 0, sizeof(resolved_addr));
372   memset(&resolved_addr1, 0, sizeof(resolved_addr1));
373   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
374   resolved_addr1.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
375   addr->ss_family = addr1->ss_family = AF_INET;
376   ASSERT_TRUE(GRPC_LOG_IF_ERROR(
377       "grpc_tcp_server_add_port",
378       grpc_tcp_server_add_port(s, &resolved_addr, &svr_port)));
379   LOG(INFO) << "Allocated port " << svr_port;
380   ASSERT_GT(svr_port, 0);
381   // Cannot use wildcard (port==0), because add_port() will try to reuse the
382   // same port as a previous add_port().
383   svr1_port = grpc_pick_unused_port_or_die();
384   ASSERT_GT(svr1_port, 0);
385   LOG(INFO) << "Picked unused port " << svr1_port;
386   grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
387   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr1, &port),
388             absl::OkStatus());
389   ASSERT_EQ(port, svr1_port);
390 
391   // Bad port_index.
392   ASSERT_EQ(grpc_tcp_server_port_fd_count(s, 2), 0);
393   ASSERT_LT(grpc_tcp_server_port_fd(s, 2, 0), 0);
394 
395   // Bad fd_index.
396   ASSERT_LT(grpc_tcp_server_port_fd(s, 0, 100), 0);
397   ASSERT_LT(grpc_tcp_server_port_fd(s, 1, 100), 0);
398 
399   // Got at least one fd per port.
400   svr_fd_count = grpc_tcp_server_port_fd_count(s, 0);
401   ASSERT_GE(svr_fd_count, 1);
402   svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
403   ASSERT_GE(svr1_fd_count, 1);
404 
405   std::vector<grpc_pollset*> test_pollset;
406   test_pollset.push_back(g_pollset);
407   grpc_tcp_server_start(s, &test_pollset);
408 
409   if (dst_addrs != nullptr) {
410     int ports[] = {svr_port, svr1_port};
411     for (port_num = 0; port_num < num_ports; ++port_num) {
412       size_t dst_idx;
413       size_t num_tested = 0;
414       for (dst_idx = 0; dst_idx < dst_addrs->naddrs; ++dst_idx) {
415         test_addr dst = dst_addrs->addrs[dst_idx];
416         on_connect_result result;
417         grpc_error_handle err;
418         if (dst.addr.len == 0) {
419           VLOG(2) << "Skipping test of non-functional local IP " << dst.str;
420           continue;
421         }
422         ASSERT_TRUE(grpc_sockaddr_set_port(&dst.addr, ports[port_num]));
423         test_addr_init_str(&dst);
424         ++num_tested;
425         on_connect_result_init(&result);
426         if ((err = tcp_connect(&dst, &result)) == absl::OkStatus() &&
427             result.server_fd >= 0 && result.server == s) {
428           continue;
429         }
430         LOG(ERROR) << "Failed to connect to " << dst.str << ": "
431                    << grpc_core::StatusToString(err);
432         ASSERT_TRUE(test_dst_addrs);
433         dst_addrs->addrs[dst_idx].addr.len = 0;
434       }
435       ASSERT_GT(num_tested, 0);
436     }
437   } else {
438     for (port_num = 0; port_num < num_ports; ++port_num) {
439       const unsigned num_fds = grpc_tcp_server_port_fd_count(s, port_num);
440       unsigned fd_num;
441       for (fd_num = 0; fd_num < num_fds; ++fd_num) {
442         int fd = grpc_tcp_server_port_fd(s, port_num, fd_num);
443         size_t connect_num;
444         test_addr dst;
445         ASSERT_GE(fd, 0);
446         dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
447         ASSERT_EQ(getsockname(fd, (struct sockaddr*)dst.addr.addr,
448                               (socklen_t*)&dst.addr.len),
449                   0);
450         ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
451         test_addr_init_str(&dst);
452         LOG(INFO) << "(" << port_num << ", " << fd_num << ") fd " << fd
453                   << " family " << sock_family_name(addr->ss_family)
454                   << " listening on " << dst.str;
455         for (connect_num = 0; connect_num < num_connects; ++connect_num) {
456           on_connect_result result;
457           on_connect_result_init(&result);
458           ASSERT_TRUE(
459               GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result)));
460           ASSERT_EQ(result.server_fd, fd);
461           ASSERT_EQ(result.port_index, port_num);
462           ASSERT_EQ(result.fd_index, fd_num);
463           ASSERT_EQ(result.server, s);
464           ASSERT_EQ(
465               grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
466               result.server_fd);
467         }
468       }
469     }
470   }
471   // Weak ref to server valid until final unref.
472   ASSERT_NE(weak_ref.server, nullptr);
473   ASSERT_GE(grpc_tcp_server_port_fd(s, 0, 0), 0);
474 
475   grpc_tcp_server_unref(s);
476   grpc_core::ExecCtx::Get()->Flush();
477 
478   // Weak ref lost.
479   ASSERT_EQ(weak_ref.server, nullptr);
480 }
481 
pre_allocate_inet_sock(grpc_tcp_server * s,int family,int port,int * fd)482 static int pre_allocate_inet_sock(grpc_tcp_server* s, int family, int port,
483                                   int* fd) {
484   struct sockaddr_in6 address;
485   memset(&address, 0, sizeof(address));
486   address.sin6_family = family;
487   address.sin6_port = htons(port);
488 
489   int pre_fd = socket(address.sin6_family, SOCK_STREAM, 0);
490   if (pre_fd < 0) {
491     LOG(ERROR) << "Unable to create inet socket: %m";
492     return -1;
493   }
494 
495   const int enable = 1;
496   setsockopt(pre_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
497 
498   int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
499                sizeof(address));
500   if (b < 0) {
501     LOG(ERROR) << "Unable to bind inet socket: %m";
502     return -1;
503   }
504 
505   int l = listen(pre_fd, SOMAXCONN);
506   if (l < 0) {
507     LOG(ERROR) << "Unable to listen on inet socket: %m";
508     return -1;
509   }
510 
511   grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
512   *fd = pre_fd;
513 
514   return 0;
515 }
516 
test_pre_allocated_inet_fd()517 static void test_pre_allocated_inet_fd() {
518   grpc_core::ExecCtx exec_ctx;
519   grpc_resolved_address resolved_addr;
520   struct sockaddr_in6* addr =
521       reinterpret_cast<struct sockaddr_in6*>(resolved_addr.addr);
522   grpc_tcp_server* s;
523   if (grpc_event_engine::experimental::UseEventEngineListener()) {
524     // TODO(vigneshbabu): Skip the test when event engine is enabled.
525     // Pre-allocated fd support will be added to event engine later.
526     return;
527   }
528   auto args = grpc_core::CoreConfiguration::Get()
529                   .channel_args_preconditioning()
530                   .PreconditionChannelArgs(nullptr);
531   ASSERT_EQ(
532       absl::OkStatus(),
533       grpc_tcp_server_create(
534           nullptr,
535           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
536           on_connect, nullptr, &s));
537   LOG_TEST("test_pre_allocated_inet_fd");
538 
539   // Pre allocate FD
540   int pre_fd;
541   int port = grpc_pick_unused_port_or_die();
542 
543   int res_pre = pre_allocate_inet_sock(s, AF_INET6, port, &pre_fd);
544   if (res_pre < 0) {
545     grpc_tcp_server_unref(s);
546     close(pre_fd);
547     return;
548   }
549   ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
550 
551   // Add port
552   int pt;
553   memset(&resolved_addr, 0, sizeof(resolved_addr));
554   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
555   addr->sin6_family = AF_INET6;
556   addr->sin6_port = htons(port);
557   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
558   ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
559   ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
560 
561   // Start server
562   std::vector<grpc_pollset*> test_pollset;
563   test_pollset.push_back(g_pollset);
564   grpc_tcp_server_start(s, &test_pollset);
565 
566   // Test connection
567   test_addr dst;
568   dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
569   ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
570                         (socklen_t*)&dst.addr.len),
571             0);
572   ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
573   test_addr_init_str(&dst);
574   on_connect_result result;
575   on_connect_result_init(&result);
576   ASSERT_EQ(tcp_connect(&dst, &result), absl::OkStatus());
577   ASSERT_EQ(result.server_fd, pre_fd);
578   ASSERT_EQ(result.server, s);
579   ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
580             result.server_fd);
581 
582   grpc_tcp_server_unref(s);
583   close(pre_fd);
584 }
585 
586 #ifdef GRPC_HAVE_UNIX_SOCKET
pre_allocate_unix_sock(grpc_tcp_server * s,const char * path,int * fd)587 static int pre_allocate_unix_sock(grpc_tcp_server* s, const char* path,
588                                   int* fd) {
589   struct sockaddr_un address;
590   memset(&address, 0, sizeof(struct sockaddr_un));
591   address.sun_family = AF_UNIX;
592   strcpy(address.sun_path, path);
593 
594   int pre_fd = socket(address.sun_family, SOCK_STREAM, 0);
595   if (pre_fd < 0) {
596     LOG(ERROR) << "Unable to create unix socket: %m";
597     return -1;
598   }
599 
600   int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
601                sizeof(address));
602   if (b < 0) {
603     LOG(ERROR) << "Unable to bind unix socket: %m";
604     return -1;
605   }
606 
607   int l = listen(pre_fd, SOMAXCONN);
608   if (l < 0) {
609     LOG(ERROR) << "Unable to listen on unix socket: %m";
610     return -1;
611   }
612 
613   grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
614   *fd = pre_fd;
615 
616   return 0;
617 }
618 
test_pre_allocated_unix_fd()619 static void test_pre_allocated_unix_fd() {
620   grpc_core::ExecCtx exec_ctx;
621   grpc_resolved_address resolved_addr;
622   struct sockaddr_un* addr =
623       reinterpret_cast<struct sockaddr_un*>(resolved_addr.addr);
624   grpc_tcp_server* s;
625   if (grpc_event_engine::experimental::UseEventEngineListener()) {
626     // TODO(vigneshbabu): Skip the test when event engine is enabled.
627     // Pre-allocated fd support will be added to event engine later.
628     return;
629   }
630   auto args = grpc_core::CoreConfiguration::Get()
631                   .channel_args_preconditioning()
632                   .PreconditionChannelArgs(nullptr);
633   ASSERT_EQ(
634       absl::OkStatus(),
635       grpc_tcp_server_create(
636           nullptr,
637           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
638           on_connect, nullptr, &s));
639   LOG_TEST("test_pre_allocated_unix_fd");
640 
641   // Pre allocate FD
642   int pre_fd;
643   char path[100];
644   srand(time(nullptr));
645   memset(path, 0, sizeof(path));
646   sprintf(path, "/tmp/pre_fd_test_%d", rand());
647 
648   int res_pre = pre_allocate_unix_sock(s, path, &pre_fd);
649   if (res_pre < 0) {
650     grpc_tcp_server_unref(s);
651     close(pre_fd);
652     unlink(path);
653     return;
654   }
655 
656   ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
657 
658   // Add port
659   int pt;
660   memset(&resolved_addr, 0, sizeof(resolved_addr));
661   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_un));
662   addr->sun_family = AF_UNIX;
663   strcpy(addr->sun_path, path);
664   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
665   ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
666   ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
667 
668   // Start server
669   std::vector<grpc_pollset*> test_pollset;
670   test_pollset.push_back(g_pollset);
671   grpc_tcp_server_start(s, &test_pollset);
672 
673   // Test connection
674   test_addr dst;
675   dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
676   ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
677                         (socklen_t*)&dst.addr.len),
678             0);
679   ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
680   test_addr_init_str(&dst);
681   on_connect_result result;
682   on_connect_result_init(&result);
683 
684   grpc_error_handle res_conn = tcp_connect(&dst, &result);
685   // If the path no longer exists, errno is 2. This can happen when
686   // runninig the test multiple times in parallel. Do not fail the test
687   if (absl::IsUnknown(res_conn) && res_conn.raw_code() == 2) {
688     LOG(ERROR)
689         << "Unable to test pre_allocated unix socket: path does not exist";
690     grpc_tcp_server_unref(s);
691     close(pre_fd);
692     return;
693   }
694 
695   ASSERT_EQ(res_conn, absl::OkStatus());
696   ASSERT_EQ(result.server_fd, pre_fd);
697   ASSERT_EQ(result.server, s);
698   ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
699             result.server_fd);
700 
701   grpc_tcp_server_unref(s);
702   close(pre_fd);
703   unlink(path);
704 }
705 #endif  // GRPC_HAVE_UNIX_SOCKET
706 
destroy_pollset(void * p,grpc_error_handle)707 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
708   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
709 }
710 
TEST(TcpServerPosixTest,MainTest)711 TEST(TcpServerPosixTest, MainTest) {
712   grpc_closure destroyed;
713   grpc_arg chan_args[1];
714   chan_args[0].type = GRPC_ARG_INTEGER;
715   chan_args[0].key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
716   chan_args[0].value.integer = 1;
717   const grpc_channel_args channel_args = {1, chan_args};
718   struct ifaddrs* ifa = nullptr;
719   struct ifaddrs* ifa_it;
720   // Zalloc dst_addrs to avoid oversized frames.
721   test_addrs* dst_addrs = grpc_core::Zalloc<test_addrs>();
722   grpc_init();
723   // wait a few seconds to make sure IPv6 link-local addresses can be bound
724   // if we are running under docker container that has just started.
725   // See https://github.com/moby/moby/issues/38491
726   // See https://github.com/grpc/grpc/issues/15610
727   gpr_sleep_until(grpc_timeout_seconds_to_deadline(4));
728   {
729     grpc_core::ExecCtx exec_ctx;
730     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
731     grpc_pollset_init(g_pollset, &g_mu);
732 
733     test_no_op();
734     test_no_op_with_start();
735     test_no_op_with_port();
736     test_no_op_with_port_and_start();
737     test_pre_allocated_inet_fd();
738 #ifdef GRPC_HAVE_UNIX_SOCKET
739     test_pre_allocated_unix_fd();
740 #endif
741 
742     if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
743       FAIL() << "getifaddrs: " << grpc_core::StrError(errno);
744     }
745     dst_addrs->naddrs = 0;
746     for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
747          ifa_it = ifa_it->ifa_next) {
748       if (ifa_it->ifa_addr == nullptr) {
749         continue;
750       } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
751         dst_addrs->addrs[dst_addrs->naddrs].addr.len =
752             static_cast<socklen_t>(sizeof(struct sockaddr_in));
753       } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
754         dst_addrs->addrs[dst_addrs->naddrs].addr.len =
755             static_cast<socklen_t>(sizeof(struct sockaddr_in6));
756       } else {
757         continue;
758       }
759       memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr,
760              dst_addrs->addrs[dst_addrs->naddrs].addr.len);
761       ASSERT_TRUE(
762           grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0));
763       test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]);
764       ++dst_addrs->naddrs;
765     }
766     freeifaddrs(ifa);
767     ifa = nullptr;
768 
769     // Connect to same addresses as listeners.
770     test_connect(1, nullptr, nullptr, false);
771     test_connect(10, nullptr, nullptr, false);
772 
773     // Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
774     // "::" listener.
775     test_connect(1, nullptr, dst_addrs, true);
776 
777     // Test connect(2) with dst_addrs.
778     test_connect(1, &channel_args, dst_addrs, false);
779     // Test connect(2) with dst_addrs.
780     test_connect(10, &channel_args, dst_addrs, false);
781 
782     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
783                       grpc_schedule_on_exec_ctx);
784     grpc_pollset_shutdown(g_pollset, &destroyed);
785   }
786   grpc_shutdown();
787   gpr_free(dst_addrs);
788   gpr_free(g_pollset);
789 }
790 
791 #endif  // GRPC_POSIX_SOCKET_SERVER
792 
main(int argc,char ** argv)793 int main(int argc, char** argv) {
794   grpc::testing::TestEnvironment env(&argc, argv);
795   ::testing::InitGoogleTest(&argc, argv);
796   return RUN_ALL_TESTS();
797 }
798