From 1e86ca5834b94cae7d5e6d219056c0fc895cf95d Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 12 Jul 2023 18:42:09 -0700 Subject: [PATCH] [backport][iomgr][EventEngine] Improve server handling of file descriptor exhaustion (#33672) Backport of #33656 --- src/core/lib/iomgr/tcp_server_posix.cc | 46 ++++++++++++++----- src/core/lib/iomgr/tcp_server_utils_posix.h | 13 +++++ .../iomgr/tcp_server_utils_posix_common.cc | 21 ++++++++ 3 files changed, 67 insertions(+), 13 deletions(-) diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index a1db16d916..6804928fe3 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -16,13 +16,17 @@ * */ +#include + +#include + +#include + /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif -#include - #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP_SERVER @@ -45,6 +49,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include #include #include #include @@ -350,21 +357,35 @@ static void on_read(void* arg, grpc_error_handle err) { if (fd < 0) { if (errno == EINTR) { continue; - } else if (errno == EAGAIN || errno == ECONNABORTED || - errno == EWOULDBLOCK) { + } + // When the process runs out of fds, accept4() returns EMFILE. When this + // happens, the connection is left in the accept queue until either a + // read event triggers the on_read callback, or time has passed and the + // accept should be re-tried regardless. This callback is not cancelled, + // so a spurious wakeup may occur even when there's nothing to accept. + // This is not a performant code path, but if an fd limit has been + // reached, the system is likely in an unhappy state regardless. + if (errno == EMFILE) { + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return; + grpc_timer_init(&sp->retry_timer, + grpc_core::ExecCtx::Get()->Now() + 1 * GPR_MS_PER_SEC, + &sp->retry_closure); + return; + } + if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) { grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); return; + } + gpr_mu_lock(&sp->server->mu); + if (!sp->server->shutdown_listeners) { + gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); } else { - gpr_mu_lock(&sp->server->mu); - if (!sp->server->shutdown_listeners) { - gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); - } else { - /* if we have shutdown listeners, accept4 could fail, and we - needn't notify users */ - } - gpr_mu_unlock(&sp->server->mu); - goto error; + // if we have shutdown listeners, accept4 could fail, and we + // needn't notify users } + gpr_mu_unlock(&sp->server->mu); + goto error; } /* For UNIX sockets, the accept call might not fill up the member sun_path @@ -558,6 +581,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener, sp->port_index = listener->port_index; sp->fd_index = listener->fd_index + count - i; GPR_ASSERT(sp->emfd); + grpc_tcp_server_listener_initialize_retry_timer(sp); while (listener->server->tail->next != nullptr) { listener->server->tail = listener->server->tail->next; } @@ -791,6 +815,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) { if (s->active_ports) { grpc_tcp_listener* sp; for (sp = s->head; sp; sp = sp->next) { + grpc_timer_cancel(&sp->retry_timer); grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown")); } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 26cef0209f..de5a888cff 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/iomgr/timer.h" /* one listening port */ typedef struct grpc_tcp_listener { @@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener { identified while iterating through 'next'. */ struct grpc_tcp_listener* sibling; int is_sibling; + // If an accept4() call fails, a timer is started to drain the accept queue in + // case no further connection attempts reach the gRPC server. + grpc_closure retry_closure; + grpc_timer retry_timer; + gpr_atm retry_timer_armed; } grpc_tcp_listener; /* the overall server */ @@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket( /* Ruturn true if the platform supports ifaddrs */ bool grpc_tcp_server_have_ifaddrs(void); +// Initialize (but don't start) the timer and callback to retry accept4() on a +// listening socket after file descriptors have been exhausted. This must be +// called when creating a new listener. +void grpc_tcp_server_listener_initialize_retry_timer( + grpc_tcp_listener* listener); + #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 574fd02d0d..a32f542c4a 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -18,6 +18,8 @@ #include +#include + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON @@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) { return s_max_accept_queue_size; } +static void listener_retry_timer_cb(void* arg, grpc_error_handle err) { + // Do nothing if cancelled. + if (err != GRPC_ERROR_NONE) return; + grpc_tcp_listener* listener = static_cast(arg); + gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); + if (!grpc_fd_is_shutdown(listener->emfd)) { + grpc_fd_set_readable(listener->emfd); + } +} + +void grpc_tcp_server_listener_initialize_retry_timer( + grpc_tcp_listener* listener) { + gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); + grpc_timer_init_unset(&listener->retry_timer); + GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener, + grpc_schedule_on_exec_ctx); +} + static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, const grpc_resolved_address* addr, unsigned port_index, @@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name.c_str(), true); + grpc_tcp_server_listener_initialize_retry_timer(sp); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; -- 2.33.0