• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <grpc/event_engine/endpoint_config.h>
26 #include <grpc/event_engine/event_engine.h>
27 #include <grpc/event_engine/memory_allocator.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log_windows.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/sync.h>
32 #include <grpc/support/time.h>
33 #include <inttypes.h>
34 #include <io.h>
35 
36 #include <vector>
37 
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "absl/strings/str_cat.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/event_engine/memory_allocator_factory.h"
43 #include "src/core/lib/event_engine/resolved_address_internal.h"
44 #include "src/core/lib/event_engine/tcp_socket_utils.h"
45 #include "src/core/lib/event_engine/windows/windows_engine.h"
46 #include "src/core/lib/event_engine/windows/windows_listener.h"
47 #include "src/core/lib/iomgr/closure.h"
48 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
49 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
50 #include "src/core/lib/iomgr/iocp_windows.h"
51 #include "src/core/lib/iomgr/pollset_windows.h"
52 #include "src/core/lib/iomgr/resolve_address.h"
53 #include "src/core/lib/iomgr/sockaddr.h"
54 #include "src/core/lib/iomgr/socket_windows.h"
55 #include "src/core/lib/iomgr/tcp_server.h"
56 #include "src/core/lib/iomgr/tcp_windows.h"
57 #include "src/core/lib/resource_quota/api.h"
58 #include "src/core/lib/resource_quota/resource_quota.h"
59 #include "src/core/lib/slice/slice_internal.h"
60 #include "src/core/util/crash.h"
61 
62 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
63 
64 namespace {
65 using ::grpc_event_engine::experimental::CreateResolvedAddress;
66 using ::grpc_event_engine::experimental::EndpointConfig;
67 using ::grpc_event_engine::experimental::EventEngine;
68 using ::grpc_event_engine::experimental::grpc_event_engine_endpoint_create;
69 using ::grpc_event_engine::experimental::MemoryAllocator;
70 using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
71 using ::grpc_event_engine::experimental::ResolvedAddressSetPort;
72 using ::grpc_event_engine::experimental::RunEventEngineClosure;
73 using ::grpc_event_engine::experimental::WindowsEventEngine;
74 using ::grpc_event_engine::experimental::WindowsEventEngineListener;
75 }  // namespace
76 
77 // one listening port
78 typedef struct grpc_tcp_listener grpc_tcp_listener;
79 struct grpc_tcp_listener {
80   // Buffer to hold the local and remote address.
81   // This seemingly magic number comes from AcceptEx's documentation. each
82   // address buffer needs to have at least 16 more bytes at their end.
83 #ifdef GRPC_HAVE_UNIX_SOCKET
84   // unix addr is larger than ip addr.
85   uint8_t addresses[(sizeof(sockaddr_un) + 16) * 2] = {};
86 #else
87   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
88 #endif  // GRPC_HAVE_UNIX_SOCKET
89   // This will hold the socket for the next accept.
90   SOCKET new_socket;
91   // The listener winsocket.
92   grpc_winsocket* socket;
93   // address of listener
94   grpc_resolved_address resolved_addr;
95   // The actual TCP port number.
96   int port;
97   unsigned port_index;
98   grpc_tcp_server* server;
99   // The cached AcceptEx for that port.
100   LPFN_ACCEPTEX AcceptEx;
101   int shutting_down;
102   int outstanding_calls;
103   // closure for socket notification of accept being ready
104   grpc_closure on_accept;
105   // linked list
106   struct grpc_tcp_listener* next;
107 };
108 
109 // the overall server
110 struct grpc_tcp_server {
111   gpr_refcount refs;
112   // Called whenever accept() succeeds on a server port.
113   grpc_tcp_server_cb on_accept_cb;
114   void* on_accept_cb_arg;
115 
116   gpr_mu mu;
117 
118   // active port count: how many ports are actually still listening
119   int active_ports;
120 
121   // linked list of server ports
122   grpc_tcp_listener* head;
123   grpc_tcp_listener* tail;
124 
125   // List of closures passed to shutdown_starting_add().
126   grpc_closure_list shutdown_starting;
127 
128   // shutdown callback
129   grpc_closure* shutdown_complete;
130 
131   // used for the EventEngine shim
132   WindowsEventEngineListener* ee_listener;
133 };
134 
135 // TODO(hork): This may be refactored to share with posix engine and event
136 // engine.
unlink_if_unix_domain_socket(const grpc_resolved_address * resolved_addr)137 void unlink_if_unix_domain_socket(const grpc_resolved_address* resolved_addr) {
138 #ifdef GRPC_HAVE_UNIX_SOCKET
139   const grpc_sockaddr* addr =
140       reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
141   if (addr->sa_family != AF_UNIX) {
142     return;
143   }
144   struct sockaddr_un* un =
145       reinterpret_cast<struct sockaddr_un*>(const_cast<sockaddr*>(addr));
146   // There is nothing to unlink for an abstract unix socket.
147   if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
148     return;
149   }
150   // For windows we need to remove the file instead of unlink.
151   DWORD attr = ::GetFileAttributesA(un->sun_path);
152   if (attr == INVALID_FILE_ATTRIBUTES) {
153     return;
154   }
155   if (attr & FILE_ATTRIBUTE_DIRECTORY || attr & FILE_ATTRIBUTE_READONLY) {
156     return;
157   }
158   ::DeleteFileA(un->sun_path);
159 #else
160   (void)resolved_addr;
161 #endif
162 }
163 
164 // Public function. Allocates the proper data structures to hold a
165 // grpc_tcp_server.
tcp_server_create(grpc_closure * shutdown_complete,const EndpointConfig &,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)166 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
167                                            const EndpointConfig& /* config */,
168                                            grpc_tcp_server_cb on_accept_cb,
169                                            void* on_accept_cb_arg,
170                                            grpc_tcp_server** server) {
171   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
172   gpr_ref_init(&s->refs, 1);
173   gpr_mu_init(&s->mu);
174   s->active_ports = 0;
175   s->on_accept_cb = on_accept_cb;
176   s->on_accept_cb_arg = on_accept_cb_arg;
177   s->head = NULL;
178   s->tail = NULL;
179   s->shutdown_starting.head = NULL;
180   s->shutdown_starting.tail = NULL;
181   s->shutdown_complete = shutdown_complete;
182   *server = s;
183   return absl::OkStatus();
184 }
185 
destroy_server(void * arg,grpc_error_handle)186 static void destroy_server(void* arg, grpc_error_handle /* error */) {
187   grpc_tcp_server* s = (grpc_tcp_server*)arg;
188 
189   // Now that the accepts have been aborted, we can destroy the sockets.
190   // The IOCP won't get notified on these, so we can flag them as already
191   // closed by the system.
192   while (s->head) {
193     grpc_tcp_listener* sp = s->head;
194     s->head = sp->next;
195     sp->next = NULL;
196     grpc_winsocket_destroy(sp->socket);
197     unlink_if_unix_domain_socket(&sp->resolved_addr);
198     gpr_free(sp);
199   }
200   gpr_mu_destroy(&s->mu);
201   gpr_free(s);
202 }
203 
finish_shutdown_locked(grpc_tcp_server * s)204 static void finish_shutdown_locked(grpc_tcp_server* s) {
205   if (s->shutdown_complete != NULL) {
206     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
207                             absl::OkStatus());
208   }
209 
210   grpc_core::ExecCtx::Run(
211       DEBUG_LOCATION,
212       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
213       absl::OkStatus());
214 }
215 
tcp_server_ref(grpc_tcp_server * s)216 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
217   gpr_ref_non_zero(&s->refs);
218   return s;
219 }
220 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)221 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
222                                              grpc_closure* shutdown_starting) {
223   gpr_mu_lock(&s->mu);
224   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
225                            absl::OkStatus());
226   gpr_mu_unlock(&s->mu);
227 }
228 
tcp_server_destroy(grpc_tcp_server * s)229 static void tcp_server_destroy(grpc_tcp_server* s) {
230   grpc_tcp_listener* sp;
231   gpr_mu_lock(&s->mu);
232   // First, shutdown all fd's. This will queue abortion calls for all
233   // of the pending accepts due to the normal operation mechanism.
234   if (s->active_ports == 0) {
235     finish_shutdown_locked(s);
236   } else {
237     for (sp = s->head; sp; sp = sp->next) {
238       sp->shutting_down = 1;
239       grpc_winsocket_shutdown(sp->socket);
240     }
241   }
242   gpr_mu_unlock(&s->mu);
243 }
244 
tcp_server_unref(grpc_tcp_server * s)245 static void tcp_server_unref(grpc_tcp_server* s) {
246   if (gpr_unref(&s->refs)) {
247     grpc_tcp_server_shutdown_listeners(s);
248     gpr_mu_lock(&s->mu);
249     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
250     gpr_mu_unlock(&s->mu);
251     tcp_server_destroy(s);
252   }
253 }
254 
255 // Prepare (bind) a recently-created socket for listening.
prepare_socket(SOCKET sock,const grpc_resolved_address * addr,int * port)256 static grpc_error_handle prepare_socket(SOCKET sock,
257                                         const grpc_resolved_address* addr,
258                                         int* port) {
259   grpc_resolved_address sockname_temp;
260   grpc_error_handle error;
261   int sockname_temp_len;
262   if (grpc_sockaddr_get_family(addr) == AF_UNIX) {
263     error = grpc_tcp_set_non_block(sock);
264   } else {
265     error = grpc_tcp_prepare_socket(sock);
266   }
267   if (!error.ok()) {
268     goto failure;
269   }
270   unlink_if_unix_domain_socket(addr);
271   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
272       SOCKET_ERROR) {
273     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
274     goto failure;
275   }
276 
277   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
278     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
279     goto failure;
280   }
281 
282   sockname_temp_len = sizeof(struct sockaddr_storage);
283   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
284                   &sockname_temp_len) == SOCKET_ERROR) {
285     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
286     goto failure;
287   }
288   sockname_temp.len = (size_t)sockname_temp_len;
289 
290   *port = grpc_sockaddr_get_port(&sockname_temp);
291   return absl::OkStatus();
292 
293 failure:
294   CHECK(!error.ok());
295   error = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING(
296                                  "Failed to prepare server socket", &error, 1),
297                              grpc_core::StatusIntProperty::kFd, (intptr_t)sock);
298   if (sock != INVALID_SOCKET) closesocket(sock);
299   return error;
300 }
301 
decrement_active_ports_and_notify_locked(grpc_tcp_listener * sp)302 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
303   sp->shutting_down = 0;
304   CHECK_GT(sp->server->active_ports, 0u);
305   if (0 == --sp->server->active_ports) {
306     finish_shutdown_locked(sp->server);
307   }
308 }
309 
310 // In order to do an async accept, we need to create a socket first which
311 // will be the one assigned to the new incoming connection.
start_accept_locked(grpc_tcp_listener * port)312 static grpc_error_handle start_accept_locked(grpc_tcp_listener* port) {
313   SOCKET sock = INVALID_SOCKET;
314   BOOL success;
315   const DWORD addrlen = sizeof(port->addresses) / 2;
316   DWORD bytes_received = 0;
317   grpc_error_handle error;
318 
319   if (port->shutting_down) {
320     return absl::OkStatus();
321   }
322   const int addr_family =
323       grpc_sockaddr_get_family(&port->resolved_addr) == AF_UNIX ? AF_UNIX
324                                                                 : AF_INET6;
325   const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
326   sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
327                    grpc_get_default_wsa_socket_flags());
328   if (sock == INVALID_SOCKET) {
329     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
330     goto failure;
331   }
332   if (addr_family == AF_UNIX) {
333     error = grpc_tcp_set_non_block(sock);
334   } else {
335     error = grpc_tcp_prepare_socket(sock);
336   }
337   if (!error.ok()) goto failure;
338 
339   // Start the "accept" asynchronously.
340   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
341                            addrlen, addrlen, &bytes_received,
342                            &port->socket->read_info.overlapped);
343 
344   // It is possible to get an accept immediately without delay. However, we
345   // will still get an IOCP notification for it. So let's just ignore it.
346   if (!success) {
347     int last_error = WSAGetLastError();
348     if (last_error != ERROR_IO_PENDING) {
349       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
350       goto failure;
351     }
352   }
353 
354   // We're ready to do the accept. Calling grpc_socket_notify_on_read may
355   // immediately process an accept that happened in the meantime.
356   port->new_socket = sock;
357   grpc_socket_notify_on_read(port->socket, &port->on_accept);
358   port->outstanding_calls++;
359   return error;
360 
361 failure:
362   CHECK(!error.ok());
363   if (sock != INVALID_SOCKET) closesocket(sock);
364   return error;
365 }
366 
367 // Event manager callback when reads are ready.
on_accept(void * arg,grpc_error_handle error)368 static void on_accept(void* arg, grpc_error_handle error) {
369   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
370   SOCKET sock = sp->new_socket;
371   grpc_winsocket_callback_info* info = &sp->socket->read_info;
372   grpc_endpoint* ep = NULL;
373   grpc_resolved_address peer_name;
374   DWORD transferred_bytes;
375   DWORD flags;
376   BOOL wsa_success;
377   int err;
378 
379   gpr_mu_lock(&sp->server->mu);
380 
381   peer_name.len = sizeof(struct sockaddr_storage);
382 
383   // The general mechanism for shutting down is to queue abortion calls. While
384   // this is necessary in the read/write case, it's useless for the accept
385   // case. We only need to adjust the pending callback count
386   if (!error.ok()) {
387     VLOG(2) << "Skipping on_accept due to error: "
388             << grpc_core::StatusToString(error);
389 
390     gpr_mu_unlock(&sp->server->mu);
391     return;
392   }
393   // The IOCP notified us of a completed operation. Let's grab the results,
394   // and act accordingly.
395   transferred_bytes = 0;
396   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
397                                        &transferred_bytes, FALSE, &flags);
398   if (!wsa_success) {
399     if (!sp->shutting_down) {
400       char* utf8_message = gpr_format_message(WSAGetLastError());
401       LOG(ERROR) << "on_accept error: " << utf8_message;
402       gpr_free(utf8_message);
403     }
404     closesocket(sock);
405   } else {
406     if (!sp->shutting_down) {
407       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
408                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
409       if (err) {
410         char* utf8_message = gpr_format_message(WSAGetLastError());
411         LOG(ERROR) << "setsockopt error: " << utf8_message;
412         gpr_free(utf8_message);
413       }
414       int peer_name_len = (int)peer_name.len;
415       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
416       peer_name.len = (size_t)peer_name_len;
417       std::string peer_name_string;
418       if (!err) {
419         auto addr_uri = grpc_sockaddr_to_uri(&peer_name);
420         if (addr_uri.ok()) {
421           peer_name_string = addr_uri.value();
422         } else {
423           LOG(ERROR) << "invalid peer name: " << addr_uri.status();
424         }
425       } else {
426         char* utf8_message = gpr_format_message(WSAGetLastError());
427         LOG(ERROR) << "getpeername error: " << utf8_message;
428         gpr_free(utf8_message);
429       }
430       std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
431       ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
432                            peer_name_string);
433     } else {
434       closesocket(sock);
435     }
436   }
437 
438   // The only time we should call our callback, is where we successfully
439   // managed to accept a connection, and created an endpoint.
440   if (ep) {
441     // Create acceptor.
442     grpc_tcp_server_acceptor* acceptor =
443         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
444     acceptor->from_server = sp->server;
445     acceptor->port_index = sp->port_index;
446     acceptor->fd_index = 0;
447     acceptor->external_connection = false;
448     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
449   }
450   // As we were notified from the IOCP of one and exactly one accept,
451   // the former socked we created has now either been destroy or assigned
452   // to the new connection. We need to create a new one for the next
453   // connection.
454   CHECK(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
455   if (0 == --sp->outstanding_calls) {
456     decrement_active_ports_and_notify_locked(sp);
457   }
458   gpr_mu_unlock(&sp->server->mu);
459 }
460 
add_socket_to_server(grpc_tcp_server * s,SOCKET sock,const grpc_resolved_address * addr,unsigned port_index,grpc_tcp_listener ** listener)461 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
462                                               const grpc_resolved_address* addr,
463                                               unsigned port_index,
464                                               grpc_tcp_listener** listener) {
465   grpc_tcp_listener* sp = NULL;
466   int port = -1;
467   int status;
468   GUID guid = WSAID_ACCEPTEX;
469   DWORD ioctl_num_bytes;
470   LPFN_ACCEPTEX AcceptEx;
471   grpc_error_handle error;
472 
473   // We need to grab the AcceptEx pointer for that port, as it may be
474   // interface-dependent. We'll cache it to avoid doing that again.
475   status =
476       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
477                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
478 
479   if (status != 0) {
480     error = GRPC_WSA_ERROR(WSAGetLastError(), "AcceptEx pointer retrieval");
481     closesocket(sock);
482     return error;
483   }
484 
485   error = prepare_socket(sock, addr, &port);
486   if (!error.ok()) {
487     return error;
488   }
489 
490   CHECK_GE(port, 0);
491   gpr_mu_lock(&s->mu);
492   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
493   sp->next = NULL;
494   if (s->head == NULL) {
495     s->head = sp;
496   } else {
497     s->tail->next = sp;
498   }
499   s->tail = sp;
500   sp->server = s;
501   sp->socket = grpc_winsocket_create(sock, "listener");
502   sp->shutting_down = 0;
503   sp->outstanding_calls = 0;
504   sp->AcceptEx = AcceptEx;
505   sp->new_socket = INVALID_SOCKET;
506   sp->resolved_addr = *addr;
507   sp->port = port;
508   sp->port_index = port_index;
509   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
510   CHECK(sp->socket);
511   gpr_mu_unlock(&s->mu);
512   *listener = sp;
513 
514   return absl::OkStatus();
515 }
516 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)517 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
518                                              const grpc_resolved_address* addr,
519                                              int* port) {
520   grpc_tcp_listener* sp = NULL;
521   SOCKET sock;
522   grpc_resolved_address addr6_v4mapped;
523   grpc_resolved_address wildcard;
524   grpc_resolved_address* allocated_addr = NULL;
525   unsigned port_index = 0;
526   grpc_error_handle error;
527 
528   if (s->tail != NULL) {
529     port_index = s->tail->port_index + 1;
530   }
531 
532   // Check if this is a wildcard port, and if so, try to keep the port the same
533   // as some previously created listener.
534   if (grpc_sockaddr_get_port(addr) == 0) {
535     for (sp = s->head; sp; sp = sp->next) {
536       grpc_resolved_address sockname_temp;
537       int sockname_temp_len = sizeof(struct sockaddr_storage);
538       if (0 == getsockname(sp->socket->socket,
539                            (grpc_sockaddr*)sockname_temp.addr,
540                            &sockname_temp_len)) {
541         sockname_temp.len = (size_t)sockname_temp_len;
542         *port = grpc_sockaddr_get_port(&sockname_temp);
543         if (*port > 0) {
544           allocated_addr =
545               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
546           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
547           grpc_sockaddr_set_port(allocated_addr, *port);
548           addr = allocated_addr;
549           break;
550         }
551       }
552     }
553   }
554 
555   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
556     addr = &addr6_v4mapped;
557   }
558 
559   // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
560   if (grpc_sockaddr_is_wildcard(addr, port)) {
561     grpc_sockaddr_make_wildcard6(*port, &wildcard);
562 
563     addr = &wildcard;
564   }
565 
566   const int addr_family =
567       grpc_sockaddr_get_family(addr) == AF_UNIX ? AF_UNIX : AF_INET6;
568   const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
569   sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
570                    grpc_get_default_wsa_socket_flags());
571   if (sock == INVALID_SOCKET) {
572     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
573     goto done;
574   }
575 
576   error = add_socket_to_server(s, sock, addr, port_index, &sp);
577 
578 done:
579   gpr_free(allocated_addr);
580 
581   if (!error.ok()) {
582     grpc_error_handle error_out = GRPC_ERROR_CREATE_REFERENCING(
583         "Failed to add port to server", &error, 1);
584     error = error_out;
585     *port = -1;
586   } else {
587     CHECK(sp != NULL);
588     *port = sp->port;
589   }
590   return error;
591 }
592 
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > *)593 static void tcp_server_start(grpc_tcp_server* s,
594                              const std::vector<grpc_pollset*>* /*pollsets*/) {
595   grpc_tcp_listener* sp;
596   gpr_mu_lock(&s->mu);
597   CHECK_EQ(s->active_ports, 0u);
598   for (sp = s->head; sp; sp = sp->next) {
599     CHECK(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
600     s->active_ports++;
601   }
602   gpr_mu_unlock(&s->mu);
603 }
604 
tcp_server_port_fd_count(grpc_tcp_server *,unsigned)605 static unsigned tcp_server_port_fd_count(grpc_tcp_server* /* s */,
606                                          unsigned /* port_index */) {
607   return 0;
608 }
609 
tcp_server_port_fd(grpc_tcp_server *,unsigned,unsigned)610 static int tcp_server_port_fd(grpc_tcp_server* /* s */,
611                               unsigned /* port_index */,
612                               unsigned /* fd_index */) {
613   return -1;
614 }
615 
tcp_server_create_fd_handler(grpc_tcp_server *)616 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
617     grpc_tcp_server* /* s */) {
618   return nullptr;
619 }
620 
tcp_server_shutdown_listeners(grpc_tcp_server *)621 static void tcp_server_shutdown_listeners(grpc_tcp_server* /* s */) {}
622 
tcp_pre_allocated_fd(grpc_tcp_server *)623 static int tcp_pre_allocated_fd(grpc_tcp_server* /* s */) { return -1; }
624 
tcp_set_pre_allocated_fd(grpc_tcp_server *,int)625 static void tcp_set_pre_allocated_fd(grpc_tcp_server* /* s */, int /* fd */) {}
626 
627 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
628     tcp_server_create,        tcp_server_start,
629     tcp_server_add_port,      tcp_server_create_fd_handler,
630     tcp_server_port_fd_count, tcp_server_port_fd,
631     tcp_server_ref,           tcp_server_shutdown_starting_add,
632     tcp_server_unref,         tcp_server_shutdown_listeners,
633     tcp_pre_allocated_fd,     tcp_set_pre_allocated_fd};
634 
635 // ---- EventEngine shim ------------------------------------------------------
636 
637 namespace {
638 
event_engine_create(grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)639 static grpc_error_handle event_engine_create(grpc_closure* shutdown_complete,
640                                              const EndpointConfig& config,
641                                              grpc_tcp_server_cb on_accept_cb,
642                                              void* on_accept_cb_arg,
643                                              grpc_tcp_server** server) {
644   // On Windows, the event_engine_listener experiment only supports the
645   // default engine
646   WindowsEventEngine* engine_ptr = reinterpret_cast<WindowsEventEngine*>(
647       config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
648   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
649   CHECK_NE(on_accept_cb, nullptr);
650   auto accept_cb = [s, on_accept_cb, on_accept_cb_arg](
651                        std::unique_ptr<EventEngine::Endpoint> endpoint,
652                        MemoryAllocator memory_allocator) {
653     grpc_core::ApplicationCallbackExecCtx app_ctx;
654     grpc_core::ExecCtx exec_ctx;
655     grpc_tcp_server_acceptor* acceptor =
656         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
657     acceptor->from_server = s;
658     acceptor->port_index = -1;
659     acceptor->fd_index = -1;
660     acceptor->external_connection = false;
661     on_accept_cb(on_accept_cb_arg,
662                  grpc_event_engine_endpoint_create(std::move(endpoint)),
663                  nullptr, acceptor);
664   };
665   auto on_shutdown = [shutdown_complete](absl::Status status) {
666     RunEventEngineClosure(shutdown_complete, status);
667   };
668   grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota;
669   {
670     void* tmp_quota = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
671     CHECK_NE(tmp_quota, nullptr);
672     resource_quota =
673         reinterpret_cast<grpc_core::ResourceQuota*>(tmp_quota)->Ref();
674   }
675   gpr_ref_init(&s->refs, 1);
676   gpr_mu_init(&s->mu);
677   s->ee_listener = new WindowsEventEngineListener(
678       engine_ptr->poller(), std::move(accept_cb), std::move(on_shutdown),
679       std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
680           resource_quota->memory_quota()),
681       engine_ptr->shared_from_this(), engine_ptr->thread_pool(), config);
682   s->active_ports = -1;
683   s->on_accept_cb = [](void* /* arg */, grpc_endpoint* /* ep */,
684                        grpc_pollset* /* accepting_pollset */,
685                        grpc_tcp_server_acceptor* /* acceptor */) {
686     grpc_core::Crash("iomgr on_accept_cb callback should be unused");
687   };
688   s->on_accept_cb_arg = nullptr;
689   s->head = nullptr;
690   s->tail = nullptr;
691   s->shutdown_starting.head = nullptr;
692   s->shutdown_starting.tail = nullptr;
693   s->shutdown_complete = grpc_core::NewClosure([](absl::Status) {
694     grpc_core::Crash("iomgr shutdown_complete callback should be unused");
695   });
696   *server = s;
697   return absl::OkStatus();
698 }
699 
event_engine_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > *)700 static void event_engine_start(grpc_tcp_server* s,
701                                const std::vector<grpc_pollset*>* /*pollsets*/) {
702   CHECK(s->ee_listener->Start().ok());
703 }
704 
event_engine_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)705 static grpc_error_handle event_engine_add_port(
706     grpc_tcp_server* s, const grpc_resolved_address* addr, int* port) {
707   CHECK_NE(addr, nullptr);
708   CHECK_NE(port, nullptr);
709   auto ee_addr = CreateResolvedAddress(*addr);
710   auto out_port = s->ee_listener->Bind(ee_addr);
711   *port = out_port.ok() ? *out_port : -1;
712   return out_port.status();
713 }
714 
event_engine_create_fd_handler(grpc_tcp_server *)715 static grpc_core::TcpServerFdHandler* event_engine_create_fd_handler(
716     grpc_tcp_server* /* s */) {
717   return nullptr;
718 }
719 
event_engine_port_fd_count(grpc_tcp_server *,unsigned)720 static unsigned event_engine_port_fd_count(grpc_tcp_server* /* s */,
721                                            unsigned /* port_index */) {
722   return 0;
723 }
724 
event_engine_port_fd(grpc_tcp_server *,unsigned,unsigned)725 static int event_engine_port_fd(grpc_tcp_server* /* s */,
726                                 unsigned /* port_index */,
727                                 unsigned /* fd_index */) {
728   return -1;
729 }
730 
event_engine_ref(grpc_tcp_server * s)731 static grpc_tcp_server* event_engine_ref(grpc_tcp_server* s) {
732   gpr_ref_non_zero(&s->refs);
733   return s;
734 }
735 
event_engine_shutdown_listeners(grpc_tcp_server * s)736 static void event_engine_shutdown_listeners(grpc_tcp_server* s) {
737   s->ee_listener->ShutdownListeners();
738 }
739 
event_engine_unref(grpc_tcp_server * s)740 static void event_engine_unref(grpc_tcp_server* s) {
741   if (gpr_unref(&s->refs)) {
742     event_engine_shutdown_listeners(s);
743     gpr_mu_lock(&s->mu);
744     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
745     gpr_mu_unlock(&s->mu);
746     gpr_mu_destroy(&s->mu);
747     delete s->ee_listener;
748     gpr_free(s);
749   }
750 }
751 
event_engine_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)752 static void event_engine_shutdown_starting_add(
753     grpc_tcp_server* s, grpc_closure* shutdown_starting) {
754   gpr_mu_lock(&s->mu);
755   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
756                            absl::OkStatus());
757   gpr_mu_unlock(&s->mu);
758 }
759 
760 }  // namespace
761 
762 grpc_tcp_server_vtable grpc_windows_event_engine_tcp_server_vtable = {
763     event_engine_create,        event_engine_start,
764     event_engine_add_port,      event_engine_create_fd_handler,
765     event_engine_port_fd_count, event_engine_port_fd,
766     event_engine_ref,           event_engine_shutdown_starting_add,
767     event_engine_unref,         event_engine_shutdown_listeners,
768     tcp_pre_allocated_fd,       tcp_set_pre_allocated_fd};
769 
770 #endif  // GRPC_WINSOCK_SOCKET
771