1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/atm.h>
20 #include <grpc/support/port_platform.h>
21
22 #include <utility>
23
24 // FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
25 #ifndef _GNU_SOURCE
26 #define _GNU_SOURCE
27 #endif
28
29 #include "src/core/lib/iomgr/port.h"
30
31 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
32
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <grpc/byte_buffer.h>
36 #include <grpc/event_engine/endpoint_config.h>
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/support/alloc.h>
39 #include <grpc/support/sync.h>
40 #include <grpc/support/time.h>
41 #include <inttypes.h>
42 #include <netinet/in.h>
43 #include <netinet/tcp.h>
44 #include <string.h>
45 #include <sys/socket.h>
46 #include <sys/stat.h>
47 #include <sys/types.h>
48 #include <unistd.h>
49
50 #include <string>
51
52 #include "absl/log/check.h"
53 #include "absl/log/log.h"
54 #include "absl/strings/str_cat.h"
55 #include "absl/strings/str_format.h"
56 #include "src/core/lib/address_utils/sockaddr_utils.h"
57 #include "src/core/lib/event_engine/default_event_engine.h"
58 #include "src/core/lib/event_engine/memory_allocator_factory.h"
59 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
60 #include "src/core/lib/event_engine/query_extensions.h"
61 #include "src/core/lib/event_engine/resolved_address_internal.h"
62 #include "src/core/lib/event_engine/shim.h"
63 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
64 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
65 #include "src/core/lib/iomgr/exec_ctx.h"
66 #include "src/core/lib/iomgr/resolve_address.h"
67 #include "src/core/lib/iomgr/sockaddr.h"
68 #include "src/core/lib/iomgr/socket_utils_posix.h"
69 #include "src/core/lib/iomgr/systemd_utils.h"
70 #include "src/core/lib/iomgr/tcp_posix.h"
71 #include "src/core/lib/iomgr/tcp_server.h"
72 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
73 #include "src/core/lib/iomgr/unix_sockets_posix.h"
74 #include "src/core/lib/iomgr/vsock.h"
75 #include "src/core/lib/transport/error_utils.h"
76 #include "src/core/util/strerror.h"
77
// File-scope counter of accepted connections that on_read() closed right away
// because the server's memory quota reported high memory pressure. It is only
// used to rate-limit the trace message emitted for such drops.
static std::atomic<int64_t> num_dropped_connections{0};
// Delay before the listener's retry timer fires after accept4() failed with
// EMFILE (file descriptor limit reached).
static constexpr grpc_core::Duration kRetryAcceptWaitTime{
    grpc_core::Duration::Seconds(1)};
