1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <grpc/event_engine/endpoint_config.h>
26 #include <grpc/event_engine/event_engine.h>
27 #include <grpc/event_engine/memory_allocator.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log_windows.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/sync.h>
32 #include <grpc/support/time.h>
33 #include <inttypes.h>
34 #include <io.h>
35
36 #include <vector>
37
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "absl/strings/str_cat.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/event_engine/memory_allocator_factory.h"
43 #include "src/core/lib/event_engine/resolved_address_internal.h"
44 #include "src/core/lib/event_engine/tcp_socket_utils.h"
45 #include "src/core/lib/event_engine/windows/windows_engine.h"
46 #include "src/core/lib/event_engine/windows/windows_listener.h"
47 #include "src/core/lib/iomgr/closure.h"
48 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
49 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
50 #include "src/core/lib/iomgr/iocp_windows.h"
51 #include "src/core/lib/iomgr/pollset_windows.h"
52 #include "src/core/lib/iomgr/resolve_address.h"
53 #include "src/core/lib/iomgr/sockaddr.h"
54 #include "src/core/lib/iomgr/socket_windows.h"
55 #include "src/core/lib/iomgr/tcp_server.h"
56 #include "src/core/lib/iomgr/tcp_windows.h"
57 #include "src/core/lib/resource_quota/api.h"
58 #include "src/core/lib/resource_quota/resource_quota.h"
59 #include "src/core/lib/slice/slice_internal.h"
60 #include "src/core/util/crash.h"
61
62 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
63
64 namespace {
65 using ::grpc_event_engine::experimental::CreateResolvedAddress;
66 using ::grpc_event_engine::experimental::EndpointConfig;
67 using ::grpc_event_engine::experimental::EventEngine;
68 using ::grpc_event_engine::experimental::grpc_event_engine_endpoint_create;
69 using ::grpc_event_engine::experimental::MemoryAllocator;
70 using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
71 using ::grpc_event_engine::experimental::ResolvedAddressSetPort;
72 using ::grpc_event_engine::experimental::RunEventEngineClosure;
73 using ::grpc_event_engine::experimental::WindowsEventEngine;
74 using ::grpc_event_engine::experimental::WindowsEventEngineListener;
75 } // namespace
76
77 // one listening port
78 typedef struct grpc_tcp_listener grpc_tcp_listener;
79 struct grpc_tcp_listener {
80 // Buffer to hold the local and remote address.
81 // This seemingly magic number comes from AcceptEx's documentation. each
82 // address buffer needs to have at least 16 more bytes at their end.
83 #ifdef GRPC_HAVE_UNIX_SOCKET
84 // unix addr is larger than ip addr.
85 uint8_t addresses[(sizeof(sockaddr_un) + 16) * 2] = {};
86 #else
87 uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
88 #endif // GRPC_HAVE_UNIX_SOCKET
89 // This will hold the socket for the next accept.
90 SOCKET new_socket;
91 // The listener winsocket.
92 grpc_winsocket* socket;
93 // address of listener
94 grpc_resolved_address resolved_addr;
95 // The actual TCP port number.
96 int port;
97 unsigned port_index;
98 grpc_tcp_server* server;
99 // The cached AcceptEx for that port.
100 LPFN_ACCEPTEX AcceptEx;
101 int shutting_down;
102 int outstanding_calls;
103 // closure for socket notification of accept being ready
104 grpc_closure on_accept;
105 // linked list
106 struct grpc_tcp_listener* next;
107 };
108
109 // the overall server
110 struct grpc_tcp_server {
111 gpr_refcount refs;
112 // Called whenever accept() succeeds on a server port.
113 grpc_tcp_server_cb on_accept_cb;
114 void* on_accept_cb_arg;
115
116 gpr_mu mu;
117
118 // active port count: how many ports are actually still listening
119 int active_ports;
120
121 // linked list of server ports
122 grpc_tcp_listener* head;
123 grpc_tcp_listener* tail;
124
125 // List of closures passed to shutdown_starting_add().
126 grpc_closure_list shutdown_starting;
127
128 // shutdown callback
129 grpc_closure* shutdown_complete;
130
131 // used for the EventEngine shim
132 WindowsEventEngineListener* ee_listener;
133 };
134
135 // TODO(hork): This may be refactored to share with posix engine and event
136 // engine.
unlink_if_unix_domain_socket(const grpc_resolved_address * resolved_addr)137 void unlink_if_unix_domain_socket(const grpc_resolved_address* resolved_addr) {
138 #ifdef GRPC_HAVE_UNIX_SOCKET
139 const grpc_sockaddr* addr =
140 reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
141 if (addr->sa_family != AF_UNIX) {
142 return;
143 }
144 struct sockaddr_un* un =
145 reinterpret_cast<struct sockaddr_un*>(const_cast<sockaddr*>(addr));
146 // There is nothing to unlink for an abstract unix socket.
147 if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
148 return;
149 }
150 // For windows we need to remove the file instead of unlink.
151 DWORD attr = ::GetFileAttributesA(un->sun_path);
152 if (attr == INVALID_FILE_ATTRIBUTES) {
153 return;
154 }
155 if (attr & FILE_ATTRIBUTE_DIRECTORY || attr & FILE_ATTRIBUTE_READONLY) {
156 return;
157 }
158 ::DeleteFileA(un->sun_path);
159 #else
160 (void)resolved_addr;
161 #endif
162 }
163
164 // Public function. Allocates the proper data structures to hold a
165 // grpc_tcp_server.
tcp_server_create(grpc_closure * shutdown_complete,const EndpointConfig &,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)166 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
167 const EndpointConfig& /* config */,
168 grpc_tcp_server_cb on_accept_cb,
169 void* on_accept_cb_arg,
170 grpc_tcp_server** server) {
171 grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
172 gpr_ref_init(&s->refs, 1);
173 gpr_mu_init(&s->mu);
174 s->active_ports = 0;
175 s->on_accept_cb = on_accept_cb;
176 s->on_accept_cb_arg = on_accept_cb_arg;
177 s->head = NULL;
178 s->tail = NULL;
179 s->shutdown_starting.head = NULL;
180 s->shutdown_starting.tail = NULL;
181 s->shutdown_complete = shutdown_complete;
182 *server = s;
183 return absl::OkStatus();
184 }
185
destroy_server(void * arg,grpc_error_handle)186 static void destroy_server(void* arg, grpc_error_handle /* error */) {
187 grpc_tcp_server* s = (grpc_tcp_server*)arg;
188
189 // Now that the accepts have been aborted, we can destroy the sockets.
190 // The IOCP won't get notified on these, so we can flag them as already
191 // closed by the system.
192 while (s->head) {
193 grpc_tcp_listener* sp = s->head;
194 s->head = sp->next;
195 sp->next = NULL;
196 grpc_winsocket_destroy(sp->socket);
197 unlink_if_unix_domain_socket(&sp->resolved_addr);
198 gpr_free(sp);
199 }
200 gpr_mu_destroy(&s->mu);
201 gpr_free(s);
202 }
203
finish_shutdown_locked(grpc_tcp_server * s)204 static void finish_shutdown_locked(grpc_tcp_server* s) {
205 if (s->shutdown_complete != NULL) {
206 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
207 absl::OkStatus());
208 }
209
210 grpc_core::ExecCtx::Run(
211 DEBUG_LOCATION,
212 GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
213 absl::OkStatus());
214 }
215
tcp_server_ref(grpc_tcp_server * s)216 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
217 gpr_ref_non_zero(&s->refs);
218 return s;
219 }
220
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)221 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
222 grpc_closure* shutdown_starting) {
223 gpr_mu_lock(&s->mu);
224 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
225 absl::OkStatus());
226 gpr_mu_unlock(&s->mu);
227 }
228
tcp_server_destroy(grpc_tcp_server * s)229 static void tcp_server_destroy(grpc_tcp_server* s) {
230 grpc_tcp_listener* sp;
231 gpr_mu_lock(&s->mu);
232 // First, shutdown all fd's. This will queue abortion calls for all
233 // of the pending accepts due to the normal operation mechanism.
234 if (s->active_ports == 0) {
235 finish_shutdown_locked(s);
236 } else {
237 for (sp = s->head; sp; sp = sp->next) {
238 sp->shutting_down = 1;
239 grpc_winsocket_shutdown(sp->socket);
240 }
241 }
242 gpr_mu_unlock(&s->mu);
243 }
244
tcp_server_unref(grpc_tcp_server * s)245 static void tcp_server_unref(grpc_tcp_server* s) {
246 if (gpr_unref(&s->refs)) {
247 grpc_tcp_server_shutdown_listeners(s);
248 gpr_mu_lock(&s->mu);
249 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
250 gpr_mu_unlock(&s->mu);
251 tcp_server_destroy(s);
252 }
253 }
254
255 // Prepare (bind) a recently-created socket for listening.
prepare_socket(SOCKET sock,const grpc_resolved_address * addr,int * port)256 static grpc_error_handle prepare_socket(SOCKET sock,
257 const grpc_resolved_address* addr,
258 int* port) {
259 grpc_resolved_address sockname_temp;
260 grpc_error_handle error;
261 int sockname_temp_len;
262 if (grpc_sockaddr_get_family(addr) == AF_UNIX) {
263 error = grpc_tcp_set_non_block(sock);
264 } else {
265 error = grpc_tcp_prepare_socket(sock);
266 }
267 if (!error.ok()) {
268 goto failure;
269 }
270 unlink_if_unix_domain_socket(addr);
271 if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
272 SOCKET_ERROR) {
273 error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
274 goto failure;
275 }
276
277 if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
278 error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
279 goto failure;
280 }
281
282 sockname_temp_len = sizeof(struct sockaddr_storage);
283 if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
284 &sockname_temp_len) == SOCKET_ERROR) {
285 error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
286 goto failure;
287 }
288 sockname_temp.len = (size_t)sockname_temp_len;
289
290 *port = grpc_sockaddr_get_port(&sockname_temp);
291 return absl::OkStatus();
292
293 failure:
294 CHECK(!error.ok());
295 error = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING(
296 "Failed to prepare server socket", &error, 1),
297 grpc_core::StatusIntProperty::kFd, (intptr_t)sock);
298 if (sock != INVALID_SOCKET) closesocket(sock);
299 return error;
300 }
301
decrement_active_ports_and_notify_locked(grpc_tcp_listener * sp)302 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
303 sp->shutting_down = 0;
304 CHECK_GT(sp->server->active_ports, 0u);
305 if (0 == --sp->server->active_ports) {
306 finish_shutdown_locked(sp->server);
307 }
308 }
309
310 // In order to do an async accept, we need to create a socket first which
311 // will be the one assigned to the new incoming connection.
start_accept_locked(grpc_tcp_listener * port)312 static grpc_error_handle start_accept_locked(grpc_tcp_listener* port) {
313 SOCKET sock = INVALID_SOCKET;
314 BOOL success;
315 const DWORD addrlen = sizeof(port->addresses) / 2;
316 DWORD bytes_received = 0;
317 grpc_error_handle error;
318
319 if (port->shutting_down) {
320 return absl::OkStatus();
321 }
322 const int addr_family =
323 grpc_sockaddr_get_family(&port->resolved_addr) == AF_UNIX ? AF_UNIX
324 : AF_INET6;
325 const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
326 sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
327 grpc_get_default_wsa_socket_flags());
328 if (sock == INVALID_SOCKET) {
329 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
330 goto failure;
331 }
332 if (addr_family == AF_UNIX) {
333 error = grpc_tcp_set_non_block(sock);
334 } else {
335 error = grpc_tcp_prepare_socket(sock);
336 }
337 if (!error.ok()) goto failure;
338
339 // Start the "accept" asynchronously.
340 success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
341 addrlen, addrlen, &bytes_received,
342 &port->socket->read_info.overlapped);
343
344 // It is possible to get an accept immediately without delay. However, we
345 // will still get an IOCP notification for it. So let's just ignore it.
346 if (!success) {
347 int last_error = WSAGetLastError();
348 if (last_error != ERROR_IO_PENDING) {
349 error = GRPC_WSA_ERROR(last_error, "AcceptEx");
350 goto failure;
351 }
352 }
353
354 // We're ready to do the accept. Calling grpc_socket_notify_on_read may
355 // immediately process an accept that happened in the meantime.
356 port->new_socket = sock;
357 grpc_socket_notify_on_read(port->socket, &port->on_accept);
358 port->outstanding_calls++;
359 return error;
360
361 failure:
362 CHECK(!error.ok());
363 if (sock != INVALID_SOCKET) closesocket(sock);
364 return error;
365 }
366
367 // Event manager callback when reads are ready.
on_accept(void * arg,grpc_error_handle error)368 static void on_accept(void* arg, grpc_error_handle error) {
369 grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
370 SOCKET sock = sp->new_socket;
371 grpc_winsocket_callback_info* info = &sp->socket->read_info;
372 grpc_endpoint* ep = NULL;
373 grpc_resolved_address peer_name;
374 DWORD transferred_bytes;
375 DWORD flags;
376 BOOL wsa_success;
377 int err;
378
379 gpr_mu_lock(&sp->server->mu);
380
381 peer_name.len = sizeof(struct sockaddr_storage);
382
383 // The general mechanism for shutting down is to queue abortion calls. While
384 // this is necessary in the read/write case, it's useless for the accept
385 // case. We only need to adjust the pending callback count
386 if (!error.ok()) {
387 VLOG(2) << "Skipping on_accept due to error: "
388 << grpc_core::StatusToString(error);
389
390 gpr_mu_unlock(&sp->server->mu);
391 return;
392 }
393 // The IOCP notified us of a completed operation. Let's grab the results,
394 // and act accordingly.
395 transferred_bytes = 0;
396 wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
397 &transferred_bytes, FALSE, &flags);
398 if (!wsa_success) {
399 if (!sp->shutting_down) {
400 char* utf8_message = gpr_format_message(WSAGetLastError());
401 LOG(ERROR) << "on_accept error: " << utf8_message;
402 gpr_free(utf8_message);
403 }
404 closesocket(sock);
405 } else {
406 if (!sp->shutting_down) {
407 err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
408 (char*)&sp->socket->socket, sizeof(sp->socket->socket));
409 if (err) {
410 char* utf8_message = gpr_format_message(WSAGetLastError());
411 LOG(ERROR) << "setsockopt error: " << utf8_message;
412 gpr_free(utf8_message);
413 }
414 int peer_name_len = (int)peer_name.len;
415 err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
416 peer_name.len = (size_t)peer_name_len;
417 std::string peer_name_string;
418 if (!err) {
419 auto addr_uri = grpc_sockaddr_to_uri(&peer_name);
420 if (addr_uri.ok()) {
421 peer_name_string = addr_uri.value();
422 } else {
423 LOG(ERROR) << "invalid peer name: " << addr_uri.status();
424 }
425 } else {
426 char* utf8_message = gpr_format_message(WSAGetLastError());
427 LOG(ERROR) << "getpeername error: " << utf8_message;
428 gpr_free(utf8_message);
429 }
430 std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
431 ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
432 peer_name_string);
433 } else {
434 closesocket(sock);
435 }
436 }
437
438 // The only time we should call our callback, is where we successfully
439 // managed to accept a connection, and created an endpoint.
440 if (ep) {
441 // Create acceptor.
442 grpc_tcp_server_acceptor* acceptor =
443 (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
444 acceptor->from_server = sp->server;
445 acceptor->port_index = sp->port_index;
446 acceptor->fd_index = 0;
447 acceptor->external_connection = false;
448 sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
449 }
450 // As we were notified from the IOCP of one and exactly one accept,
451 // the former socked we created has now either been destroy or assigned
452 // to the new connection. We need to create a new one for the next
453 // connection.
454 CHECK(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
455 if (0 == --sp->outstanding_calls) {
456 decrement_active_ports_and_notify_locked(sp);
457 }
458 gpr_mu_unlock(&sp->server->mu);
459 }
460
add_socket_to_server(grpc_tcp_server * s,SOCKET sock,const grpc_resolved_address * addr,unsigned port_index,grpc_tcp_listener ** listener)461 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
462 const grpc_resolved_address* addr,
463 unsigned port_index,
464 grpc_tcp_listener** listener) {
465 grpc_tcp_listener* sp = NULL;
466 int port = -1;
467 int status;
468 GUID guid = WSAID_ACCEPTEX;
469 DWORD ioctl_num_bytes;
470 LPFN_ACCEPTEX AcceptEx;
471 grpc_error_handle error;
472
473 // We need to grab the AcceptEx pointer for that port, as it may be
474 // interface-dependent. We'll cache it to avoid doing that again.
475 status =
476 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
477 &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
478
479 if (status != 0) {
480 error = GRPC_WSA_ERROR(WSAGetLastError(), "AcceptEx pointer retrieval");
481 closesocket(sock);
482 return error;
483 }
484
485 error = prepare_socket(sock, addr, &port);
486 if (!error.ok()) {
487 return error;
488 }
489
490 CHECK_GE(port, 0);
491 gpr_mu_lock(&s->mu);
492 sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
493 sp->next = NULL;
494 if (s->head == NULL) {
495 s->head = sp;
496 } else {
497 s->tail->next = sp;
498 }
499 s->tail = sp;
500 sp->server = s;
501 sp->socket = grpc_winsocket_create(sock, "listener");
502 sp->shutting_down = 0;
503 sp->outstanding_calls = 0;
504 sp->AcceptEx = AcceptEx;
505 sp->new_socket = INVALID_SOCKET;
506 sp->resolved_addr = *addr;
507 sp->port = port;
508 sp->port_index = port_index;
509 GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
510 CHECK(sp->socket);
511 gpr_mu_unlock(&s->mu);
512 *listener = sp;
513
514 return absl::OkStatus();
515 }
516
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)517 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
518 const grpc_resolved_address* addr,
519 int* port) {
520 grpc_tcp_listener* sp = NULL;
521 SOCKET sock;
522 grpc_resolved_address addr6_v4mapped;
523 grpc_resolved_address wildcard;
524 grpc_resolved_address* allocated_addr = NULL;
525 unsigned port_index = 0;
526 grpc_error_handle error;
527
528 if (s->tail != NULL) {
529 port_index = s->tail->port_index + 1;
530 }
531
532 // Check if this is a wildcard port, and if so, try to keep the port the same
533 // as some previously created listener.
534 if (grpc_sockaddr_get_port(addr) == 0) {
535 for (sp = s->head; sp; sp = sp->next) {
536 grpc_resolved_address sockname_temp;
537 int sockname_temp_len = sizeof(struct sockaddr_storage);
538 if (0 == getsockname(sp->socket->socket,
539 (grpc_sockaddr*)sockname_temp.addr,
540 &sockname_temp_len)) {
541 sockname_temp.len = (size_t)sockname_temp_len;
542 *port = grpc_sockaddr_get_port(&sockname_temp);
543 if (*port > 0) {
544 allocated_addr =
545 (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
546 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
547 grpc_sockaddr_set_port(allocated_addr, *port);
548 addr = allocated_addr;
549 break;
550 }
551 }
552 }
553 }
554
555 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
556 addr = &addr6_v4mapped;
557 }
558
559 // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
560 if (grpc_sockaddr_is_wildcard(addr, port)) {
561 grpc_sockaddr_make_wildcard6(*port, &wildcard);
562
563 addr = &wildcard;
564 }
565
566 const int addr_family =
567 grpc_sockaddr_get_family(addr) == AF_UNIX ? AF_UNIX : AF_INET6;
568 const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
569 sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
570 grpc_get_default_wsa_socket_flags());
571 if (sock == INVALID_SOCKET) {
572 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
573 goto done;
574 }
575
576 error = add_socket_to_server(s, sock, addr, port_index, &sp);
577
578 done:
579 gpr_free(allocated_addr);
580
581 if (!error.ok()) {
582 grpc_error_handle error_out = GRPC_ERROR_CREATE_REFERENCING(
583 "Failed to add port to server", &error, 1);
584 error = error_out;
585 *port = -1;
586 } else {
587 CHECK(sp != NULL);
588 *port = sp->port;
589 }
590 return error;
591 }
592
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > *)593 static void tcp_server_start(grpc_tcp_server* s,
594 const std::vector<grpc_pollset*>* /*pollsets*/) {
595 grpc_tcp_listener* sp;
596 gpr_mu_lock(&s->mu);
597 CHECK_EQ(s->active_ports, 0u);
598 for (sp = s->head; sp; sp = sp->next) {
599 CHECK(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
600 s->active_ports++;
601 }
602 gpr_mu_unlock(&s->mu);
603 }
604
tcp_server_port_fd_count(grpc_tcp_server *,unsigned)605 static unsigned tcp_server_port_fd_count(grpc_tcp_server* /* s */,
606 unsigned /* port_index */) {
607 return 0;
608 }
609
tcp_server_port_fd(grpc_tcp_server *,unsigned,unsigned)610 static int tcp_server_port_fd(grpc_tcp_server* /* s */,
611 unsigned /* port_index */,
612 unsigned /* fd_index */) {
613 return -1;
614 }
615
tcp_server_create_fd_handler(grpc_tcp_server *)616 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
617 grpc_tcp_server* /* s */) {
618 return nullptr;
619 }
620
tcp_server_shutdown_listeners(grpc_tcp_server *)621 static void tcp_server_shutdown_listeners(grpc_tcp_server* /* s */) {}
622
tcp_pre_allocated_fd(grpc_tcp_server *)623 static int tcp_pre_allocated_fd(grpc_tcp_server* /* s */) { return -1; }
624
tcp_set_pre_allocated_fd(grpc_tcp_server *,int)625 static void tcp_set_pre_allocated_fd(grpc_tcp_server* /* s */, int /* fd */) {}
626
627 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
628 tcp_server_create, tcp_server_start,
629 tcp_server_add_port, tcp_server_create_fd_handler,
630 tcp_server_port_fd_count, tcp_server_port_fd,
631 tcp_server_ref, tcp_server_shutdown_starting_add,
632 tcp_server_unref, tcp_server_shutdown_listeners,
633 tcp_pre_allocated_fd, tcp_set_pre_allocated_fd};
634
635 // ---- EventEngine shim ------------------------------------------------------
636
637 namespace {
638
event_engine_create(grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)639 static grpc_error_handle event_engine_create(grpc_closure* shutdown_complete,
640 const EndpointConfig& config,
641 grpc_tcp_server_cb on_accept_cb,
642 void* on_accept_cb_arg,
643 grpc_tcp_server** server) {
644 // On Windows, the event_engine_listener experiment only supports the
645 // default engine
646 WindowsEventEngine* engine_ptr = reinterpret_cast<WindowsEventEngine*>(
647 config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
648 grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
649 CHECK_NE(on_accept_cb, nullptr);
650 auto accept_cb = [s, on_accept_cb, on_accept_cb_arg](
651 std::unique_ptr<EventEngine::Endpoint> endpoint,
652 MemoryAllocator memory_allocator) {
653 grpc_core::ApplicationCallbackExecCtx app_ctx;
654 grpc_core::ExecCtx exec_ctx;
655 grpc_tcp_server_acceptor* acceptor =
656 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
657 acceptor->from_server = s;
658 acceptor->port_index = -1;
659 acceptor->fd_index = -1;
660 acceptor->external_connection = false;
661 on_accept_cb(on_accept_cb_arg,
662 grpc_event_engine_endpoint_create(std::move(endpoint)),
663 nullptr, acceptor);
664 };
665 auto on_shutdown = [shutdown_complete](absl::Status status) {
666 RunEventEngineClosure(shutdown_complete, status);
667 };
668 grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota;
669 {
670 void* tmp_quota = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
671 CHECK_NE(tmp_quota, nullptr);
672 resource_quota =
673 reinterpret_cast<grpc_core::ResourceQuota*>(tmp_quota)->Ref();
674 }
675 gpr_ref_init(&s->refs, 1);
676 gpr_mu_init(&s->mu);
677 s->ee_listener = new WindowsEventEngineListener(
678 engine_ptr->poller(), std::move(accept_cb), std::move(on_shutdown),
679 std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
680 resource_quota->memory_quota()),
681 engine_ptr->shared_from_this(), engine_ptr->thread_pool(), config);
682 s->active_ports = -1;
683 s->on_accept_cb = [](void* /* arg */, grpc_endpoint* /* ep */,
684 grpc_pollset* /* accepting_pollset */,
685 grpc_tcp_server_acceptor* /* acceptor */) {
686 grpc_core::Crash("iomgr on_accept_cb callback should be unused");
687 };
688 s->on_accept_cb_arg = nullptr;
689 s->head = nullptr;
690 s->tail = nullptr;
691 s->shutdown_starting.head = nullptr;
692 s->shutdown_starting.tail = nullptr;
693 s->shutdown_complete = grpc_core::NewClosure([](absl::Status) {
694 grpc_core::Crash("iomgr shutdown_complete callback should be unused");
695 });
696 *server = s;
697 return absl::OkStatus();
698 }
699
event_engine_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > *)700 static void event_engine_start(grpc_tcp_server* s,
701 const std::vector<grpc_pollset*>* /*pollsets*/) {
702 CHECK(s->ee_listener->Start().ok());
703 }
704
event_engine_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)705 static grpc_error_handle event_engine_add_port(
706 grpc_tcp_server* s, const grpc_resolved_address* addr, int* port) {
707 CHECK_NE(addr, nullptr);
708 CHECK_NE(port, nullptr);
709 auto ee_addr = CreateResolvedAddress(*addr);
710 auto out_port = s->ee_listener->Bind(ee_addr);
711 *port = out_port.ok() ? *out_port : -1;
712 return out_port.status();
713 }
714
event_engine_create_fd_handler(grpc_tcp_server *)715 static grpc_core::TcpServerFdHandler* event_engine_create_fd_handler(
716 grpc_tcp_server* /* s */) {
717 return nullptr;
718 }
719
event_engine_port_fd_count(grpc_tcp_server *,unsigned)720 static unsigned event_engine_port_fd_count(grpc_tcp_server* /* s */,
721 unsigned /* port_index */) {
722 return 0;
723 }
724
event_engine_port_fd(grpc_tcp_server *,unsigned,unsigned)725 static int event_engine_port_fd(grpc_tcp_server* /* s */,
726 unsigned /* port_index */,
727 unsigned /* fd_index */) {
728 return -1;
729 }
730
event_engine_ref(grpc_tcp_server * s)731 static grpc_tcp_server* event_engine_ref(grpc_tcp_server* s) {
732 gpr_ref_non_zero(&s->refs);
733 return s;
734 }
735
event_engine_shutdown_listeners(grpc_tcp_server * s)736 static void event_engine_shutdown_listeners(grpc_tcp_server* s) {
737 s->ee_listener->ShutdownListeners();
738 }
739
event_engine_unref(grpc_tcp_server * s)740 static void event_engine_unref(grpc_tcp_server* s) {
741 if (gpr_unref(&s->refs)) {
742 event_engine_shutdown_listeners(s);
743 gpr_mu_lock(&s->mu);
744 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
745 gpr_mu_unlock(&s->mu);
746 gpr_mu_destroy(&s->mu);
747 delete s->ee_listener;
748 gpr_free(s);
749 }
750 }
751
event_engine_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)752 static void event_engine_shutdown_starting_add(
753 grpc_tcp_server* s, grpc_closure* shutdown_starting) {
754 gpr_mu_lock(&s->mu);
755 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
756 absl::OkStatus());
757 gpr_mu_unlock(&s->mu);
758 }
759
760 } // namespace
761
762 grpc_tcp_server_vtable grpc_windows_event_engine_tcp_server_vtable = {
763 event_engine_create, event_engine_start,
764 event_engine_add_port, event_engine_create_fd_handler,
765 event_engine_port_fd_count, event_engine_port_fd,
766 event_engine_ref, event_engine_shutdown_starting_add,
767 event_engine_unref, event_engine_shutdown_listeners,
768 tcp_pre_allocated_fd, tcp_set_pre_allocated_fd};
769
770 #endif // GRPC_WINSOCK_SOCKET
771