1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gtest/gtest.h>
20
21 #include "src/core/lib/event_engine/shim.h"
22 #include "src/core/lib/iomgr/port.h"
23 #include "src/core/util/time.h"
24 #include "test/core/test_util/test_config.h"
25
26 // This test won't work except with posix sockets enabled
27 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
28
29 #include <errno.h>
30 #include <ifaddrs.h>
31 #include <netinet/in.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37
38 #ifdef GRPC_HAVE_UNIX_SOCKET
39 #include <sys/un.h>
40 #endif
41
42 #include <grpc/grpc.h>
43 #include <grpc/support/alloc.h>
44 #include <grpc/support/sync.h>
45 #include <grpc/support/time.h>
46
47 #include <memory>
48 #include <string>
49
50 #include "absl/log/log.h"
51 #include "src/core/lib/address_utils/sockaddr_utils.h"
52 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
53 #include "src/core/lib/iomgr/error.h"
54 #include "src/core/lib/iomgr/iomgr.h"
55 #include "src/core/lib/iomgr/resolve_address.h"
56 #include "src/core/lib/iomgr/tcp_server.h"
57 #include "src/core/lib/resource_quota/api.h"
58 #include "src/core/util/crash.h"
59 #include "src/core/util/memory.h"
60 #include "src/core/util/strerror.h"
61 #include "test/core/test_util/port.h"
62
63 #define LOG_TEST(x) LOG(INFO) << #x
64
65 static gpr_mu* g_mu;
66 static grpc_pollset* g_pollset;
67 static int g_nconnects = 0;
68
69 typedef struct {
70 // Owns a ref to server.
71 grpc_tcp_server* server;
72 unsigned port_index;
73 unsigned fd_index;
74 int server_fd;
75 } on_connect_result;
76
77 typedef struct {
78 grpc_tcp_server* server;
79
80 // arg is this server_weak_ref.
81 grpc_closure server_shutdown;
82 } server_weak_ref;
83
84 #define MAX_URI 1024
85 typedef struct {
86 grpc_resolved_address addr;
87 char str[MAX_URI];
88 } test_addr;
89
90 #define MAX_ADDRS 100
91 typedef struct {
92 size_t naddrs;
93 test_addr addrs[MAX_ADDRS];
94 } test_addrs;
95
96 static on_connect_result g_result = {nullptr, 0, 0, -1};
97
98 static char family_name_buf[1024];
sock_family_name(int family)99 static const char* sock_family_name(int family) {
100 if (family == AF_INET) {
101 return "AF_INET";
102 } else if (family == AF_INET6) {
103 return "AF_INET6";
104 } else if (family == AF_UNSPEC) {
105 return "AF_UNSPEC";
106 } else {
107 sprintf(family_name_buf, "%d", family);
108 return family_name_buf;
109 }
110 }
111
on_connect_result_init(on_connect_result * result)112 static void on_connect_result_init(on_connect_result* result) {
113 result->server = nullptr;
114 result->port_index = 0;
115 result->fd_index = 0;
116 result->server_fd = -1;
117 }
118
on_connect_result_set(on_connect_result * result,const grpc_tcp_server_acceptor * acceptor)119 static void on_connect_result_set(on_connect_result* result,
120 const grpc_tcp_server_acceptor* acceptor) {
121 result->server = grpc_tcp_server_ref(acceptor->from_server);
122 result->port_index = acceptor->port_index;
123 result->fd_index = acceptor->fd_index;
124 result->server_fd = grpc_tcp_server_port_fd(
125 result->server, acceptor->port_index, acceptor->fd_index);
126 }
127
server_weak_ref_shutdown(void * arg,grpc_error_handle)128 static void server_weak_ref_shutdown(void* arg, grpc_error_handle /*error*/) {
129 server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
130 weak_ref->server = nullptr;
131 }
132
server_weak_ref_init(server_weak_ref * weak_ref)133 static void server_weak_ref_init(server_weak_ref* weak_ref) {
134 weak_ref->server = nullptr;
135 GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown,
136 weak_ref, grpc_schedule_on_exec_ctx);
137 }
138
139 // Make weak_ref->server_shutdown a shutdown_starting cb on server.
140 // grpc_tcp_server promises that the server object will live until
141 // weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
142 // should be held until server_weak_ref_set() returns to avoid a race where the
143 // server is deleted before the shutdown_starting cb is added.
server_weak_ref_set(server_weak_ref * weak_ref,grpc_tcp_server * server)144 static void server_weak_ref_set(server_weak_ref* weak_ref,
145 grpc_tcp_server* server) {
146 grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
147 weak_ref->server = server;
148 }
149
test_addr_init_str(test_addr * addr)150 static void test_addr_init_str(test_addr* addr) {
151 std::string str = grpc_sockaddr_to_string(&addr->addr, false).value();
152 size_t str_len = std::min(str.size(), sizeof(addr->str) - 1);
153 memcpy(addr->str, str.c_str(), str_len);
154 addr->str[str_len] = '\0';
155 }
156
on_connect(void *,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)157 static void on_connect(void* /*arg*/, grpc_endpoint* tcp,
158 grpc_pollset* /*pollset*/,
159 grpc_tcp_server_acceptor* acceptor) {
160 grpc_endpoint_destroy(tcp);
161
162 on_connect_result temp_result;
163 on_connect_result_set(&temp_result, acceptor);
164 gpr_free(acceptor);
165
166 gpr_mu_lock(g_mu);
167 g_result = temp_result;
168 g_nconnects++;
169 ASSERT_TRUE(
170 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
171 gpr_mu_unlock(g_mu);
172 }
173
test_no_op(void)174 static void test_no_op(void) {
175 grpc_core::ExecCtx exec_ctx;
176 grpc_tcp_server* s;
177 auto args = grpc_core::CoreConfiguration::Get()
178 .channel_args_preconditioning()
179 .PreconditionChannelArgs(nullptr);
180 ASSERT_EQ(
181 absl::OkStatus(),
182 grpc_tcp_server_create(
183 nullptr,
184 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
185 on_connect, nullptr, &s));
186 grpc_tcp_server_unref(s);
187 }
188
test_no_op_with_start(void)189 static void test_no_op_with_start(void) {
190 grpc_core::ExecCtx exec_ctx;
191 grpc_tcp_server* s;
192 auto args = grpc_core::CoreConfiguration::Get()
193 .channel_args_preconditioning()
194 .PreconditionChannelArgs(nullptr);
195 ASSERT_EQ(
196 absl::OkStatus(),
197 grpc_tcp_server_create(
198 nullptr,
199 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
200 on_connect, nullptr, &s));
201 LOG_TEST("test_no_op_with_start");
202 std::vector<grpc_pollset*> empty_pollset;
203 grpc_tcp_server_start(s, &empty_pollset);
204 grpc_tcp_server_unref(s);
205 }
206
test_no_op_with_port(void)207 static void test_no_op_with_port(void) {
208 grpc_core::ExecCtx exec_ctx;
209 grpc_resolved_address resolved_addr;
210 struct sockaddr_in* addr =
211 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
212 grpc_tcp_server* s;
213 auto args = grpc_core::CoreConfiguration::Get()
214 .channel_args_preconditioning()
215 .PreconditionChannelArgs(nullptr);
216 ASSERT_EQ(
217 absl::OkStatus(),
218 grpc_tcp_server_create(
219 nullptr,
220 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
221 on_connect, nullptr, &s));
222 LOG_TEST("test_no_op_with_port");
223
224 memset(&resolved_addr, 0, sizeof(resolved_addr));
225 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
226 addr->sin_family = AF_INET;
227 int port = -1;
228 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
229 absl::OkStatus());
230 ASSERT_GT(port, 0);
231
232 grpc_tcp_server_unref(s);
233 }
234
test_no_op_with_port_and_start(void)235 static void test_no_op_with_port_and_start(void) {
236 grpc_core::ExecCtx exec_ctx;
237 grpc_resolved_address resolved_addr;
238 struct sockaddr_in* addr =
239 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
240 grpc_tcp_server* s;
241 auto args = grpc_core::CoreConfiguration::Get()
242 .channel_args_preconditioning()
243 .PreconditionChannelArgs(nullptr);
244 ASSERT_EQ(
245 absl::OkStatus(),
246 grpc_tcp_server_create(
247 nullptr,
248 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
249 on_connect, nullptr, &s));
250 LOG_TEST("test_no_op_with_port_and_start");
251 int port = -1;
252
253 memset(&resolved_addr, 0, sizeof(resolved_addr));
254 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
255 addr->sin_family = AF_INET;
256 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
257 absl::OkStatus());
258 ASSERT_GT(port, 0);
259
260 std::vector<grpc_pollset*> empty_pollset;
261 grpc_tcp_server_start(s, &empty_pollset);
262
263 grpc_tcp_server_unref(s);
264 }
265
tcp_connect(const test_addr * remote,on_connect_result * result)266 static grpc_error_handle tcp_connect(const test_addr* remote,
267 on_connect_result* result) {
268 grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
269 grpc_timeout_seconds_to_deadline(10));
270 int clifd;
271 int nconnects_before;
272 const struct sockaddr* remote_addr =
273 reinterpret_cast<const struct sockaddr*>(remote->addr.addr);
274
275 LOG(INFO) << "Connecting to " << remote->str;
276 gpr_mu_lock(g_mu);
277 nconnects_before = g_nconnects;
278 on_connect_result_init(&g_result);
279 clifd = socket(remote_addr->sa_family, SOCK_STREAM, 0);
280 if (clifd < 0) {
281 gpr_mu_unlock(g_mu);
282 return GRPC_OS_ERROR(errno, "Failed to create socket");
283 }
284 VLOG(2) << "start connect to " << remote->str;
285 if (connect(clifd, remote_addr, static_cast<socklen_t>(remote->addr.len)) !=
286 0) {
287 gpr_mu_unlock(g_mu);
288 close(clifd);
289 return GRPC_OS_ERROR(errno, "connect");
290 }
291 VLOG(2) << "wait";
292 while (g_nconnects == nconnects_before &&
293 deadline > grpc_core::Timestamp::Now()) {
294 grpc_pollset_worker* worker = nullptr;
295 grpc_error_handle err;
296 if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=
297 absl::OkStatus()) {
298 gpr_mu_unlock(g_mu);
299 close(clifd);
300 return err;
301 }
302 gpr_mu_unlock(g_mu);
303
304 gpr_mu_lock(g_mu);
305 }
306 VLOG(2) << "wait done";
307 if (g_nconnects != nconnects_before + 1) {
308 gpr_mu_unlock(g_mu);
309 close(clifd);
310 return GRPC_ERROR_CREATE("Didn't connect");
311 }
312 close(clifd);
313 *result = g_result;
314
315 gpr_mu_unlock(g_mu);
316 LOG(INFO) << "Result (" << result->port_index << ", " << result->fd_index
317 << ") fd " << result->server_fd;
318 grpc_tcp_server_unref(result->server);
319 return absl::OkStatus();
320 }
321
322 // Tests a tcp server on "::" listeners with multiple ports. If channel_args is
323 // non-NULL, pass them to the server. If dst_addrs is non-NULL, use valid addrs
324 // as destination addrs (port is not set). If dst_addrs is NULL, use listener
325 // addrs as destination addrs. If test_dst_addrs is true, test connectivity with
326 // each destination address, set grpc_resolved_address::len=0 for failures, but
327 // don't fail the overall unitest.
test_connect(size_t num_connects,const grpc_channel_args * channel_args,test_addrs * dst_addrs,bool test_dst_addrs)328 static void test_connect(size_t num_connects,
329 const grpc_channel_args* channel_args,
330 test_addrs* dst_addrs, bool test_dst_addrs) {
331 grpc_core::ExecCtx exec_ctx;
332 // Use aligned_storage to allocate grpc_resolved_address objects on stack
333 // to meet the alignment requirement of sockaddr_storage type.
334 std::aligned_storage<sizeof(grpc_resolved_address),
335 alignof(sockaddr_storage)>::type resolved_addr_buffer;
336 std::aligned_storage<sizeof(grpc_resolved_address),
337 alignof(sockaddr_storage)>::type resolved_addr1_buffer;
338 grpc_resolved_address& resolved_addr =
339 *reinterpret_cast<grpc_resolved_address*>(&resolved_addr_buffer);
340 grpc_resolved_address& resolved_addr1 =
341 *reinterpret_cast<grpc_resolved_address*>(&resolved_addr1_buffer);
342 struct sockaddr_storage* const addr =
343 reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
344 struct sockaddr_storage* const addr1 =
345 reinterpret_cast<struct sockaddr_storage*>(resolved_addr1.addr);
346 unsigned svr_fd_count;
347 int port;
348 int svr_port;
349 unsigned svr1_fd_count;
350 int svr1_port;
351 grpc_tcp_server* s;
352 const unsigned num_ports = 2;
353 auto new_channel_args = grpc_core::CoreConfiguration::Get()
354 .channel_args_preconditioning()
355 .PreconditionChannelArgs(channel_args);
356 ASSERT_EQ(absl::OkStatus(),
357 grpc_tcp_server_create(
358 nullptr,
359 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
360 new_channel_args),
361 on_connect, nullptr, &s));
362 unsigned port_num;
363 server_weak_ref weak_ref;
364 server_weak_ref_init(&weak_ref);
365 server_weak_ref_set(&weak_ref, s);
366 LOG_TEST("test_connect");
367 LOG(INFO) << "clients=" << num_connects << ", num chan args="
368 << (channel_args != nullptr ? channel_args->num_args : 0)
369 << ", remote IP=" << (dst_addrs != nullptr ? "<specific>" : "::")
370 << ", test_dst_addrs=" << test_dst_addrs;
371 memset(&resolved_addr, 0, sizeof(resolved_addr));
372 memset(&resolved_addr1, 0, sizeof(resolved_addr1));
373 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
374 resolved_addr1.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
375 addr->ss_family = addr1->ss_family = AF_INET;
376 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
377 "grpc_tcp_server_add_port",
378 grpc_tcp_server_add_port(s, &resolved_addr, &svr_port)));
379 LOG(INFO) << "Allocated port " << svr_port;
380 ASSERT_GT(svr_port, 0);
381 // Cannot use wildcard (port==0), because add_port() will try to reuse the
382 // same port as a previous add_port().
383 svr1_port = grpc_pick_unused_port_or_die();
384 ASSERT_GT(svr1_port, 0);
385 LOG(INFO) << "Picked unused port " << svr1_port;
386 grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
387 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr1, &port),
388 absl::OkStatus());
389 ASSERT_EQ(port, svr1_port);
390
391 // Bad port_index.
392 ASSERT_EQ(grpc_tcp_server_port_fd_count(s, 2), 0);
393 ASSERT_LT(grpc_tcp_server_port_fd(s, 2, 0), 0);
394
395 // Bad fd_index.
396 ASSERT_LT(grpc_tcp_server_port_fd(s, 0, 100), 0);
397 ASSERT_LT(grpc_tcp_server_port_fd(s, 1, 100), 0);
398
399 // Got at least one fd per port.
400 svr_fd_count = grpc_tcp_server_port_fd_count(s, 0);
401 ASSERT_GE(svr_fd_count, 1);
402 svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
403 ASSERT_GE(svr1_fd_count, 1);
404
405 std::vector<grpc_pollset*> test_pollset;
406 test_pollset.push_back(g_pollset);
407 grpc_tcp_server_start(s, &test_pollset);
408
409 if (dst_addrs != nullptr) {
410 int ports[] = {svr_port, svr1_port};
411 for (port_num = 0; port_num < num_ports; ++port_num) {
412 size_t dst_idx;
413 size_t num_tested = 0;
414 for (dst_idx = 0; dst_idx < dst_addrs->naddrs; ++dst_idx) {
415 test_addr dst = dst_addrs->addrs[dst_idx];
416 on_connect_result result;
417 grpc_error_handle err;
418 if (dst.addr.len == 0) {
419 VLOG(2) << "Skipping test of non-functional local IP " << dst.str;
420 continue;
421 }
422 ASSERT_TRUE(grpc_sockaddr_set_port(&dst.addr, ports[port_num]));
423 test_addr_init_str(&dst);
424 ++num_tested;
425 on_connect_result_init(&result);
426 if ((err = tcp_connect(&dst, &result)) == absl::OkStatus() &&
427 result.server_fd >= 0 && result.server == s) {
428 continue;
429 }
430 LOG(ERROR) << "Failed to connect to " << dst.str << ": "
431 << grpc_core::StatusToString(err);
432 ASSERT_TRUE(test_dst_addrs);
433 dst_addrs->addrs[dst_idx].addr.len = 0;
434 }
435 ASSERT_GT(num_tested, 0);
436 }
437 } else {
438 for (port_num = 0; port_num < num_ports; ++port_num) {
439 const unsigned num_fds = grpc_tcp_server_port_fd_count(s, port_num);
440 unsigned fd_num;
441 for (fd_num = 0; fd_num < num_fds; ++fd_num) {
442 int fd = grpc_tcp_server_port_fd(s, port_num, fd_num);
443 size_t connect_num;
444 test_addr dst;
445 ASSERT_GE(fd, 0);
446 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
447 ASSERT_EQ(getsockname(fd, (struct sockaddr*)dst.addr.addr,
448 (socklen_t*)&dst.addr.len),
449 0);
450 ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
451 test_addr_init_str(&dst);
452 LOG(INFO) << "(" << port_num << ", " << fd_num << ") fd " << fd
453 << " family " << sock_family_name(addr->ss_family)
454 << " listening on " << dst.str;
455 for (connect_num = 0; connect_num < num_connects; ++connect_num) {
456 on_connect_result result;
457 on_connect_result_init(&result);
458 ASSERT_TRUE(
459 GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result)));
460 ASSERT_EQ(result.server_fd, fd);
461 ASSERT_EQ(result.port_index, port_num);
462 ASSERT_EQ(result.fd_index, fd_num);
463 ASSERT_EQ(result.server, s);
464 ASSERT_EQ(
465 grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
466 result.server_fd);
467 }
468 }
469 }
470 }
471 // Weak ref to server valid until final unref.
472 ASSERT_NE(weak_ref.server, nullptr);
473 ASSERT_GE(grpc_tcp_server_port_fd(s, 0, 0), 0);
474
475 grpc_tcp_server_unref(s);
476 grpc_core::ExecCtx::Get()->Flush();
477
478 // Weak ref lost.
479 ASSERT_EQ(weak_ref.server, nullptr);
480 }
481
pre_allocate_inet_sock(grpc_tcp_server * s,int family,int port,int * fd)482 static int pre_allocate_inet_sock(grpc_tcp_server* s, int family, int port,
483 int* fd) {
484 struct sockaddr_in6 address;
485 memset(&address, 0, sizeof(address));
486 address.sin6_family = family;
487 address.sin6_port = htons(port);
488
489 int pre_fd = socket(address.sin6_family, SOCK_STREAM, 0);
490 if (pre_fd < 0) {
491 LOG(ERROR) << "Unable to create inet socket: %m";
492 return -1;
493 }
494
495 const int enable = 1;
496 setsockopt(pre_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
497
498 int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
499 sizeof(address));
500 if (b < 0) {
501 LOG(ERROR) << "Unable to bind inet socket: %m";
502 return -1;
503 }
504
505 int l = listen(pre_fd, SOMAXCONN);
506 if (l < 0) {
507 LOG(ERROR) << "Unable to listen on inet socket: %m";
508 return -1;
509 }
510
511 grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
512 *fd = pre_fd;
513
514 return 0;
515 }
516
test_pre_allocated_inet_fd()517 static void test_pre_allocated_inet_fd() {
518 grpc_core::ExecCtx exec_ctx;
519 grpc_resolved_address resolved_addr;
520 struct sockaddr_in6* addr =
521 reinterpret_cast<struct sockaddr_in6*>(resolved_addr.addr);
522 grpc_tcp_server* s;
523 if (grpc_event_engine::experimental::UseEventEngineListener()) {
524 // TODO(vigneshbabu): Skip the test when event engine is enabled.
525 // Pre-allocated fd support will be added to event engine later.
526 return;
527 }
528 auto args = grpc_core::CoreConfiguration::Get()
529 .channel_args_preconditioning()
530 .PreconditionChannelArgs(nullptr);
531 ASSERT_EQ(
532 absl::OkStatus(),
533 grpc_tcp_server_create(
534 nullptr,
535 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
536 on_connect, nullptr, &s));
537 LOG_TEST("test_pre_allocated_inet_fd");
538
539 // Pre allocate FD
540 int pre_fd;
541 int port = grpc_pick_unused_port_or_die();
542
543 int res_pre = pre_allocate_inet_sock(s, AF_INET6, port, &pre_fd);
544 if (res_pre < 0) {
545 grpc_tcp_server_unref(s);
546 close(pre_fd);
547 return;
548 }
549 ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
550
551 // Add port
552 int pt;
553 memset(&resolved_addr, 0, sizeof(resolved_addr));
554 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
555 addr->sin6_family = AF_INET6;
556 addr->sin6_port = htons(port);
557 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
558 ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
559 ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
560
561 // Start server
562 std::vector<grpc_pollset*> test_pollset;
563 test_pollset.push_back(g_pollset);
564 grpc_tcp_server_start(s, &test_pollset);
565
566 // Test connection
567 test_addr dst;
568 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
569 ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
570 (socklen_t*)&dst.addr.len),
571 0);
572 ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
573 test_addr_init_str(&dst);
574 on_connect_result result;
575 on_connect_result_init(&result);
576 ASSERT_EQ(tcp_connect(&dst, &result), absl::OkStatus());
577 ASSERT_EQ(result.server_fd, pre_fd);
578 ASSERT_EQ(result.server, s);
579 ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
580 result.server_fd);
581
582 grpc_tcp_server_unref(s);
583 close(pre_fd);
584 }
585
586 #ifdef GRPC_HAVE_UNIX_SOCKET
pre_allocate_unix_sock(grpc_tcp_server * s,const char * path,int * fd)587 static int pre_allocate_unix_sock(grpc_tcp_server* s, const char* path,
588 int* fd) {
589 struct sockaddr_un address;
590 memset(&address, 0, sizeof(struct sockaddr_un));
591 address.sun_family = AF_UNIX;
592 strcpy(address.sun_path, path);
593
594 int pre_fd = socket(address.sun_family, SOCK_STREAM, 0);
595 if (pre_fd < 0) {
596 LOG(ERROR) << "Unable to create unix socket: %m";
597 return -1;
598 }
599
600 int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
601 sizeof(address));
602 if (b < 0) {
603 LOG(ERROR) << "Unable to bind unix socket: %m";
604 return -1;
605 }
606
607 int l = listen(pre_fd, SOMAXCONN);
608 if (l < 0) {
609 LOG(ERROR) << "Unable to listen on unix socket: %m";
610 return -1;
611 }
612
613 grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
614 *fd = pre_fd;
615
616 return 0;
617 }
618
test_pre_allocated_unix_fd()619 static void test_pre_allocated_unix_fd() {
620 grpc_core::ExecCtx exec_ctx;
621 grpc_resolved_address resolved_addr;
622 struct sockaddr_un* addr =
623 reinterpret_cast<struct sockaddr_un*>(resolved_addr.addr);
624 grpc_tcp_server* s;
625 if (grpc_event_engine::experimental::UseEventEngineListener()) {
626 // TODO(vigneshbabu): Skip the test when event engine is enabled.
627 // Pre-allocated fd support will be added to event engine later.
628 return;
629 }
630 auto args = grpc_core::CoreConfiguration::Get()
631 .channel_args_preconditioning()
632 .PreconditionChannelArgs(nullptr);
633 ASSERT_EQ(
634 absl::OkStatus(),
635 grpc_tcp_server_create(
636 nullptr,
637 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
638 on_connect, nullptr, &s));
639 LOG_TEST("test_pre_allocated_unix_fd");
640
641 // Pre allocate FD
642 int pre_fd;
643 char path[100];
644 srand(time(nullptr));
645 memset(path, 0, sizeof(path));
646 sprintf(path, "/tmp/pre_fd_test_%d", rand());
647
648 int res_pre = pre_allocate_unix_sock(s, path, &pre_fd);
649 if (res_pre < 0) {
650 grpc_tcp_server_unref(s);
651 close(pre_fd);
652 unlink(path);
653 return;
654 }
655
656 ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
657
658 // Add port
659 int pt;
660 memset(&resolved_addr, 0, sizeof(resolved_addr));
661 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_un));
662 addr->sun_family = AF_UNIX;
663 strcpy(addr->sun_path, path);
664 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
665 ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
666 ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
667
668 // Start server
669 std::vector<grpc_pollset*> test_pollset;
670 test_pollset.push_back(g_pollset);
671 grpc_tcp_server_start(s, &test_pollset);
672
673 // Test connection
674 test_addr dst;
675 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
676 ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
677 (socklen_t*)&dst.addr.len),
678 0);
679 ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
680 test_addr_init_str(&dst);
681 on_connect_result result;
682 on_connect_result_init(&result);
683
684 grpc_error_handle res_conn = tcp_connect(&dst, &result);
685 // If the path no longer exists, errno is 2. This can happen when
686 // runninig the test multiple times in parallel. Do not fail the test
687 if (absl::IsUnknown(res_conn) && res_conn.raw_code() == 2) {
688 LOG(ERROR)
689 << "Unable to test pre_allocated unix socket: path does not exist";
690 grpc_tcp_server_unref(s);
691 close(pre_fd);
692 return;
693 }
694
695 ASSERT_EQ(res_conn, absl::OkStatus());
696 ASSERT_EQ(result.server_fd, pre_fd);
697 ASSERT_EQ(result.server, s);
698 ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
699 result.server_fd);
700
701 grpc_tcp_server_unref(s);
702 close(pre_fd);
703 unlink(path);
704 }
705 #endif // GRPC_HAVE_UNIX_SOCKET
706
destroy_pollset(void * p,grpc_error_handle)707 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
708 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
709 }
710
TEST(TcpServerPosixTest,MainTest)711 TEST(TcpServerPosixTest, MainTest) {
712 grpc_closure destroyed;
713 grpc_arg chan_args[1];
714 chan_args[0].type = GRPC_ARG_INTEGER;
715 chan_args[0].key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
716 chan_args[0].value.integer = 1;
717 const grpc_channel_args channel_args = {1, chan_args};
718 struct ifaddrs* ifa = nullptr;
719 struct ifaddrs* ifa_it;
720 // Zalloc dst_addrs to avoid oversized frames.
721 test_addrs* dst_addrs = grpc_core::Zalloc<test_addrs>();
722 grpc_init();
723 // wait a few seconds to make sure IPv6 link-local addresses can be bound
724 // if we are running under docker container that has just started.
725 // See https://github.com/moby/moby/issues/38491
726 // See https://github.com/grpc/grpc/issues/15610
727 gpr_sleep_until(grpc_timeout_seconds_to_deadline(4));
728 {
729 grpc_core::ExecCtx exec_ctx;
730 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
731 grpc_pollset_init(g_pollset, &g_mu);
732
733 test_no_op();
734 test_no_op_with_start();
735 test_no_op_with_port();
736 test_no_op_with_port_and_start();
737 test_pre_allocated_inet_fd();
738 #ifdef GRPC_HAVE_UNIX_SOCKET
739 test_pre_allocated_unix_fd();
740 #endif
741
742 if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
743 FAIL() << "getifaddrs: " << grpc_core::StrError(errno);
744 }
745 dst_addrs->naddrs = 0;
746 for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
747 ifa_it = ifa_it->ifa_next) {
748 if (ifa_it->ifa_addr == nullptr) {
749 continue;
750 } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
751 dst_addrs->addrs[dst_addrs->naddrs].addr.len =
752 static_cast<socklen_t>(sizeof(struct sockaddr_in));
753 } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
754 dst_addrs->addrs[dst_addrs->naddrs].addr.len =
755 static_cast<socklen_t>(sizeof(struct sockaddr_in6));
756 } else {
757 continue;
758 }
759 memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr,
760 dst_addrs->addrs[dst_addrs->naddrs].addr.len);
761 ASSERT_TRUE(
762 grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0));
763 test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]);
764 ++dst_addrs->naddrs;
765 }
766 freeifaddrs(ifa);
767 ifa = nullptr;
768
769 // Connect to same addresses as listeners.
770 test_connect(1, nullptr, nullptr, false);
771 test_connect(10, nullptr, nullptr, false);
772
773 // Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
774 // "::" listener.
775 test_connect(1, nullptr, dst_addrs, true);
776
777 // Test connect(2) with dst_addrs.
778 test_connect(1, &channel_args, dst_addrs, false);
779 // Test connect(2) with dst_addrs.
780 test_connect(10, &channel_args, dst_addrs, false);
781
782 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
783 grpc_schedule_on_exec_ctx);
784 grpc_pollset_shutdown(g_pollset, &destroyed);
785 }
786 grpc_shutdown();
787 gpr_free(dst_addrs);
788 gpr_free(g_pollset);
789 }
790
791 #endif // GRPC_POSIX_SOCKET_SERVER
792
main(int argc,char ** argv)793 int main(int argc, char** argv) {
794 grpc::testing::TestEnvironment env(&argc, argv);
795 ::testing::InitGoogleTest(&argc, argv);
796 return RUN_ALL_TESTS();
797 }
798