81
82 using ::grpc_event_engine::experimental::EndpointConfig;
83 using ::grpc_event_engine::experimental::EventEngine;
84 using ::grpc_event_engine::experimental::MemoryAllocator;
85 using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
86 using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport;
87 using ::grpc_event_engine::experimental::SliceBuffer;
88
finish_shutdown(grpc_tcp_server * s)89 static void finish_shutdown(grpc_tcp_server* s) {
90 gpr_mu_lock(&s->mu);
91 CHECK(s->shutdown);
92 gpr_mu_unlock(&s->mu);
93 if (s->shutdown_complete != nullptr) {
94 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
95 absl::OkStatus());
96 }
97 gpr_mu_destroy(&s->mu);
98 while (s->head) {
99 grpc_tcp_listener* sp = s->head;
100 s->head = sp->next;
101 gpr_free(sp);
102 }
103 delete s->fd_handler;
104 delete s;
105 }
106
CreateEventEngineListener(grpc_tcp_server * s,grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server ** server)107 static grpc_error_handle CreateEventEngineListener(
108 grpc_tcp_server* s, grpc_closure* shutdown_complete,
109 const EndpointConfig& config, grpc_tcp_server** server) {
110 absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener;
111 auto* engine = reinterpret_cast<EventEngine*>(
112 config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
113 // Keeps the engine alive for some tests that have not otherwise
114 // instantiated an EventEngine
115 std::shared_ptr<EventEngine> keeper;
116 if (engine == nullptr) {
117 keeper = grpc_event_engine::experimental::GetDefaultEventEngine();
118 engine = keeper.get();
119 }
120 auto* event_engine_supports_fd =
121 grpc_event_engine::experimental::QueryExtension<
122 grpc_event_engine::experimental::EventEngineSupportsFdExtension>(
123 engine);
124 if (event_engine_supports_fd != nullptr) {
125 PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb =
126 [s](int listener_fd, std::unique_ptr<EventEngine::Endpoint> ep,
127 bool is_external, MemoryAllocator /*allocator*/,
128 SliceBuffer* pending_data) {
129 grpc_core::ApplicationCallbackExecCtx app_ctx;
130 grpc_core::ExecCtx exec_ctx;
131 grpc_pollset* read_notifier_pollset;
132 grpc_tcp_server_acceptor* acceptor;
133 void* cb_arg;
134 // Scoped for server lock, to ensure it's released before the callback
135 // is called.
136 {
137 grpc_core::MutexLockForGprMu lock(&s->mu);
138 if (s->shutdown) {
139 return;
140 }
141 cb_arg = s->on_accept_cb_arg;
142 acceptor = static_cast<grpc_tcp_server_acceptor*>(
143 gpr_malloc(sizeof(*acceptor)));
144 acceptor->from_server = s;
145 acceptor->port_index = -1;
146 acceptor->fd_index = -1;
147 if (!is_external) {
148 auto it = s->listen_fd_to_index_map.find(listener_fd);
149 if (it != s->listen_fd_to_index_map.end()) {
150 acceptor->port_index = std::get<0>(it->second);
151 acceptor->fd_index = std::get<1>(it->second);
152 }
153 } else {
154 // External connection handling.
155 grpc_resolved_address addr;
156 memset(&addr, 0, sizeof(addr));
157 addr.len =
158 static_cast<socklen_t>(sizeof(struct sockaddr_storage));
159 // Get the fd of the socket connected to peer.
160 int fd =
161 reinterpret_cast<
162 grpc_event_engine::experimental::PosixEndpoint*>(ep.get())
163 ->GetWrappedFd();
164 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
165 &(addr.len)) < 0) {
166 LOG(ERROR) << "Failed getpeername: "
167 << grpc_core::StrError(errno);
168 close(fd);
169 return;
170 }
171 (void)grpc_set_socket_no_sigpipe_if_possible(fd);
172 auto addr_uri = grpc_sockaddr_to_uri(&addr);
173 if (!addr_uri.ok()) {
174 LOG(ERROR) << "Invalid address: "
175 << addr_uri.status().ToString();
176 return;
177 }
178 GRPC_TRACE_LOG(tcp, INFO) << "SERVER_CONNECT: incoming external "
179 "connection: "
180 << addr_uri->c_str();
181 }
182 read_notifier_pollset =
183 (*(s->pollsets))[static_cast<size_t>(
184 gpr_atm_no_barrier_fetch_add(
185 &s->next_pollset_to_assign, 1)) %
186 s->pollsets->size()];
187 acceptor->external_connection = is_external;
188 acceptor->listener_fd = listener_fd;
189 grpc_byte_buffer* buf = nullptr;
190 if (pending_data != nullptr && pending_data->Length() > 0) {
191 buf = grpc_raw_byte_buffer_create(nullptr, 0);
192 grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
193 pending_data->c_slice_buffer());
194 pending_data->Clear();
195 }
196 acceptor->pending_data = buf;
197 }
198 s->on_accept_cb(cb_arg,
199 grpc_event_engine::experimental::
200 grpc_event_engine_endpoint_create(std::move(ep)),
201 read_notifier_pollset, acceptor);
202 };
203 listener = event_engine_supports_fd->CreatePosixListener(
204 std::move(accept_cb),
205 [s, shutdown_complete](absl::Status status) {
206 grpc_event_engine::experimental::RunEventEngineClosure(
207 shutdown_complete, absl_status_to_grpc_error(status));
208 finish_shutdown(s);
209 },
210 config,
211 std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
212 s->memory_quota));
213 } else {
214 EventEngine::Listener::AcceptCallback accept_cb =
215 [s](std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
216 grpc_core::ApplicationCallbackExecCtx app_ctx;
217 grpc_core::ExecCtx exec_ctx;
218 void* cb_arg;
219 {
220 grpc_core::MutexLockForGprMu lock(&s->mu);
221 if (s->shutdown) {
222 return;
223 }
224 cb_arg = s->on_accept_cb_arg;
225 }
226 s->on_accept_cb(cb_arg,
227 grpc_event_engine::experimental::
228 grpc_event_engine_endpoint_create(std::move(ep)),
229 nullptr, nullptr);
230 };
231 listener = engine->CreateListener(
232 std::move(accept_cb),
233 [s, ee = keeper, shutdown_complete](absl::Status status) {
234 CHECK_EQ(gpr_atm_no_barrier_load(&s->refs.count), 0);
235 grpc_event_engine::experimental::RunEventEngineClosure(
236 shutdown_complete, absl_status_to_grpc_error(status));
237 finish_shutdown(s);
238 },
239 config,
240 std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
241 s->memory_quota));
242 }
243 if (!listener.ok()) {
244 delete s;
245 *server = nullptr;
246 return listener.status();
247 }
248 s->ee_listener = std::move(*listener);
249 return absl::OkStatus();
250 }
251
tcp_server_create(grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)252 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
253 const EndpointConfig& config,
254 grpc_tcp_server_cb on_accept_cb,
255 void* on_accept_cb_arg,
256 grpc_tcp_server** server) {
257 grpc_tcp_server* s = new grpc_tcp_server;
258 s->so_reuseport = grpc_is_socket_reuse_port_supported();
259 s->expand_wildcard_addrs = false;
260 auto value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
261 if (value.has_value()) {
262 s->so_reuseport = (grpc_is_socket_reuse_port_supported() && *value != 0);
263 }
264 value = config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
265 if (value.has_value()) {
266 s->expand_wildcard_addrs = (*value != 0);
267 }
268 gpr_ref_init(&s->refs, 1);
269 gpr_mu_init(&s->mu);
270 s->active_ports = 0;
271 s->destroyed_ports = 0;
272 s->shutdown = false;
273 s->shutdown_starting.head = nullptr;
274 s->shutdown_starting.tail = nullptr;
275 if (!grpc_event_engine::experimental::UseEventEngineListener()) {
276 s->shutdown_complete = shutdown_complete;
277 } else {
278 s->shutdown_complete = nullptr;
279 }
280 s->on_accept_cb = on_accept_cb;
281 s->on_accept_cb_arg = on_accept_cb_arg;
282 s->head = nullptr;
283 s->tail = nullptr;
284 s->nports = 0;
285 s->options = ::TcpOptionsFromEndpointConfig(config);
286 s->fd_handler = nullptr;
287 CHECK(s->options.resource_quota != nullptr);
288 CHECK(s->on_accept_cb);
289 s->memory_quota = s->options.resource_quota->memory_quota();
290 s->pre_allocated_fd = -1;
291 gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
292 s->n_bind_ports = 0;
293 new (&s->listen_fd_to_index_map)
294 absl::flat_hash_map<int, std::tuple<int, int>>();
295 *server = s;
296 if (grpc_event_engine::experimental::UseEventEngineListener()) {
297 return CreateEventEngineListener(s, shutdown_complete, config, server);
298 }
299 return absl::OkStatus();
300 }
301
destroyed_port(void * server,grpc_error_handle)302 static void destroyed_port(void* server, grpc_error_handle /*error*/) {
303 grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
304 gpr_mu_lock(&s->mu);
305 s->destroyed_ports++;
306 if (s->destroyed_ports == s->nports) {
307 gpr_mu_unlock(&s->mu);
308 finish_shutdown(s);
309 } else {
310 CHECK(s->destroyed_ports < s->nports);
311 gpr_mu_unlock(&s->mu);
312 }
313 }
314
315 // called when all listening endpoints have been shutdown, so no further
316 // events will be received on them - at this point it's safe to destroy
317 // things
deactivated_all_ports(grpc_tcp_server * s)318 static void deactivated_all_ports(grpc_tcp_server* s) {
319 // delete ALL the things
320 gpr_mu_lock(&s->mu);
321
322 CHECK(s->shutdown);
323
324 if (s->head) {
325 grpc_tcp_listener* sp;
326 for (sp = s->head; sp; sp = sp->next) {
327 // Do not unlink if there is a pre-allocated FD
328 if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
329 grpc_unlink_if_unix_domain_socket(&sp->addr);
330 }
331 GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
332 grpc_schedule_on_exec_ctx);
333 grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
334 "tcp_listener_shutdown");
335 }
336 gpr_mu_unlock(&s->mu);
337 } else {
338 gpr_mu_unlock(&s->mu);
339 if (grpc_event_engine::experimental::UseEventEngineListener()) {
340 // This will trigger asynchronous execution of the on_shutdown_complete
341 // callback when appropriate. That callback will delete the server.
342 s->ee_listener.reset();
343 } else {
344 finish_shutdown(s);
345 }
346 }
347 }
348
tcp_server_destroy(grpc_tcp_server * s)349 static void tcp_server_destroy(grpc_tcp_server* s) {
350 gpr_mu_lock(&s->mu);
351 CHECK(!s->shutdown);
352 s->shutdown = true;
353 // shutdown all fd's
354 if (s->active_ports) {
355 grpc_tcp_listener* sp;
356 for (sp = s->head; sp; sp = sp->next) {
357 grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server destroyed"));
358 }
359 gpr_mu_unlock(&s->mu);
360 } else {
361 gpr_mu_unlock(&s->mu);
362 deactivated_all_ports(s);
363 }
364 }
365
366 // event manager callback when reads are ready
on_read(void * arg,grpc_error_handle err)367 static void on_read(void* arg, grpc_error_handle err) {
368 grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
369 grpc_pollset* read_notifier_pollset;
370 if (!err.ok()) {
371 goto error;
372 }
373
374 // loop until accept4 returns EAGAIN, and then re-arm notification
375 for (;;) {
376 grpc_resolved_address addr;
377 memset(&addr, 0, sizeof(addr));
378 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
379 // Note: If we ever decide to return this address to the user, remember to
380 // strip off the ::ffff:0.0.0.0/96 prefix first.
381 int fd = grpc_accept4(sp->fd, &addr, 1, 1);
382 if (fd < 0) {
383 if (errno == EINTR) {
384 continue;
385 }
386 // When the process runs out of fds, accept4() returns EMFILE. When this
387 // happens, the connection is left in the accept queue until either a
388 // read event triggers the on_read callback, or time has passed and the
389 // accept should be re-tried regardless. This callback is not cancelled,
390 // so a spurious wakeup may occur even when there's nothing to accept.
391 // This is not a performant code path, but if an fd limit has been
392 // reached, the system is likely in an unhappy state regardless.
393 if (errno == EMFILE) {
394 LOG_EVERY_N_SEC(ERROR, 1) << "File descriptor limit reached. Retrying.";
395 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
396 if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
397 grpc_timer_init(&sp->retry_timer,
398 grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
399 &sp->retry_closure);
400 return;
401 }
402 if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
403 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
404 return;
405 }
406 gpr_mu_lock(&sp->server->mu);
407 if (!sp->server->shutdown_listeners) {
408 LOG(ERROR) << "Failed accept4: " << grpc_core::StrError(errno);
409 } else {
410 // if we have shutdown listeners, accept4 could fail, and we
411 // needn't notify users
412 }
413 gpr_mu_unlock(&sp->server->mu);
414 goto error;
415 }
416
417 if (sp->server->memory_quota->IsMemoryPressureHigh()) {
418 int64_t dropped_connections_count =
419 num_dropped_connections.fetch_add(1, std::memory_order_relaxed) + 1;
420 if (dropped_connections_count % 1000 == 1) {
421 GRPC_TRACE_LOG(tcp, INFO)
422 << "Dropped >= " << dropped_connections_count
423 << " new connection attempts due to high memory pressure";
424 }
425 close(fd);
426 continue;
427 }
428
429 // For UNIX sockets, the accept call might not fill up the member sun_path
430 // of sockaddr_un, so explicitly call getpeername to get it.
431 if (grpc_is_unix_socket(&addr)) {
432 memset(&addr, 0, sizeof(addr));
433 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
434 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
435 &(addr.len)) < 0) {
436 auto listener_addr_uri = grpc_sockaddr_to_uri(&sp->addr);
437 LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno)
438 << ". Dropping the connection, and continuing to listen on "
439 << (listener_addr_uri.ok() ? *listener_addr_uri
440 : "<unknown>")
441 << ":" << sp->port;
442 close(fd);
443 continue;
444 }
445 }
446
447 (void)grpc_set_socket_no_sigpipe_if_possible(fd);
448
449 err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE,
450 sp->server->options);
451 if (!err.ok()) {
452 goto error;
453 }
454
455 auto addr_uri = grpc_sockaddr_to_uri(&addr);
456 if (!addr_uri.ok()) {
457 LOG(ERROR) << "Invalid address: " << addr_uri.status();
458 goto error;
459 }
460 GRPC_TRACE_LOG(tcp, INFO)
461 << "SERVER_CONNECT: incoming connection: " << *addr_uri;
462
463 std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
464 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
465
466 read_notifier_pollset = (*(sp->server->pollsets))
467 [static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
468 &sp->server->next_pollset_to_assign, 1)) %
469 sp->server->pollsets->size()];
470
471 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
472
473 // Create acceptor.
474 grpc_tcp_server_acceptor* acceptor =
475 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
476 acceptor->from_server = sp->server;
477 acceptor->port_index = sp->port_index;
478 acceptor->fd_index = sp->fd_index;
479 acceptor->external_connection = false;
480 sp->server->on_accept_cb(
481 sp->server->on_accept_cb_arg,
482 grpc_tcp_create(fdobj, sp->server->options, addr_uri.value()),
483 read_notifier_pollset, acceptor);
484 }
485
486 GPR_UNREACHABLE_CODE(return);
487
488 error:
489 gpr_mu_lock(&sp->server->mu);
490 if (0 == --sp->server->active_ports && sp->server->shutdown) {
491 gpr_mu_unlock(&sp->server->mu);
492 deactivated_all_ports(sp->server);
493 } else {
494 gpr_mu_unlock(&sp->server->mu);
495 }
496 }
497
498 // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)499 static grpc_error_handle add_wildcard_addrs_to_server(grpc_tcp_server* s,
500 unsigned port_index,
501 int requested_port,
502 int* out_port) {
503 grpc_resolved_address wild4;
504 grpc_resolved_address wild6;
505 unsigned fd_index = 0;
506 grpc_dualstack_mode dsmode;
507 grpc_tcp_listener* sp = nullptr;
508 grpc_tcp_listener* sp2 = nullptr;
509 grpc_error_handle v6_err;
510 grpc_error_handle v4_err;
511 *out_port = -1;
512
513 if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
514 return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
515 out_port);
516 }
517
518 grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
519 // Try listening on IPv6 first.
520 if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
521 &dsmode, &sp)) == absl::OkStatus()) {
522 ++fd_index;
523 requested_port = *out_port = sp->port;
524 if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
525 return absl::OkStatus();
526 }
527 }
528 // If we got a v6-only socket or nothing, try adding 0.0.0.0.
529 grpc_sockaddr_set_port(&wild4, requested_port);
530 if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
531 &dsmode, &sp2)) == absl::OkStatus()) {
532 *out_port = sp2->port;
533 if (sp != nullptr) {
534 sp2->is_sibling = 1;
535 sp->sibling = sp2;
536 }
537 }
538 if (*out_port > 0) {
539 if (!v6_err.ok()) {
540 GRPC_TRACE_LOG(tcp, INFO) << "Failed to add :: listener, "
541 << "the environment may not support IPv6: "
542 << grpc_core::StatusToString(v6_err);
543 }
544 if (!v4_err.ok()) {
545 GRPC_TRACE_LOG(tcp, INFO) << "Failed to add 0.0.0.0 listener, "
546 << "the environment may not support IPv4: "
547 << grpc_core::StatusToString(v4_err);
548 }
549 return absl::OkStatus();
550 } else {
551 grpc_error_handle root_err =
552 GRPC_ERROR_CREATE("Failed to add any wildcard listeners");
553 CHECK(!v6_err.ok());
554 CHECK(!v4_err.ok());
555 root_err = grpc_error_add_child(root_err, v6_err);
556 root_err = grpc_error_add_child(root_err, v4_err);
557 return root_err;
558 }
559 }
560
clone_port(grpc_tcp_listener * listener,unsigned count)561 static grpc_error_handle clone_port(grpc_tcp_listener* listener,
562 unsigned count) {
563 grpc_tcp_listener* sp = nullptr;
564 absl::StatusOr<std::string> addr_str;
565 grpc_error_handle err;
566
567 for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
568 l->fd_index += count;
569 }
570
571 for (unsigned i = 0; i < count; i++) {
572 int fd = -1;
573 int port = -1;
574 grpc_dualstack_mode dsmode;
575 err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
576 &fd);
577 if (!err.ok()) return err;
578 err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
579 true, &port);
580 if (!err.ok()) return err;
581 listener->server->nports++;
582 addr_str = grpc_sockaddr_to_string(&listener->addr, true);
583 if (!addr_str.ok()) {
584 return GRPC_ERROR_CREATE(addr_str.status().ToString());
585 }
586 sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
587 sp->next = listener->next;
588 listener->next = sp;
589 // sp (the new listener) is a sibling of 'listener' (the original
590 // listener).
591 sp->is_sibling = 1;
592 sp->sibling = listener->sibling;
593 listener->sibling = sp;
594 sp->server = listener->server;
595 sp->fd = fd;
596 sp->emfd = grpc_fd_create(
597 fd,
598 absl::StrFormat("tcp-server-listener:%s/clone-%d", *addr_str, i)
599 .c_str(),
600 true);
601 memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
602 sp->port = port;
603 sp->port_index = listener->port_index;
604 sp->fd_index = listener->fd_index + count - i;
605 CHECK(sp->emfd);
606 grpc_tcp_server_listener_initialize_retry_timer(sp);
607 while (listener->server->tail->next != nullptr) {
608 listener->server->tail = listener->server->tail->next;
609 }
610 }
611
612 return absl::OkStatus();
613 }
614
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)615 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
616 const grpc_resolved_address* addr,
617 int* out_port) {
618 if (grpc_event_engine::experimental::UseEventEngineListener()) {
619 gpr_mu_lock(&s->mu);
620 if (s->shutdown_listeners) {
621 gpr_mu_unlock(&s->mu);
622 return absl::UnknownError("Server already shutdown");
623 }
624 int fd_index = 0;
625 absl::StatusOr<int> port;
626 auto* listener_supports_fd =
627 grpc_event_engine::experimental::QueryExtension<
628 grpc_event_engine::experimental::ListenerSupportsFdExtension>(
629 s->ee_listener.get());
630 if (listener_supports_fd != nullptr) {
631 port = listener_supports_fd->BindWithFd(
632 grpc_event_engine::experimental::CreateResolvedAddress(*addr),
633 [s, &fd_index](absl::StatusOr<int> listen_fd) {
634 if (!listen_fd.ok()) {
635 return;
636 }
637 DCHECK_GT(*listen_fd, 0);
638 s->listen_fd_to_index_map.insert_or_assign(
639 *listen_fd, std::make_tuple(s->n_bind_ports, fd_index++));
640 });
641 } else {
642 port = s->ee_listener->Bind(
643 grpc_event_engine::experimental::CreateResolvedAddress(*addr));
644 }
645 if (port.ok()) {
646 s->n_bind_ports++;
647 *out_port = *port;
648 }
649 gpr_mu_unlock(&s->mu);
650 return port.status();
651 }
652 CHECK(addr->len <= GRPC_MAX_SOCKADDR_SIZE);
653 grpc_tcp_listener* sp;
654 grpc_resolved_address sockname_temp;
655 grpc_resolved_address addr6_v4mapped;
656 int requested_port = grpc_sockaddr_get_port(addr);
657 unsigned port_index = 0;
658 grpc_dualstack_mode dsmode;
659 grpc_error_handle err;
660 *out_port = -1;
661 if (s->tail != nullptr) {
662 port_index = s->tail->port_index + 1;
663 }
664
665 // Check if this is a wildcard port, and if so, try to keep the port the same
666 // as some previously created listener.
667 if (requested_port == 0) {
668 for (sp = s->head; sp; sp = sp->next) {
669 sockname_temp.len =
670 static_cast<socklen_t>(sizeof(struct sockaddr_storage));
671 if (0 ==
672 getsockname(sp->fd,
673 reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
674 &sockname_temp.len)) {
675 int used_port = grpc_sockaddr_get_port(&sockname_temp);
676 if (used_port > 0) {
677 memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
678 grpc_sockaddr_set_port(&sockname_temp, used_port);
679 requested_port = used_port;
680 addr = &sockname_temp;
681 break;
682 }
683 }
684 }
685 }
686
687 /* Check if systemd has pre-allocated valid FDs */
688 set_matching_sd_fds(s, addr, requested_port);
689
690 /* Do not unlink if there are pre-allocated FDs, or it will stop
691 working after the first client connects */
692 if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
693 grpc_unlink_if_unix_domain_socket(addr);
694 }
695
696 if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
697 return add_wildcard_addrs_to_server(s, port_index, requested_port,
698 out_port);
699 }
700 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
701 addr = &addr6_v4mapped;
702 }
703 if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
704 absl::OkStatus()) {
705 *out_port = sp->port;
706 }
707 return err;
708 }
709
710 // Return listener at port_index or NULL. Should only be called with s->mu
711 // locked.
get_port_index(grpc_tcp_server * s,unsigned port_index)712 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
713 unsigned port_index) {
714 unsigned num_ports = 0;
715 grpc_tcp_listener* sp;
716 for (sp = s->head; sp; sp = sp->next) {
717 if (!sp->is_sibling) {
718 if (++num_ports > port_index) {
719 return sp;
720 }
721 }
722 }
723 return nullptr;
724 }
725
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)726 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
727 unsigned num_fds = 0;
728 gpr_mu_lock(&s->mu);
729 if (grpc_event_engine::experimental::UseEventEngineListener()) {
730 // This doesn't need to be very fast. Used in tests.
731 for (auto it = s->listen_fd_to_index_map.begin();
732 it != s->listen_fd_to_index_map.end(); it++) {
733 if (std::get<0>(it->second) == static_cast<int>(port_index)) {
734 num_fds++;
735 }
736 }
737 gpr_mu_unlock(&s->mu);
738 return num_fds;
739 }
740 grpc_tcp_listener* sp = get_port_index(s, port_index);
741 for (; sp; sp = sp->sibling) {
742 ++num_fds;
743 }
744 gpr_mu_unlock(&s->mu);
745 return num_fds;
746 }
747
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)748 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
749 unsigned fd_index) {
750 gpr_mu_lock(&s->mu);
751 if (grpc_event_engine::experimental::UseEventEngineListener()) {
752 // This doesn't need to be very fast. Used in tests.
753 for (auto it = s->listen_fd_to_index_map.begin();
754 it != s->listen_fd_to_index_map.end(); it++) {
755 if (std::get<0>(it->second) == static_cast<int>(port_index) &&
756 std::get<1>(it->second) == static_cast<int>(fd_index)) {
757 gpr_mu_unlock(&s->mu);
758 return it->first;
759 }
760 }
761 gpr_mu_unlock(&s->mu);
762 return -1;
763 }
764 grpc_tcp_listener* sp = get_port_index(s, port_index);
765 for (; sp; sp = sp->sibling, --fd_index) {
766 if (fd_index == 0) {
767 gpr_mu_unlock(&s->mu);
768 return sp->fd;
769 }
770 }
771 gpr_mu_unlock(&s->mu);
772 return -1;
773 }
774
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > * pollsets)775 static void tcp_server_start(grpc_tcp_server* s,
776 const std::vector<grpc_pollset*>* pollsets) {
777 size_t i;
778 grpc_tcp_listener* sp;
779 gpr_mu_lock(&s->mu);
780 CHECK(s->on_accept_cb);
781 CHECK_EQ(s->active_ports, 0u);
782 s->pollsets = pollsets;
783 if (grpc_event_engine::experimental::UseEventEngineListener()) {
784 CHECK(!s->shutdown_listeners);
785 CHECK(GRPC_LOG_IF_ERROR("listener_start", s->ee_listener->Start()));
786 gpr_mu_unlock(&s->mu);
787 return;
788 }
789 sp = s->head;
790 while (sp != nullptr) {
791 if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
792 !grpc_is_vsock(&sp->addr) && pollsets->size() > 1) {
793 CHECK(GRPC_LOG_IF_ERROR(
794 "clone_port", clone_port(sp, (unsigned)(pollsets->size() - 1))));
795 for (i = 0; i < pollsets->size(); i++) {
796 grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
797 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
798 grpc_schedule_on_exec_ctx);
799 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
800 s->active_ports++;
801 sp = sp->next;
802 }
803 } else {
804 for (i = 0; i < pollsets->size(); i++) {
805 grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
806 }
807 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
808 grpc_schedule_on_exec_ctx);
809 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
810 s->active_ports++;
811 sp = sp->next;
812 }
813 }
814 gpr_mu_unlock(&s->mu);
815 }
816
// Takes an additional reference on `s` and returns `s` so that the call can be
// used inline. Uses gpr_ref_non_zero(), i.e. the caller is expected to already
// hold a reference.
grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
  gpr_ref_non_zero(&s->refs);
  return s;
}
821
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)822 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
823 grpc_closure* shutdown_starting) {
824 gpr_mu_lock(&s->mu);
825 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
826 absl::OkStatus());
827 gpr_mu_unlock(&s->mu);
828 }
829
tcp_server_unref(grpc_tcp_server * s)830 static void tcp_server_unref(grpc_tcp_server* s) {
831 if (gpr_unref(&s->refs)) {
832 grpc_tcp_server_shutdown_listeners(s);
833 gpr_mu_lock(&s->mu);
834 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
835 gpr_mu_unlock(&s->mu);
836 tcp_server_destroy(s);
837 }
838 }
839
tcp_server_shutdown_listeners(grpc_tcp_server * s)840 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
841 gpr_mu_lock(&s->mu);
842 s->shutdown_listeners = true;
843 if (grpc_event_engine::experimental::UseEventEngineListener()) {
844 auto* listener_supports_fd =
845 grpc_event_engine::experimental::QueryExtension<
846 grpc_event_engine::experimental::ListenerSupportsFdExtension>(
847 s->ee_listener.get());
848 if (listener_supports_fd != nullptr) {
849 listener_supports_fd->ShutdownListeningFds();
850 }
851 }
852 /* shutdown all fd's */
853 if (s->active_ports) {
854 grpc_tcp_listener* sp;
855 for (sp = s->head; sp; sp = sp->next) {
856 grpc_timer_cancel(&sp->retry_timer);
857 grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
858 }
859 }
860 gpr_mu_unlock(&s->mu);
861 }
862
// Returns the pre-allocated fd recorded on the server; tcp_server_create()
// initializes it to -1. The value is read without taking s->mu.
// NOTE(review): deactivated_all_ports() calls
// grpc_tcp_server_pre_allocated_fd() while holding s->mu; if that call
// dispatches here, adding a lock to this function would self-deadlock --
// confirm before changing the locking.
static int tcp_server_pre_allocated_fd(grpc_tcp_server* s) {
  return s->pre_allocated_fd;
}
866
tcp_server_set_pre_allocated_fd(grpc_tcp_server * s,int fd)867 static void tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd) {
868 gpr_mu_lock(&s->mu);
869 s->pre_allocated_fd = fd;
870 gpr_mu_unlock(&s->mu);
871 }
872
873 namespace {
874 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
875 public:
ExternalConnectionHandler(grpc_tcp_server * s)876 explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
877
878 // TODO(yangg) resolve duplicate code with on_read
Handle(int listener_fd,int fd,grpc_byte_buffer * buf)879 void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
880 if (grpc_event_engine::experimental::UseEventEngineListener()) {
881 auto* listener_supports_fd =
882 grpc_event_engine::experimental::QueryExtension<
883 grpc_event_engine::experimental::ListenerSupportsFdExtension>(
884 s_->ee_listener.get());
885 CHECK_NE(listener_supports_fd, nullptr);
886 grpc_event_engine::experimental::SliceBuffer pending_data;
887 if (buf != nullptr) {
888 pending_data =
889 grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer(
890 buf->data.raw.slice_buffer);
891 }
892 CHECK(GRPC_LOG_IF_ERROR("listener_handle_external_connection",
893 listener_supports_fd->HandleExternalConnection(
894 listener_fd, fd, &pending_data)));
895 return;
896 }
897 grpc_pollset* read_notifier_pollset;
898 grpc_resolved_address addr;
899 memset(&addr, 0, sizeof(addr));
900 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
901 grpc_core::ExecCtx exec_ctx;
902
903 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
904 &(addr.len)) < 0) {
905 LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno);
906 close(fd);
907 return;
908 }
909 (void)grpc_set_socket_no_sigpipe_if_possible(fd);
910 auto addr_uri = grpc_sockaddr_to_uri(&addr);
911 if (!addr_uri.ok()) {
912 LOG(ERROR) << "Invalid address: " << addr_uri.status();
913 return;
914 }
915 GRPC_TRACE_LOG(tcp, INFO)
916 << "SERVER_CONNECT: incoming external connection: " << *addr_uri;
917 std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
918 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
919 read_notifier_pollset =
920 (*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
921 &s_->next_pollset_to_assign, 1)) %
922 s_->pollsets->size()];
923 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
924 grpc_tcp_server_acceptor* acceptor =
925 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
926 acceptor->from_server = s_;
927 acceptor->port_index = -1;
928 acceptor->fd_index = -1;
929 acceptor->external_connection = true;
930 acceptor->listener_fd = listener_fd;
931 acceptor->pending_data = buf;
932 s_->on_accept_cb(s_->on_accept_cb_arg,
933 grpc_tcp_create(fdobj, s_->options, addr_uri.value()),
934 read_notifier_pollset, acceptor);
935 }
936
937 private:
938 grpc_tcp_server* s_;
939 };
940 } // namespace
941
// Creates an ExternalConnectionHandler for `s`, stores it in s->fd_handler and
// returns it. The stored handler is deleted by finish_shutdown(), so the
// returned pointer is not owned by the caller.
// NOTE(review): a handler already stored in s->fd_handler is overwritten
// without being deleted -- presumably this is called at most once per server;
// confirm against callers.
static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
    grpc_tcp_server* s) {
  s->fd_handler = new ExternalConnectionHandler(s);
  return s->fd_handler;
}
947
// Table of the POSIX implementations of the TCP server operations defined in
// this file. The entries are positional, so their order has to match the
// member order of grpc_tcp_server_vtable (declared elsewhere).
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
    tcp_server_create,
    tcp_server_start,
    tcp_server_add_port,
    tcp_server_create_fd_handler,
    tcp_server_port_fd_count,
    tcp_server_port_fd,
    tcp_server_ref,
    tcp_server_shutdown_starting_add,
    tcp_server_unref,
    tcp_server_shutdown_listeners,
    tcp_server_pre_allocated_fd,
    tcp_server_set_pre_allocated_fd};
961
962 #endif // GRPC_POSIX_SOCKET_TCP_SERVER
963