• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include "absl/log/check.h"
19 #include "absl/log/log.h"
20 #include "absl/status/status.h"
21 #include "absl/strings/str_format.h"
22 #include "src/core/lib/event_engine/tcp_socket_utils.h"
23 #include "src/core/lib/event_engine/windows/iocp.h"
24 #include "src/core/lib/event_engine/windows/win_socket.h"
25 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
26 #include "src/core/lib/event_engine/windows/windows_listener.h"
27 #include "src/core/lib/iomgr/error.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/util/crash.h"
30 #include "src/core/util/sync.h"
31 
32 namespace grpc_event_engine {
33 namespace experimental {
34 
35 // ---- SinglePortSocketListener::AsyncIOState ----
36 
37 WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState::
AsyncIOState(SinglePortSocketListener * port_listener,std::unique_ptr<WinSocket> listener_socket)38     AsyncIOState(SinglePortSocketListener* port_listener,
39                  std::unique_ptr<WinSocket> listener_socket)
40     : port_listener(port_listener),
41       listener_socket(std::move(listener_socket)) {}
42 
43 WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState::
~AsyncIOState()44     ~AsyncIOState() {
45   closesocket(accept_socket);
46 }
47 
48 void WindowsEventEngineListener::SinglePortSocketListener::
Run()49     OnAcceptCallbackWrapper::Run() {
50   CHECK_NE(io_state_, nullptr);
51   grpc_core::ReleasableMutexLock lock(&io_state_->mu);
52   if (io_state_->listener_socket->IsShutdown()) {
53     GRPC_TRACE_LOG(event_engine, INFO)
54         << "SinglePortSocketListener::" << io_state_->port_listener
55         << " listener socket is shut down. Shutting down listener.";
56     lock.Release();
57     io_state_.reset();
58     return;
59   }
60   io_state_->port_listener->OnAcceptCallbackLocked();
61 }
62 
63 void WindowsEventEngineListener::SinglePortSocketListener::
Prime(std::shared_ptr<AsyncIOState> io_state)64     OnAcceptCallbackWrapper::Prime(std::shared_ptr<AsyncIOState> io_state) {
65   io_state_ = std::move(io_state);
66 }
67 
68 // ---- SinglePortSocketListener ----
69 
70 // TODO(hork): This may be refactored to share with posix engine.
UnlinkIfUnixDomainSocket(const EventEngine::ResolvedAddress & resolved_addr)71 void UnlinkIfUnixDomainSocket(
72     const EventEngine::ResolvedAddress& resolved_addr) {
73 #ifdef GRPC_HAVE_UNIX_SOCKET
74   if (resolved_addr.address()->sa_family != AF_UNIX) {
75     return;
76   }
77   struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
78       const_cast<sockaddr*>(resolved_addr.address()));
79   // There is nothing to unlink for an abstract unix socket.
80   if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
81     return;
82   }
83   // For windows we need to remove the file instead of unlink.
84   DWORD attr = ::GetFileAttributesA(un->sun_path);
85   if (attr == INVALID_FILE_ATTRIBUTES) {
86     return;
87   }
88   if (attr & FILE_ATTRIBUTE_DIRECTORY || attr & FILE_ATTRIBUTE_READONLY) {
89     return;
90   }
91   ::DeleteFileA(un->sun_path);
92 #else
93   (void)resolved_addr;
94 #endif
95 }
96 
97 WindowsEventEngineListener::SinglePortSocketListener::
~SinglePortSocketListener()98     ~SinglePortSocketListener() {
99   grpc_core::MutexLock lock(&io_state_->mu);
100   io_state_->listener_socket->Shutdown(DEBUG_LOCATION,
101                                        "~SinglePortSocketListener");
102   UnlinkIfUnixDomainSocket(listener_sockname());
103   GRPC_TRACE_LOG(event_engine, INFO) << "~SinglePortSocketListener::" << this;
104 }
105 
106 absl::StatusOr<
107     std::unique_ptr<WindowsEventEngineListener::SinglePortSocketListener>>
Create(WindowsEventEngineListener * listener,SOCKET sock,EventEngine::ResolvedAddress addr)108 WindowsEventEngineListener::SinglePortSocketListener::Create(
109     WindowsEventEngineListener* listener, SOCKET sock,
110     EventEngine::ResolvedAddress addr) {
111   // We need to grab the AcceptEx pointer for that port, as it may be
112   // interface-dependent. We'll cache it to avoid doing that again.
113   GUID guid = WSAID_ACCEPTEX;
114   DWORD ioctl_num_bytes;
115   LPFN_ACCEPTEX AcceptEx;
116   int status =
117       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
118                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
119   if (status != 0) {
120     auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
121     closesocket(sock);
122     return error;
123   }
124   auto result = SinglePortSocketListener::PrepareListenerSocket(sock, addr);
125   GRPC_RETURN_IF_ERROR(result.status());
126   CHECK_GE(result->port, 0);
127   // Using `new` to access non-public constructor
128   return absl::WrapUnique(new SinglePortSocketListener(
129       listener, AcceptEx, /*win_socket=*/listener->iocp_->Watch(sock),
130       result->port, result->hostbyname));
131 }
132 
Start()133 absl::Status WindowsEventEngineListener::SinglePortSocketListener::Start() {
134   grpc_core::MutexLock lock(&io_state_->mu);
135   return StartLocked();
136 }
137 
138 absl::Status
StartLocked()139 WindowsEventEngineListener::SinglePortSocketListener::StartLocked() {
140   const EventEngine::ResolvedAddress addr = listener_sockname();
141   const int addr_family =
142       (addr.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
143   const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
144   SOCKET accept_socket = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
145                                    IOCP::GetDefaultSocketFlags());
146   if (accept_socket == INVALID_SOCKET) {
147     return GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
148   }
149   auto fail = [&](absl::Status error) -> absl::Status {
150     if (accept_socket != INVALID_SOCKET) closesocket(accept_socket);
151     return error;
152   };
153   absl::Status error;
154   if (addr_family == AF_UNIX) {
155     error = SetSocketNonBlock(accept_socket);
156   } else {
157     error = PrepareSocket(accept_socket);
158   }
159   if (!error.ok()) return fail(error);
160   // Start the "accept" asynchronously.
161   io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb);
162   DWORD addrlen =
163       sizeof(addresses_) / 2;  // half of the buffer is for remote addr.
164   DWORD bytes_received = 0;
165   int success =
166       AcceptEx(io_state_->listener_socket->raw_socket(), accept_socket,
167                addresses_, 0, addrlen, addrlen, &bytes_received,
168                io_state_->listener_socket->read_info()->overlapped());
169   // It is possible to get an accept immediately without delay. However, we
170   // will still get an IOCP notification for it. So let's just ignore it.
171   if (success != 0) {
172     int last_error = WSAGetLastError();
173     if (last_error != ERROR_IO_PENDING) {
174       io_state_->listener_socket->UnregisterReadCallback();
175       return fail(GRPC_WSA_ERROR(last_error, "AcceptEx"));
176     }
177   }
178   io_state_->accept_socket = accept_socket;
179   GRPC_TRACE_LOG(event_engine, INFO)
180       << "SinglePortSocketListener::" << this
181       << " listening. listener_socket::" << io_state_->listener_socket.get();
182   return absl::OkStatus();
183 }
184 
185 void WindowsEventEngineListener::SinglePortSocketListener::
OnAcceptCallbackLocked()186     OnAcceptCallbackLocked() {
187   auto close_socket_and_restart =
188       [&](bool do_close_socket = true)
189           ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu) {
190             if (do_close_socket) closesocket(io_state_->accept_socket);
191             io_state_->accept_socket = INVALID_SOCKET;
192             CHECK(GRPC_LOG_IF_ERROR("SinglePortSocketListener::Start",
193                                     StartLocked()));
194           };
195   const auto& overlapped_result =
196       io_state_->listener_socket->read_info()->result();
197   if (overlapped_result.wsa_error != 0) {
198     LOG(ERROR) << GRPC_WSA_ERROR(overlapped_result.wsa_error,
199                                  "Skipping on_accept due to error");
200     return close_socket_and_restart();
201   }
202   SOCKET tmp_listener_socket = io_state_->listener_socket->raw_socket();
203   int err =
204       setsockopt(io_state_->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
205                  reinterpret_cast<char*>(&tmp_listener_socket),
206                  sizeof(tmp_listener_socket));
207   if (err != 0) {
208     LOG(ERROR) << GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt");
209     return close_socket_and_restart();
210   }
211   EventEngine::ResolvedAddress peer_address;
212   int peer_name_len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
213   err = getpeername(io_state_->accept_socket,
214                     const_cast<sockaddr*>(peer_address.address()),
215                     &peer_name_len);
216   if (err != 0) {
217     LOG(ERROR) << GRPC_WSA_ERROR(WSAGetLastError(), "getpeername");
218     return close_socket_and_restart();
219   }
220   peer_address =
221       EventEngine::ResolvedAddress(peer_address.address(), peer_name_len);
222   auto addr_uri = ResolvedAddressToURI(peer_address);
223   std::string peer_name = "unknown";
224   if (!addr_uri.ok()) {
225     // TODO(hork): test an early exit/restart here with end2end tests
226     LOG(ERROR) << "invalid peer name: " << addr_uri.status();
227   } else {
228     peer_name = *addr_uri;
229   }
230   auto endpoint = std::make_unique<WindowsEndpoint>(
231       peer_address, listener_->iocp_->Watch(io_state_->accept_socket),
232       listener_->memory_allocator_factory_->CreateMemoryAllocator(
233           absl::StrFormat("listener endpoint %s", peer_name)),
234       listener_->config_, listener_->thread_pool_, listener_->engine_);
235   listener_->accept_cb_(
236       std::move(endpoint),
237       listener_->memory_allocator_factory_->CreateMemoryAllocator(
238           absl::StrFormat("listener accept cb for %s", peer_name)));
239   close_socket_and_restart(/*do_close_socket=*/false);
240 }
241 
SinglePortSocketListener(WindowsEventEngineListener * listener,LPFN_ACCEPTEX AcceptEx,std::unique_ptr<WinSocket> listener_socket,int port,EventEngine::ResolvedAddress hostbyname)242 WindowsEventEngineListener::SinglePortSocketListener::SinglePortSocketListener(
243     WindowsEventEngineListener* listener, LPFN_ACCEPTEX AcceptEx,
244     std::unique_ptr<WinSocket> listener_socket, int port,
245     EventEngine::ResolvedAddress hostbyname)
246     : AcceptEx(AcceptEx),
247       listener_(listener),
248       io_state_(
249           std::make_shared<AsyncIOState>(this, std::move(listener_socket))),
250       port_(port),
251       listener_sockname_(hostbyname) {
252   io_state_->on_accept_cb.Prime(io_state_);
253 }
254 
255 absl::StatusOr<WindowsEventEngineListener::SinglePortSocketListener::
256                    PrepareListenerSocketResult>
PrepareListenerSocket(SOCKET sock,const EventEngine::ResolvedAddress & addr)257 WindowsEventEngineListener::SinglePortSocketListener::PrepareListenerSocket(
258     SOCKET sock, const EventEngine::ResolvedAddress& addr) {
259   auto fail = [&](absl::Status error) -> absl::Status {
260     CHECK(!error.ok());
261     error = grpc_error_set_int(
262         GRPC_ERROR_CREATE_REFERENCING("Failed to prepare server socket", &error,
263                                       1),
264         grpc_core::StatusIntProperty::kFd, static_cast<intptr_t>(sock));
265     if (sock != INVALID_SOCKET) closesocket(sock);
266     return error;
267   };
268   absl::Status error;
269   if (addr.address()->sa_family == AF_UNIX) {
270     error = SetSocketNonBlock(sock);
271   } else {
272     error = PrepareSocket(sock);
273   }
274   if (!error.ok()) return fail(error);
275   UnlinkIfUnixDomainSocket(addr);
276   if (bind(sock, addr.address(), addr.size()) == SOCKET_ERROR) {
277     return fail(GRPC_WSA_ERROR(WSAGetLastError(), "bind"));
278   }
279   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
280     return fail(GRPC_WSA_ERROR(WSAGetLastError(), "listen"));
281   }
282   int sockname_temp_len = sizeof(struct sockaddr_storage);
283   EventEngine::ResolvedAddress sockname_temp;
284   if (getsockname(sock, const_cast<sockaddr*>(sockname_temp.address()),
285                   &sockname_temp_len) == SOCKET_ERROR) {
286     return fail(GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"));
287   }
288   sockname_temp =
289       EventEngine::ResolvedAddress(sockname_temp.address(), sockname_temp_len);
290   return PrepareListenerSocketResult{ResolvedAddressGetPort(sockname_temp),
291                                      sockname_temp};
292 }
293 
294 // ---- WindowsEventEngineListener ----
295 
WindowsEventEngineListener(IOCP * iocp,AcceptCallback accept_cb,absl::AnyInvocable<void (absl::Status)> on_shutdown,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory,std::shared_ptr<EventEngine> engine,ThreadPool * thread_pool,const EndpointConfig & config)296 WindowsEventEngineListener::WindowsEventEngineListener(
297     IOCP* iocp, AcceptCallback accept_cb,
298     absl::AnyInvocable<void(absl::Status)> on_shutdown,
299     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory,
300     std::shared_ptr<EventEngine> engine, ThreadPool* thread_pool,
301     const EndpointConfig& config)
302     : iocp_(iocp),
303       config_(config),
304       engine_(std::move(engine)),
305       thread_pool_(thread_pool),
306       memory_allocator_factory_(std::move(memory_allocator_factory)),
307       accept_cb_(std::move(accept_cb)),
308       on_shutdown_(std::move(on_shutdown)) {}
309 
~WindowsEventEngineListener()310 WindowsEventEngineListener::~WindowsEventEngineListener() {
311   GRPC_TRACE_LOG(event_engine, INFO) << "~WindowsEventEngineListener::" << this;
312   ShutdownListeners();
313   on_shutdown_(absl::OkStatus());
314 }
315 
Bind(const EventEngine::ResolvedAddress & addr)316 absl::StatusOr<int> WindowsEventEngineListener::Bind(
317     const EventEngine::ResolvedAddress& addr) {
318   if (started_.load()) {
319     return absl::FailedPreconditionError(
320         absl::StrFormat("WindowsEventEngineListener::%p is already started, "
321                         "ports can no longer be bound",
322                         this));
323   }
324   int out_port = ResolvedAddressGetPort(addr);
325   EventEngine::ResolvedAddress out_addr(addr);
326   EventEngine::ResolvedAddress tmp_addr;
327   // Check if this is a  wildcard port, and if so, try to keep the port the same
328   // as some previously created listener.
329   if (out_port == 0) {
330     grpc_core::MutexLock lock(&port_listeners_mu_);
331     for (const auto& port_listener : port_listeners_) {
332       tmp_addr = port_listener->listener_sockname();
333       out_port = ResolvedAddressGetPort(tmp_addr);
334       if (out_port > 0) {
335         ResolvedAddressSetPort(out_addr, out_port);
336         break;
337       }
338     }
339   }
340   if (ResolvedAddressToV4Mapped(out_addr, &tmp_addr)) {
341     out_addr = tmp_addr;
342   }
343   // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
344   if (MaybeGetWildcardPortFromAddress(out_addr).has_value()) {
345     out_addr = ResolvedAddressMakeWild6(out_port);
346   }
347   // open the socket
348   const int addr_family =
349       (out_addr.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
350   const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
351   SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0,
352                           IOCP::GetDefaultSocketFlags());
353   if (sock == INVALID_SOCKET) {
354     auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
355     return GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", &error,
356                                          1);
357   }
358   auto port_listener = AddSinglePortSocketListener(sock, out_addr);
359   GRPC_RETURN_IF_ERROR(port_listener.status());
360   return (*port_listener)->port();
361 }
362 
Start()363 absl::Status WindowsEventEngineListener::Start() {
364   CHECK(!started_.exchange(true));
365   grpc_core::MutexLock lock(&port_listeners_mu_);
366   for (auto& port_listener : port_listeners_) {
367     GRPC_RETURN_IF_ERROR(port_listener->Start());
368   }
369   return absl::OkStatus();
370 }
371 
ShutdownListeners()372 void WindowsEventEngineListener::ShutdownListeners() {
373   grpc_core::MutexLock lock(&port_listeners_mu_);
374   if (std::exchange(listeners_shutdown_, true)) return;
375   // Shut down each port listener before destroying this EventEngine::Listener
376   for (auto& port_listener : port_listeners_) {
377     port_listener.reset();
378   }
379 }
380 
381 absl::StatusOr<WindowsEventEngineListener::SinglePortSocketListener*>
AddSinglePortSocketListener(SOCKET sock,EventEngine::ResolvedAddress addr)382 WindowsEventEngineListener::AddSinglePortSocketListener(
383     SOCKET sock, EventEngine::ResolvedAddress addr) {
384   auto single_port_listener =
385       SinglePortSocketListener::Create(this, sock, addr);
386   GRPC_RETURN_IF_ERROR(single_port_listener.status());
387   auto* single_port_listener_ptr = single_port_listener->get();
388   grpc_core::MutexLock lock(&port_listeners_mu_);
389   port_listeners_.emplace_back(std::move(*single_port_listener));
390   if (started_.load()) {
391     LOG(ERROR) << "WindowsEventEngineListener::" << this
392                << " Bind was called concurrently while the Listener was "
393                   "starting. This is invalid usage, all ports must be bound "
394                   "before the Listener is started.";
395     GRPC_RETURN_IF_ERROR(single_port_listener_ptr->Start());
396   }
397   return single_port_listener_ptr;
398 }
399 
400 }  // namespace experimental
401 }  // namespace grpc_event_engine
402 
403 #endif  // GPR_WINDOWS
404