• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/atm.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include <utility>
23 
24 // FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
25 #ifndef _GNU_SOURCE
26 #define _GNU_SOURCE
27 #endif
28 
29 #include "src/core/lib/iomgr/port.h"
30 
31 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
32 
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <grpc/byte_buffer.h>
36 #include <grpc/event_engine/endpoint_config.h>
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/support/alloc.h>
39 #include <grpc/support/sync.h>
40 #include <grpc/support/time.h>
41 #include <inttypes.h>
42 #include <netinet/in.h>
43 #include <netinet/tcp.h>
44 #include <string.h>
45 #include <sys/socket.h>
46 #include <sys/stat.h>
47 #include <sys/types.h>
48 #include <unistd.h>
49 
50 #include <string>
51 
52 #include "absl/log/check.h"
53 #include "absl/log/log.h"
54 #include "absl/strings/str_cat.h"
55 #include "absl/strings/str_format.h"
56 #include "src/core/lib/address_utils/sockaddr_utils.h"
57 #include "src/core/lib/event_engine/default_event_engine.h"
58 #include "src/core/lib/event_engine/memory_allocator_factory.h"
59 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
60 #include "src/core/lib/event_engine/query_extensions.h"
61 #include "src/core/lib/event_engine/resolved_address_internal.h"
62 #include "src/core/lib/event_engine/shim.h"
63 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
64 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
65 #include "src/core/lib/iomgr/exec_ctx.h"
66 #include "src/core/lib/iomgr/resolve_address.h"
67 #include "src/core/lib/iomgr/sockaddr.h"
68 #include "src/core/lib/iomgr/socket_utils_posix.h"
69 #include "src/core/lib/iomgr/systemd_utils.h"
70 #include "src/core/lib/iomgr/tcp_posix.h"
71 #include "src/core/lib/iomgr/tcp_server.h"
72 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
73 #include "src/core/lib/iomgr/unix_sockets_posix.h"
74 #include "src/core/lib/iomgr/vsock.h"
75 #include "src/core/lib/transport/error_utils.h"
76 #include "src/core/util/strerror.h"
77 
// Running total of accepted connections that on_read() closed right away
// because the server's memory quota reported high memory pressure. Only used
// to rate-limit the corresponding trace log.
78 static std::atomic<int64_t> num_dropped_connections{0};
// Offset from "now" used as the deadline of a listener's retry timer when
// accept4() fails with EMFILE in on_read().
79 static constexpr grpc_core::Duration kRetryAcceptWaitTime{
80     grpc_core::Duration::Seconds(1)};
81 
82 using ::grpc_event_engine::experimental::EndpointConfig;
83 using ::grpc_event_engine::experimental::EventEngine;
84 using ::grpc_event_engine::experimental::MemoryAllocator;
85 using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
86 using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport;
87 using ::grpc_event_engine::experimental::SliceBuffer;
88 
finish_shutdown(grpc_tcp_server * s)89 static void finish_shutdown(grpc_tcp_server* s) {
90   gpr_mu_lock(&s->mu);
91   CHECK(s->shutdown);
92   gpr_mu_unlock(&s->mu);
93   if (s->shutdown_complete != nullptr) {
94     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
95                             absl::OkStatus());
96   }
97   gpr_mu_destroy(&s->mu);
98   while (s->head) {
99     grpc_tcp_listener* sp = s->head;
100     s->head = sp->next;
101     gpr_free(sp);
102   }
103   delete s->fd_handler;
104   delete s;
105 }
106 
CreateEventEngineListener(grpc_tcp_server * s,grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server ** server)107 static grpc_error_handle CreateEventEngineListener(
108     grpc_tcp_server* s, grpc_closure* shutdown_complete,
109     const EndpointConfig& config, grpc_tcp_server** server) {
110   absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener;
111   auto* engine = reinterpret_cast<EventEngine*>(
112       config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
113   // Keeps the engine alive for some tests that have not otherwise
114   // instantiated an EventEngine
115   std::shared_ptr<EventEngine> keeper;
116   if (engine == nullptr) {
117     keeper = grpc_event_engine::experimental::GetDefaultEventEngine();
118     engine = keeper.get();
119   }
120   auto* event_engine_supports_fd =
121       grpc_event_engine::experimental::QueryExtension<
122           grpc_event_engine::experimental::EventEngineSupportsFdExtension>(
123           engine);
124   if (event_engine_supports_fd != nullptr) {
125     PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb =
126         [s](int listener_fd, std::unique_ptr<EventEngine::Endpoint> ep,
127             bool is_external, MemoryAllocator /*allocator*/,
128             SliceBuffer* pending_data) {
129           grpc_core::ApplicationCallbackExecCtx app_ctx;
130           grpc_core::ExecCtx exec_ctx;
131           grpc_pollset* read_notifier_pollset;
132           grpc_tcp_server_acceptor* acceptor;
133           void* cb_arg;
134           // Scoped for server lock, to ensure it's released before the callback
135           // is called.
136           {
137             grpc_core::MutexLockForGprMu lock(&s->mu);
138             if (s->shutdown) {
139               return;
140             }
141             cb_arg = s->on_accept_cb_arg;
142             acceptor = static_cast<grpc_tcp_server_acceptor*>(
143                 gpr_malloc(sizeof(*acceptor)));
144             acceptor->from_server = s;
145             acceptor->port_index = -1;
146             acceptor->fd_index = -1;
147             if (!is_external) {
148               auto it = s->listen_fd_to_index_map.find(listener_fd);
149               if (it != s->listen_fd_to_index_map.end()) {
150                 acceptor->port_index = std::get<0>(it->second);
151                 acceptor->fd_index = std::get<1>(it->second);
152               }
153             } else {
154               // External connection handling.
155               grpc_resolved_address addr;
156               memset(&addr, 0, sizeof(addr));
157               addr.len =
158                   static_cast<socklen_t>(sizeof(struct sockaddr_storage));
159               // Get the fd of the socket connected to peer.
160               int fd =
161                   reinterpret_cast<
162                       grpc_event_engine::experimental::PosixEndpoint*>(ep.get())
163                       ->GetWrappedFd();
164               if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
165                               &(addr.len)) < 0) {
166                 LOG(ERROR) << "Failed getpeername: "
167                            << grpc_core::StrError(errno);
168                 close(fd);
169                 return;
170               }
171               (void)grpc_set_socket_no_sigpipe_if_possible(fd);
172               auto addr_uri = grpc_sockaddr_to_uri(&addr);
173               if (!addr_uri.ok()) {
174                 LOG(ERROR) << "Invalid address: "
175                            << addr_uri.status().ToString();
176                 return;
177               }
178               GRPC_TRACE_LOG(tcp, INFO) << "SERVER_CONNECT: incoming external "
179                                            "connection: "
180                                         << addr_uri->c_str();
181             }
182             read_notifier_pollset =
183                 (*(s->pollsets))[static_cast<size_t>(
184                                      gpr_atm_no_barrier_fetch_add(
185                                          &s->next_pollset_to_assign, 1)) %
186                                  s->pollsets->size()];
187             acceptor->external_connection = is_external;
188             acceptor->listener_fd = listener_fd;
189             grpc_byte_buffer* buf = nullptr;
190             if (pending_data != nullptr && pending_data->Length() > 0) {
191               buf = grpc_raw_byte_buffer_create(nullptr, 0);
192               grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
193                                      pending_data->c_slice_buffer());
194               pending_data->Clear();
195             }
196             acceptor->pending_data = buf;
197           }
198           s->on_accept_cb(cb_arg,
199                           grpc_event_engine::experimental::
200                               grpc_event_engine_endpoint_create(std::move(ep)),
201                           read_notifier_pollset, acceptor);
202         };
203     listener = event_engine_supports_fd->CreatePosixListener(
204         std::move(accept_cb),
205         [s, shutdown_complete](absl::Status status) {
206           grpc_event_engine::experimental::RunEventEngineClosure(
207               shutdown_complete, absl_status_to_grpc_error(status));
208           finish_shutdown(s);
209         },
210         config,
211         std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
212             s->memory_quota));
213   } else {
214     EventEngine::Listener::AcceptCallback accept_cb =
215         [s](std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
216           grpc_core::ApplicationCallbackExecCtx app_ctx;
217           grpc_core::ExecCtx exec_ctx;
218           void* cb_arg;
219           {
220             grpc_core::MutexLockForGprMu lock(&s->mu);
221             if (s->shutdown) {
222               return;
223             }
224             cb_arg = s->on_accept_cb_arg;
225           }
226           s->on_accept_cb(cb_arg,
227                           grpc_event_engine::experimental::
228                               grpc_event_engine_endpoint_create(std::move(ep)),
229                           nullptr, nullptr);
230         };
231     listener = engine->CreateListener(
232         std::move(accept_cb),
233         [s, ee = keeper, shutdown_complete](absl::Status status) {
234           CHECK_EQ(gpr_atm_no_barrier_load(&s->refs.count), 0);
235           grpc_event_engine::experimental::RunEventEngineClosure(
236               shutdown_complete, absl_status_to_grpc_error(status));
237           finish_shutdown(s);
238         },
239         config,
240         std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
241             s->memory_quota));
242   }
243   if (!listener.ok()) {
244     delete s;
245     *server = nullptr;
246     return listener.status();
247   }
248   s->ee_listener = std::move(*listener);
249   return absl::OkStatus();
250 }
251 
tcp_server_create(grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)252 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
253                                            const EndpointConfig& config,
254                                            grpc_tcp_server_cb on_accept_cb,
255                                            void* on_accept_cb_arg,
256                                            grpc_tcp_server** server) {
257   grpc_tcp_server* s = new grpc_tcp_server;
258   s->so_reuseport = grpc_is_socket_reuse_port_supported();
259   s->expand_wildcard_addrs = false;
260   auto value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
261   if (value.has_value()) {
262     s->so_reuseport = (grpc_is_socket_reuse_port_supported() && *value != 0);
263   }
264   value = config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
265   if (value.has_value()) {
266     s->expand_wildcard_addrs = (*value != 0);
267   }
268   gpr_ref_init(&s->refs, 1);
269   gpr_mu_init(&s->mu);
270   s->active_ports = 0;
271   s->destroyed_ports = 0;
272   s->shutdown = false;
273   s->shutdown_starting.head = nullptr;
274   s->shutdown_starting.tail = nullptr;
275   if (!grpc_event_engine::experimental::UseEventEngineListener()) {
276     s->shutdown_complete = shutdown_complete;
277   } else {
278     s->shutdown_complete = nullptr;
279   }
280   s->on_accept_cb = on_accept_cb;
281   s->on_accept_cb_arg = on_accept_cb_arg;
282   s->head = nullptr;
283   s->tail = nullptr;
284   s->nports = 0;
285   s->options = ::TcpOptionsFromEndpointConfig(config);
286   s->fd_handler = nullptr;
287   CHECK(s->options.resource_quota != nullptr);
288   CHECK(s->on_accept_cb);
289   s->memory_quota = s->options.resource_quota->memory_quota();
290   s->pre_allocated_fd = -1;
291   gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
292   s->n_bind_ports = 0;
293   new (&s->listen_fd_to_index_map)
294       absl::flat_hash_map<int, std::tuple<int, int>>();
295   *server = s;
296   if (grpc_event_engine::experimental::UseEventEngineListener()) {
297     return CreateEventEngineListener(s, shutdown_complete, config, server);
298   }
299   return absl::OkStatus();
300 }
301 
destroyed_port(void * server,grpc_error_handle)302 static void destroyed_port(void* server, grpc_error_handle /*error*/) {
303   grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
304   gpr_mu_lock(&s->mu);
305   s->destroyed_ports++;
306   if (s->destroyed_ports == s->nports) {
307     gpr_mu_unlock(&s->mu);
308     finish_shutdown(s);
309   } else {
310     CHECK(s->destroyed_ports < s->nports);
311     gpr_mu_unlock(&s->mu);
312   }
313 }
314 
315 // called when all listening endpoints have been shutdown, so no further
316 // events will be received on them - at this point it's safe to destroy
317 // things
deactivated_all_ports(grpc_tcp_server * s)318 static void deactivated_all_ports(grpc_tcp_server* s) {
319   // delete ALL the things
320   gpr_mu_lock(&s->mu);
321 
322   CHECK(s->shutdown);
323 
324   if (s->head) {
325     grpc_tcp_listener* sp;
326     for (sp = s->head; sp; sp = sp->next) {
327       // Do not unlink if there is a pre-allocated FD
328       if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
329         grpc_unlink_if_unix_domain_socket(&sp->addr);
330       }
331       GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
332                         grpc_schedule_on_exec_ctx);
333       grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
334                      "tcp_listener_shutdown");
335     }
336     gpr_mu_unlock(&s->mu);
337   } else {
338     gpr_mu_unlock(&s->mu);
339     if (grpc_event_engine::experimental::UseEventEngineListener()) {
340       // This will trigger asynchronous execution of the on_shutdown_complete
341       // callback when appropriate. That callback will delete the server.
342       s->ee_listener.reset();
343     } else {
344       finish_shutdown(s);
345     }
346   }
347 }
348 
tcp_server_destroy(grpc_tcp_server * s)349 static void tcp_server_destroy(grpc_tcp_server* s) {
350   gpr_mu_lock(&s->mu);
351   CHECK(!s->shutdown);
352   s->shutdown = true;
353   // shutdown all fd's
354   if (s->active_ports) {
355     grpc_tcp_listener* sp;
356     for (sp = s->head; sp; sp = sp->next) {
357       grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server destroyed"));
358     }
359     gpr_mu_unlock(&s->mu);
360   } else {
361     gpr_mu_unlock(&s->mu);
362     deactivated_all_ports(s);
363   }
364 }
365 
366 // event manager callback when reads are ready
on_read(void * arg,grpc_error_handle err)367 static void on_read(void* arg, grpc_error_handle err) {
368   grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
369   grpc_pollset* read_notifier_pollset;
370   if (!err.ok()) {
371     goto error;
372   }
373 
374   // loop until accept4 returns EAGAIN, and then re-arm notification
375   for (;;) {
376     grpc_resolved_address addr;
377     memset(&addr, 0, sizeof(addr));
378     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
379     // Note: If we ever decide to return this address to the user, remember to
380     // strip off the ::ffff:0.0.0.0/96 prefix first.
381     int fd = grpc_accept4(sp->fd, &addr, 1, 1);
382     if (fd < 0) {
383       if (errno == EINTR) {
384         continue;
385       }
386       // When the process runs out of fds, accept4() returns EMFILE. When this
387       // happens, the connection is left in the accept queue until either a
388       // read event triggers the on_read callback, or time has passed and the
389       // accept should be re-tried regardless. This callback is not cancelled,
390       // so a spurious wakeup may occur even when there's nothing to accept.
391       // This is not a performant code path, but if an fd limit has been
392       // reached, the system is likely in an unhappy state regardless.
393       if (errno == EMFILE) {
394         LOG_EVERY_N_SEC(ERROR, 1) << "File descriptor limit reached. Retrying.";
395         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
396         if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
397         grpc_timer_init(&sp->retry_timer,
398                         grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
399                         &sp->retry_closure);
400         return;
401       }
402       if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
403         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
404         return;
405       }
406       gpr_mu_lock(&sp->server->mu);
407       if (!sp->server->shutdown_listeners) {
408         LOG(ERROR) << "Failed accept4: " << grpc_core::StrError(errno);
409       } else {
410         // if we have shutdown listeners, accept4 could fail, and we
411         // needn't notify users
412       }
413       gpr_mu_unlock(&sp->server->mu);
414       goto error;
415     }
416 
417     if (sp->server->memory_quota->IsMemoryPressureHigh()) {
418       int64_t dropped_connections_count =
419           num_dropped_connections.fetch_add(1, std::memory_order_relaxed) + 1;
420       if (dropped_connections_count % 1000 == 1) {
421         GRPC_TRACE_LOG(tcp, INFO)
422             << "Dropped >= " << dropped_connections_count
423             << " new connection attempts due to high memory pressure";
424       }
425       close(fd);
426       continue;
427     }
428 
429     // For UNIX sockets, the accept call might not fill up the member sun_path
430     // of sockaddr_un, so explicitly call getpeername to get it.
431     if (grpc_is_unix_socket(&addr)) {
432       memset(&addr, 0, sizeof(addr));
433       addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
434       if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
435                       &(addr.len)) < 0) {
436         auto listener_addr_uri = grpc_sockaddr_to_uri(&sp->addr);
437         LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno)
438                    << ". Dropping the connection, and continuing to listen on "
439                    << (listener_addr_uri.ok() ? *listener_addr_uri
440                                               : "<unknown>")
441                    << ":" << sp->port;
442         close(fd);
443         continue;
444       }
445     }
446 
447     (void)grpc_set_socket_no_sigpipe_if_possible(fd);
448 
449     err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE,
450                                             sp->server->options);
451     if (!err.ok()) {
452       goto error;
453     }
454 
455     auto addr_uri = grpc_sockaddr_to_uri(&addr);
456     if (!addr_uri.ok()) {
457       LOG(ERROR) << "Invalid address: " << addr_uri.status();
458       goto error;
459     }
460     GRPC_TRACE_LOG(tcp, INFO)
461         << "SERVER_CONNECT: incoming connection: " << *addr_uri;
462 
463     std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
464     grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
465 
466     read_notifier_pollset = (*(sp->server->pollsets))
467         [static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
468              &sp->server->next_pollset_to_assign, 1)) %
469          sp->server->pollsets->size()];
470 
471     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
472 
473     // Create acceptor.
474     grpc_tcp_server_acceptor* acceptor =
475         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
476     acceptor->from_server = sp->server;
477     acceptor->port_index = sp->port_index;
478     acceptor->fd_index = sp->fd_index;
479     acceptor->external_connection = false;
480     sp->server->on_accept_cb(
481         sp->server->on_accept_cb_arg,
482         grpc_tcp_create(fdobj, sp->server->options, addr_uri.value()),
483         read_notifier_pollset, acceptor);
484   }
485 
486   GPR_UNREACHABLE_CODE(return);
487 
488 error:
489   gpr_mu_lock(&sp->server->mu);
490   if (0 == --sp->server->active_ports && sp->server->shutdown) {
491     gpr_mu_unlock(&sp->server->mu);
492     deactivated_all_ports(sp->server);
493   } else {
494     gpr_mu_unlock(&sp->server->mu);
495   }
496 }
497 
498 // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)499 static grpc_error_handle add_wildcard_addrs_to_server(grpc_tcp_server* s,
500                                                       unsigned port_index,
501                                                       int requested_port,
502                                                       int* out_port) {
503   grpc_resolved_address wild4;
504   grpc_resolved_address wild6;
505   unsigned fd_index = 0;
506   grpc_dualstack_mode dsmode;
507   grpc_tcp_listener* sp = nullptr;
508   grpc_tcp_listener* sp2 = nullptr;
509   grpc_error_handle v6_err;
510   grpc_error_handle v4_err;
511   *out_port = -1;
512 
513   if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
514     return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
515                                                out_port);
516   }
517 
518   grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
519   // Try listening on IPv6 first.
520   if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
521                                          &dsmode, &sp)) == absl::OkStatus()) {
522     ++fd_index;
523     requested_port = *out_port = sp->port;
524     if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
525       return absl::OkStatus();
526     }
527   }
528   // If we got a v6-only socket or nothing, try adding 0.0.0.0.
529   grpc_sockaddr_set_port(&wild4, requested_port);
530   if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
531                                          &dsmode, &sp2)) == absl::OkStatus()) {
532     *out_port = sp2->port;
533     if (sp != nullptr) {
534       sp2->is_sibling = 1;
535       sp->sibling = sp2;
536     }
537   }
538   if (*out_port > 0) {
539     if (!v6_err.ok()) {
540       GRPC_TRACE_LOG(tcp, INFO) << "Failed to add :: listener, "
541                                 << "the environment may not support IPv6: "
542                                 << grpc_core::StatusToString(v6_err);
543     }
544     if (!v4_err.ok()) {
545       GRPC_TRACE_LOG(tcp, INFO) << "Failed to add 0.0.0.0 listener, "
546                                 << "the environment may not support IPv4: "
547                                 << grpc_core::StatusToString(v4_err);
548     }
549     return absl::OkStatus();
550   } else {
551     grpc_error_handle root_err =
552         GRPC_ERROR_CREATE("Failed to add any wildcard listeners");
553     CHECK(!v6_err.ok());
554     CHECK(!v4_err.ok());
555     root_err = grpc_error_add_child(root_err, v6_err);
556     root_err = grpc_error_add_child(root_err, v4_err);
557     return root_err;
558   }
559 }
560 
clone_port(grpc_tcp_listener * listener,unsigned count)561 static grpc_error_handle clone_port(grpc_tcp_listener* listener,
562                                     unsigned count) {
563   grpc_tcp_listener* sp = nullptr;
564   absl::StatusOr<std::string> addr_str;
565   grpc_error_handle err;
566 
567   for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
568     l->fd_index += count;
569   }
570 
571   for (unsigned i = 0; i < count; i++) {
572     int fd = -1;
573     int port = -1;
574     grpc_dualstack_mode dsmode;
575     err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
576                                        &fd);
577     if (!err.ok()) return err;
578     err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
579                                          true, &port);
580     if (!err.ok()) return err;
581     listener->server->nports++;
582     addr_str = grpc_sockaddr_to_string(&listener->addr, true);
583     if (!addr_str.ok()) {
584       return GRPC_ERROR_CREATE(addr_str.status().ToString());
585     }
586     sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
587     sp->next = listener->next;
588     listener->next = sp;
589     // sp (the new listener) is a sibling of 'listener' (the original
590     // listener).
591     sp->is_sibling = 1;
592     sp->sibling = listener->sibling;
593     listener->sibling = sp;
594     sp->server = listener->server;
595     sp->fd = fd;
596     sp->emfd = grpc_fd_create(
597         fd,
598         absl::StrFormat("tcp-server-listener:%s/clone-%d", *addr_str, i)
599             .c_str(),
600         true);
601     memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
602     sp->port = port;
603     sp->port_index = listener->port_index;
604     sp->fd_index = listener->fd_index + count - i;
605     CHECK(sp->emfd);
606     grpc_tcp_server_listener_initialize_retry_timer(sp);
607     while (listener->server->tail->next != nullptr) {
608       listener->server->tail = listener->server->tail->next;
609     }
610   }
611 
612   return absl::OkStatus();
613 }
614 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)615 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
616                                              const grpc_resolved_address* addr,
617                                              int* out_port) {
618   if (grpc_event_engine::experimental::UseEventEngineListener()) {
619     gpr_mu_lock(&s->mu);
620     if (s->shutdown_listeners) {
621       gpr_mu_unlock(&s->mu);
622       return absl::UnknownError("Server already shutdown");
623     }
624     int fd_index = 0;
625     absl::StatusOr<int> port;
626     auto* listener_supports_fd =
627         grpc_event_engine::experimental::QueryExtension<
628             grpc_event_engine::experimental::ListenerSupportsFdExtension>(
629             s->ee_listener.get());
630     if (listener_supports_fd != nullptr) {
631       port = listener_supports_fd->BindWithFd(
632           grpc_event_engine::experimental::CreateResolvedAddress(*addr),
633           [s, &fd_index](absl::StatusOr<int> listen_fd) {
634             if (!listen_fd.ok()) {
635               return;
636             }
637             DCHECK_GT(*listen_fd, 0);
638             s->listen_fd_to_index_map.insert_or_assign(
639                 *listen_fd, std::make_tuple(s->n_bind_ports, fd_index++));
640           });
641     } else {
642       port = s->ee_listener->Bind(
643           grpc_event_engine::experimental::CreateResolvedAddress(*addr));
644     }
645     if (port.ok()) {
646       s->n_bind_ports++;
647       *out_port = *port;
648     }
649     gpr_mu_unlock(&s->mu);
650     return port.status();
651   }
652   CHECK(addr->len <= GRPC_MAX_SOCKADDR_SIZE);
653   grpc_tcp_listener* sp;
654   grpc_resolved_address sockname_temp;
655   grpc_resolved_address addr6_v4mapped;
656   int requested_port = grpc_sockaddr_get_port(addr);
657   unsigned port_index = 0;
658   grpc_dualstack_mode dsmode;
659   grpc_error_handle err;
660   *out_port = -1;
661   if (s->tail != nullptr) {
662     port_index = s->tail->port_index + 1;
663   }
664 
665   // Check if this is a wildcard port, and if so, try to keep the port the same
666   // as some previously created listener.
667   if (requested_port == 0) {
668     for (sp = s->head; sp; sp = sp->next) {
669       sockname_temp.len =
670           static_cast<socklen_t>(sizeof(struct sockaddr_storage));
671       if (0 ==
672           getsockname(sp->fd,
673                       reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
674                       &sockname_temp.len)) {
675         int used_port = grpc_sockaddr_get_port(&sockname_temp);
676         if (used_port > 0) {
677           memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
678           grpc_sockaddr_set_port(&sockname_temp, used_port);
679           requested_port = used_port;
680           addr = &sockname_temp;
681           break;
682         }
683       }
684     }
685   }
686 
687   /* Check if systemd has pre-allocated valid FDs */
688   set_matching_sd_fds(s, addr, requested_port);
689 
690   /* Do not unlink if there are pre-allocated FDs, or it will stop
691      working after the first client connects */
692   if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
693     grpc_unlink_if_unix_domain_socket(addr);
694   }
695 
696   if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
697     return add_wildcard_addrs_to_server(s, port_index, requested_port,
698                                         out_port);
699   }
700   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
701     addr = &addr6_v4mapped;
702   }
703   if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
704       absl::OkStatus()) {
705     *out_port = sp->port;
706   }
707   return err;
708 }
709 
710 // Return listener at port_index or NULL. Should only be called with s->mu
711 // locked.
get_port_index(grpc_tcp_server * s,unsigned port_index)712 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
713                                          unsigned port_index) {
714   unsigned num_ports = 0;
715   grpc_tcp_listener* sp;
716   for (sp = s->head; sp; sp = sp->next) {
717     if (!sp->is_sibling) {
718       if (++num_ports > port_index) {
719         return sp;
720       }
721     }
722   }
723   return nullptr;
724 }
725 
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)726 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
727   unsigned num_fds = 0;
728   gpr_mu_lock(&s->mu);
729   if (grpc_event_engine::experimental::UseEventEngineListener()) {
730     // This doesn't need to be very fast. Used in tests.
731     for (auto it = s->listen_fd_to_index_map.begin();
732          it != s->listen_fd_to_index_map.end(); it++) {
733       if (std::get<0>(it->second) == static_cast<int>(port_index)) {
734         num_fds++;
735       }
736     }
737     gpr_mu_unlock(&s->mu);
738     return num_fds;
739   }
740   grpc_tcp_listener* sp = get_port_index(s, port_index);
741   for (; sp; sp = sp->sibling) {
742     ++num_fds;
743   }
744   gpr_mu_unlock(&s->mu);
745   return num_fds;
746 }
747 
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)748 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
749                               unsigned fd_index) {
750   gpr_mu_lock(&s->mu);
751   if (grpc_event_engine::experimental::UseEventEngineListener()) {
752     // This doesn't need to be very fast. Used in tests.
753     for (auto it = s->listen_fd_to_index_map.begin();
754          it != s->listen_fd_to_index_map.end(); it++) {
755       if (std::get<0>(it->second) == static_cast<int>(port_index) &&
756           std::get<1>(it->second) == static_cast<int>(fd_index)) {
757         gpr_mu_unlock(&s->mu);
758         return it->first;
759       }
760     }
761     gpr_mu_unlock(&s->mu);
762     return -1;
763   }
764   grpc_tcp_listener* sp = get_port_index(s, port_index);
765   for (; sp; sp = sp->sibling, --fd_index) {
766     if (fd_index == 0) {
767       gpr_mu_unlock(&s->mu);
768       return sp->fd;
769     }
770   }
771   gpr_mu_unlock(&s->mu);
772   return -1;
773 }
774 
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > * pollsets)775 static void tcp_server_start(grpc_tcp_server* s,
776                              const std::vector<grpc_pollset*>* pollsets) {
777   size_t i;
778   grpc_tcp_listener* sp;
779   gpr_mu_lock(&s->mu);
780   CHECK(s->on_accept_cb);
781   CHECK_EQ(s->active_ports, 0u);
782   s->pollsets = pollsets;
783   if (grpc_event_engine::experimental::UseEventEngineListener()) {
784     CHECK(!s->shutdown_listeners);
785     CHECK(GRPC_LOG_IF_ERROR("listener_start", s->ee_listener->Start()));
786     gpr_mu_unlock(&s->mu);
787     return;
788   }
789   sp = s->head;
790   while (sp != nullptr) {
791     if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
792         !grpc_is_vsock(&sp->addr) && pollsets->size() > 1) {
793       CHECK(GRPC_LOG_IF_ERROR(
794           "clone_port", clone_port(sp, (unsigned)(pollsets->size() - 1))));
795       for (i = 0; i < pollsets->size(); i++) {
796         grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
797         GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
798                           grpc_schedule_on_exec_ctx);
799         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
800         s->active_ports++;
801         sp = sp->next;
802       }
803     } else {
804       for (i = 0; i < pollsets->size(); i++) {
805         grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
806       }
807       GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
808                         grpc_schedule_on_exec_ctx);
809       grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
810       s->active_ports++;
811       sp = sp->next;
812     }
813   }
814   gpr_mu_unlock(&s->mu);
815 }
816 
tcp_server_ref(grpc_tcp_server * s)817 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
818   gpr_ref_non_zero(&s->refs);
819   return s;
820 }
821 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)822 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
823                                              grpc_closure* shutdown_starting) {
824   gpr_mu_lock(&s->mu);
825   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
826                            absl::OkStatus());
827   gpr_mu_unlock(&s->mu);
828 }
829 
tcp_server_unref(grpc_tcp_server * s)830 static void tcp_server_unref(grpc_tcp_server* s) {
831   if (gpr_unref(&s->refs)) {
832     grpc_tcp_server_shutdown_listeners(s);
833     gpr_mu_lock(&s->mu);
834     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
835     gpr_mu_unlock(&s->mu);
836     tcp_server_destroy(s);
837   }
838 }
839 
tcp_server_shutdown_listeners(grpc_tcp_server * s)840 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
841   gpr_mu_lock(&s->mu);
842   s->shutdown_listeners = true;
843   if (grpc_event_engine::experimental::UseEventEngineListener()) {
844     auto* listener_supports_fd =
845         grpc_event_engine::experimental::QueryExtension<
846             grpc_event_engine::experimental::ListenerSupportsFdExtension>(
847             s->ee_listener.get());
848     if (listener_supports_fd != nullptr) {
849       listener_supports_fd->ShutdownListeningFds();
850     }
851   }
852   /* shutdown all fd's */
853   if (s->active_ports) {
854     grpc_tcp_listener* sp;
855     for (sp = s->head; sp; sp = sp->next) {
856       grpc_timer_cancel(&sp->retry_timer);
857       grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
858     }
859   }
860   gpr_mu_unlock(&s->mu);
861 }
862 
tcp_server_pre_allocated_fd(grpc_tcp_server * s)863 static int tcp_server_pre_allocated_fd(grpc_tcp_server* s) {
864   return s->pre_allocated_fd;
865 }
866 
tcp_server_set_pre_allocated_fd(grpc_tcp_server * s,int fd)867 static void tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd) {
868   gpr_mu_lock(&s->mu);
869   s->pre_allocated_fd = fd;
870   gpr_mu_unlock(&s->mu);
871 }
872 
873 namespace {
874 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
875  public:
ExternalConnectionHandler(grpc_tcp_server * s)876   explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
877 
878   // TODO(yangg) resolve duplicate code with on_read
Handle(int listener_fd,int fd,grpc_byte_buffer * buf)879   void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
880     if (grpc_event_engine::experimental::UseEventEngineListener()) {
881       auto* listener_supports_fd =
882           grpc_event_engine::experimental::QueryExtension<
883               grpc_event_engine::experimental::ListenerSupportsFdExtension>(
884               s_->ee_listener.get());
885       CHECK_NE(listener_supports_fd, nullptr);
886       grpc_event_engine::experimental::SliceBuffer pending_data;
887       if (buf != nullptr) {
888         pending_data =
889             grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer(
890                 buf->data.raw.slice_buffer);
891       }
892       CHECK(GRPC_LOG_IF_ERROR("listener_handle_external_connection",
893                               listener_supports_fd->HandleExternalConnection(
894                                   listener_fd, fd, &pending_data)));
895       return;
896     }
897     grpc_pollset* read_notifier_pollset;
898     grpc_resolved_address addr;
899     memset(&addr, 0, sizeof(addr));
900     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
901     grpc_core::ExecCtx exec_ctx;
902 
903     if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
904                     &(addr.len)) < 0) {
905       LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno);
906       close(fd);
907       return;
908     }
909     (void)grpc_set_socket_no_sigpipe_if_possible(fd);
910     auto addr_uri = grpc_sockaddr_to_uri(&addr);
911     if (!addr_uri.ok()) {
912       LOG(ERROR) << "Invalid address: " << addr_uri.status();
913       return;
914     }
915     GRPC_TRACE_LOG(tcp, INFO)
916         << "SERVER_CONNECT: incoming external connection: " << *addr_uri;
917     std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
918     grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
919     read_notifier_pollset =
920         (*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
921                               &s_->next_pollset_to_assign, 1)) %
922                           s_->pollsets->size()];
923     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
924     grpc_tcp_server_acceptor* acceptor =
925         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
926     acceptor->from_server = s_;
927     acceptor->port_index = -1;
928     acceptor->fd_index = -1;
929     acceptor->external_connection = true;
930     acceptor->listener_fd = listener_fd;
931     acceptor->pending_data = buf;
932     s_->on_accept_cb(s_->on_accept_cb_arg,
933                      grpc_tcp_create(fdobj, s_->options, addr_uri.value()),
934                      read_notifier_pollset, acceptor);
935   }
936 
937  private:
938   grpc_tcp_server* s_;
939 };
940 }  // namespace
941 
tcp_server_create_fd_handler(grpc_tcp_server * s)942 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
943     grpc_tcp_server* s) {
944   s->fd_handler = new ExternalConnectionHandler(s);
945   return s->fd_handler;
946 }
947 
948 grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
949     tcp_server_create,
950     tcp_server_start,
951     tcp_server_add_port,
952     tcp_server_create_fd_handler,
953     tcp_server_port_fd_count,
954     tcp_server_port_fd,
955     tcp_server_ref,
956     tcp_server_shutdown_starting_add,
957     tcp_server_unref,
958     tcp_server_shutdown_listeners,
959     tcp_server_pre_allocated_fd,
960     tcp_server_set_pre_allocated_fd};
961 
962 #endif  // GRPC_POSIX_SOCKET_TCP_SERVER
963