1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17
18 #include "absl/log/check.h"
19 #include "absl/log/log.h"
20 #include "absl/status/status.h"
21 #include "absl/strings/str_format.h"
22 #include "src/core/lib/event_engine/tcp_socket_utils.h"
23 #include "src/core/lib/event_engine/windows/iocp.h"
24 #include "src/core/lib/event_engine/windows/win_socket.h"
25 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
26 #include "src/core/lib/event_engine/windows/windows_listener.h"
27 #include "src/core/lib/iomgr/error.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/util/crash.h"
30 #include "src/core/util/sync.h"
31
32 namespace grpc_event_engine {
33 namespace experimental {
34
35 // ---- SinglePortSocketListener::AsyncIOState ----
36
37 WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState::
AsyncIOState(SinglePortSocketListener * port_listener,std::unique_ptr<WinSocket> listener_socket)38 AsyncIOState(SinglePortSocketListener* port_listener,
39 std::unique_ptr<WinSocket> listener_socket)
40 : port_listener(port_listener),
41 listener_socket(std::move(listener_socket)) {}
42
43 WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState::
~AsyncIOState()44 ~AsyncIOState() {
45 closesocket(accept_socket);
46 }
47
48 void WindowsEventEngineListener::SinglePortSocketListener::
Run()49 OnAcceptCallbackWrapper::Run() {
50 CHECK_NE(io_state_, nullptr);
51 grpc_core::ReleasableMutexLock lock(&io_state_->mu);
52 if (io_state_->listener_socket->IsShutdown()) {
53 GRPC_TRACE_LOG(event_engine, INFO)
54 << "SinglePortSocketListener::" << io_state_->port_listener
55 << " listener socket is shut down. Shutting down listener.";
56 lock.Release();
57 io_state_.reset();
58 return;
59 }
60 io_state_->port_listener->OnAcceptCallbackLocked();
61 }
62
63 void WindowsEventEngineListener::SinglePortSocketListener::
Prime(std::shared_ptr<AsyncIOState> io_state)64 OnAcceptCallbackWrapper::Prime(std::shared_ptr<AsyncIOState> io_state) {
65 io_state_ = std::move(io_state);
66 }
67
68 // ---- SinglePortSocketListener ----
69
70 // TODO(hork): This may be refactored to share with posix engine.
UnlinkIfUnixDomainSocket(const EventEngine::ResolvedAddress & resolved_addr)71 void UnlinkIfUnixDomainSocket(
72 const EventEngine::ResolvedAddress& resolved_addr) {
73 #ifdef GRPC_HAVE_UNIX_SOCKET
74 if (resolved_addr.address()->sa_family != AF_UNIX) {
75 return;
76 }
77 struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
78 const_cast<sockaddr*>(resolved_addr.address()));
79 // There is nothing to unlink for an abstract unix socket.
80 if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
81 return;
82 }
83 // For windows we need to remove the file instead of unlink.
84 DWORD attr = ::GetFileAttributesA(un->sun_path);
85 if (attr == INVALID_FILE_ATTRIBUTES) {
86 return;
87 }
88 if (attr & FILE_ATTRIBUTE_DIRECTORY || attr & FILE_ATTRIBUTE_READONLY) {
89 return;
90 }
91 ::DeleteFileA(un->sun_path);
92 #else
93 (void)resolved_addr;
94 #endif
95 }
96
97 WindowsEventEngineListener::SinglePortSocketListener::
~SinglePortSocketListener()98 ~SinglePortSocketListener() {
99 grpc_core::MutexLock lock(&io_state_->mu);
100 io_state_->listener_socket->Shutdown(DEBUG_LOCATION,
101 "~SinglePortSocketListener");
102 UnlinkIfUnixDomainSocket(listener_sockname());
103 GRPC_TRACE_LOG(event_engine, INFO) << "~SinglePortSocketListener::" << this;
104 }
105
106 absl::StatusOr<
107 std::unique_ptr<WindowsEventEngineListener::SinglePortSocketListener>>
Create(WindowsEventEngineListener * listener,SOCKET sock,EventEngine::ResolvedAddress addr)108 WindowsEventEngineListener::SinglePortSocketListener::Create(
109 WindowsEventEngineListener* listener, SOCKET sock,
110 EventEngine::ResolvedAddress addr) {
111 // We need to grab the AcceptEx pointer for that port, as it may be
112 // interface-dependent. We'll cache it to avoid doing that again.
113 GUID guid = WSAID_ACCEPTEX;
114 DWORD ioctl_num_bytes;
115 LPFN_ACCEPTEX AcceptEx;
116 int status =
117 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
118 &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
119 if (status != 0) {
120 auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
121 closesocket(sock);
122 return error;
123 }
124 auto result = SinglePortSocketListener::PrepareListenerSocket(sock, addr);
125 GRPC_RETURN_IF_ERROR(result.status());
126 CHECK_GE(result->port, 0);
127 // Using `new` to access non-public constructor
128 return absl::WrapUnique(new SinglePortSocketListener(
129 listener, AcceptEx, /*win_socket=*/listener->iocp_->Watch(sock),
130 result->port, result->hostbyname));
131 }
132
Start()133 absl::Status WindowsEventEngineListener::SinglePortSocketListener::Start() {
134 grpc_core::MutexLock lock(&io_state_->mu);
135 return StartLocked();
136 }
137
138 absl::Status
StartLocked()139 WindowsEventEngineListener::SinglePortSocketListener::StartLocked() {
140 const EventEngine::ResolvedAddress addr = listener_sockname();
141 const int addr_family =
142 (addr.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
143 const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
144 SOCKET accept_socket = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
145 IOCP::GetDefaultSocketFlags());
146 if (accept_socket == INVALID_SOCKET) {
147 return GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
148 }
149 auto fail = [&](absl::Status error) -> absl::Status {
150 if (accept_socket != INVALID_SOCKET) closesocket(accept_socket);
151 return error;
152 };
153 absl::Status error;
154 if (addr_family == AF_UNIX) {
155 error = SetSocketNonBlock(accept_socket);
156 } else {
157 error = PrepareSocket(accept_socket);
158 }
159 if (!error.ok()) return fail(error);
160 // Start the "accept" asynchronously.
161 io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb);
162 DWORD addrlen =
163 sizeof(addresses_) / 2; // half of the buffer is for remote addr.
164 DWORD bytes_received = 0;
165 int success =
166 AcceptEx(io_state_->listener_socket->raw_socket(), accept_socket,
167 addresses_, 0, addrlen, addrlen, &bytes_received,
168 io_state_->listener_socket->read_info()->overlapped());
169 // It is possible to get an accept immediately without delay. However, we
170 // will still get an IOCP notification for it. So let's just ignore it.
171 if (success != 0) {
172 int last_error = WSAGetLastError();
173 if (last_error != ERROR_IO_PENDING) {
174 io_state_->listener_socket->UnregisterReadCallback();
175 return fail(GRPC_WSA_ERROR(last_error, "AcceptEx"));
176 }
177 }
178 io_state_->accept_socket = accept_socket;
179 GRPC_TRACE_LOG(event_engine, INFO)
180 << "SinglePortSocketListener::" << this
181 << " listening. listener_socket::" << io_state_->listener_socket.get();
182 return absl::OkStatus();
183 }
184
185 void WindowsEventEngineListener::SinglePortSocketListener::
OnAcceptCallbackLocked()186 OnAcceptCallbackLocked() {
187 auto close_socket_and_restart =
188 [&](bool do_close_socket = true)
189 ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu) {
190 if (do_close_socket) closesocket(io_state_->accept_socket);
191 io_state_->accept_socket = INVALID_SOCKET;
192 CHECK(GRPC_LOG_IF_ERROR("SinglePortSocketListener::Start",
193 StartLocked()));
194 };
195 const auto& overlapped_result =
196 io_state_->listener_socket->read_info()->result();
197 if (overlapped_result.wsa_error != 0) {
198 LOG(ERROR) << GRPC_WSA_ERROR(overlapped_result.wsa_error,
199 "Skipping on_accept due to error");
200 return close_socket_and_restart();
201 }
202 SOCKET tmp_listener_socket = io_state_->listener_socket->raw_socket();
203 int err =
204 setsockopt(io_state_->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
205 reinterpret_cast<char*>(&tmp_listener_socket),
206 sizeof(tmp_listener_socket));
207 if (err != 0) {
208 LOG(ERROR) << GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt");
209 return close_socket_and_restart();
210 }
211 EventEngine::ResolvedAddress peer_address;
212 int peer_name_len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
213 err = getpeername(io_state_->accept_socket,
214 const_cast<sockaddr*>(peer_address.address()),
215 &peer_name_len);
216 if (err != 0) {
217 LOG(ERROR) << GRPC_WSA_ERROR(WSAGetLastError(), "getpeername");
218 return close_socket_and_restart();
219 }
220 peer_address =
221 EventEngine::ResolvedAddress(peer_address.address(), peer_name_len);
222 auto addr_uri = ResolvedAddressToURI(peer_address);
223 std::string peer_name = "unknown";
224 if (!addr_uri.ok()) {
225 // TODO(hork): test an early exit/restart here with end2end tests
226 LOG(ERROR) << "invalid peer name: " << addr_uri.status();
227 } else {
228 peer_name = *addr_uri;
229 }
230 auto endpoint = std::make_unique<WindowsEndpoint>(
231 peer_address, listener_->iocp_->Watch(io_state_->accept_socket),
232 listener_->memory_allocator_factory_->CreateMemoryAllocator(
233 absl::StrFormat("listener endpoint %s", peer_name)),
234 listener_->config_, listener_->thread_pool_, listener_->engine_);
235 listener_->accept_cb_(
236 std::move(endpoint),
237 listener_->memory_allocator_factory_->CreateMemoryAllocator(
238 absl::StrFormat("listener accept cb for %s", peer_name)));
239 close_socket_and_restart(/*do_close_socket=*/false);
240 }
241
SinglePortSocketListener(WindowsEventEngineListener * listener,LPFN_ACCEPTEX AcceptEx,std::unique_ptr<WinSocket> listener_socket,int port,EventEngine::ResolvedAddress hostbyname)242 WindowsEventEngineListener::SinglePortSocketListener::SinglePortSocketListener(
243 WindowsEventEngineListener* listener, LPFN_ACCEPTEX AcceptEx,
244 std::unique_ptr<WinSocket> listener_socket, int port,
245 EventEngine::ResolvedAddress hostbyname)
246 : AcceptEx(AcceptEx),
247 listener_(listener),
248 io_state_(
249 std::make_shared<AsyncIOState>(this, std::move(listener_socket))),
250 port_(port),
251 listener_sockname_(hostbyname) {
252 io_state_->on_accept_cb.Prime(io_state_);
253 }
254
255 absl::StatusOr<WindowsEventEngineListener::SinglePortSocketListener::
256 PrepareListenerSocketResult>
PrepareListenerSocket(SOCKET sock,const EventEngine::ResolvedAddress & addr)257 WindowsEventEngineListener::SinglePortSocketListener::PrepareListenerSocket(
258 SOCKET sock, const EventEngine::ResolvedAddress& addr) {
259 auto fail = [&](absl::Status error) -> absl::Status {
260 CHECK(!error.ok());
261 error = grpc_error_set_int(
262 GRPC_ERROR_CREATE_REFERENCING("Failed to prepare server socket", &error,
263 1),
264 grpc_core::StatusIntProperty::kFd, static_cast<intptr_t>(sock));
265 if (sock != INVALID_SOCKET) closesocket(sock);
266 return error;
267 };
268 absl::Status error;
269 if (addr.address()->sa_family == AF_UNIX) {
270 error = SetSocketNonBlock(sock);
271 } else {
272 error = PrepareSocket(sock);
273 }
274 if (!error.ok()) return fail(error);
275 UnlinkIfUnixDomainSocket(addr);
276 if (bind(sock, addr.address(), addr.size()) == SOCKET_ERROR) {
277 return fail(GRPC_WSA_ERROR(WSAGetLastError(), "bind"));
278 }
279 if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
280 return fail(GRPC_WSA_ERROR(WSAGetLastError(), "listen"));
281 }
282 int sockname_temp_len = sizeof(struct sockaddr_storage);
283 EventEngine::ResolvedAddress sockname_temp;
284 if (getsockname(sock, const_cast<sockaddr*>(sockname_temp.address()),
285 &sockname_temp_len) == SOCKET_ERROR) {
286 return fail(GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"));
287 }
288 sockname_temp =
289 EventEngine::ResolvedAddress(sockname_temp.address(), sockname_temp_len);
290 return PrepareListenerSocketResult{ResolvedAddressGetPort(sockname_temp),
291 sockname_temp};
292 }
293
294 // ---- WindowsEventEngineListener ----
295
WindowsEventEngineListener(IOCP * iocp,AcceptCallback accept_cb,absl::AnyInvocable<void (absl::Status)> on_shutdown,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory,std::shared_ptr<EventEngine> engine,ThreadPool * thread_pool,const EndpointConfig & config)296 WindowsEventEngineListener::WindowsEventEngineListener(
297 IOCP* iocp, AcceptCallback accept_cb,
298 absl::AnyInvocable<void(absl::Status)> on_shutdown,
299 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory,
300 std::shared_ptr<EventEngine> engine, ThreadPool* thread_pool,
301 const EndpointConfig& config)
302 : iocp_(iocp),
303 config_(config),
304 engine_(std::move(engine)),
305 thread_pool_(thread_pool),
306 memory_allocator_factory_(std::move(memory_allocator_factory)),
307 accept_cb_(std::move(accept_cb)),
308 on_shutdown_(std::move(on_shutdown)) {}
309
~WindowsEventEngineListener()310 WindowsEventEngineListener::~WindowsEventEngineListener() {
311 GRPC_TRACE_LOG(event_engine, INFO) << "~WindowsEventEngineListener::" << this;
312 ShutdownListeners();
313 on_shutdown_(absl::OkStatus());
314 }
315
Bind(const EventEngine::ResolvedAddress & addr)316 absl::StatusOr<int> WindowsEventEngineListener::Bind(
317 const EventEngine::ResolvedAddress& addr) {
318 if (started_.load()) {
319 return absl::FailedPreconditionError(
320 absl::StrFormat("WindowsEventEngineListener::%p is already started, "
321 "ports can no longer be bound",
322 this));
323 }
324 int out_port = ResolvedAddressGetPort(addr);
325 EventEngine::ResolvedAddress out_addr(addr);
326 EventEngine::ResolvedAddress tmp_addr;
327 // Check if this is a wildcard port, and if so, try to keep the port the same
328 // as some previously created listener.
329 if (out_port == 0) {
330 grpc_core::MutexLock lock(&port_listeners_mu_);
331 for (const auto& port_listener : port_listeners_) {
332 tmp_addr = port_listener->listener_sockname();
333 out_port = ResolvedAddressGetPort(tmp_addr);
334 if (out_port > 0) {
335 ResolvedAddressSetPort(out_addr, out_port);
336 break;
337 }
338 }
339 }
340 if (ResolvedAddressToV4Mapped(out_addr, &tmp_addr)) {
341 out_addr = tmp_addr;
342 }
343 // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
344 if (MaybeGetWildcardPortFromAddress(out_addr).has_value()) {
345 out_addr = ResolvedAddressMakeWild6(out_port);
346 }
347 // open the socket
348 const int addr_family =
349 (out_addr.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
350 const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
351 SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0,
352 IOCP::GetDefaultSocketFlags());
353 if (sock == INVALID_SOCKET) {
354 auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
355 return GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", &error,
356 1);
357 }
358 auto port_listener = AddSinglePortSocketListener(sock, out_addr);
359 GRPC_RETURN_IF_ERROR(port_listener.status());
360 return (*port_listener)->port();
361 }
362
Start()363 absl::Status WindowsEventEngineListener::Start() {
364 CHECK(!started_.exchange(true));
365 grpc_core::MutexLock lock(&port_listeners_mu_);
366 for (auto& port_listener : port_listeners_) {
367 GRPC_RETURN_IF_ERROR(port_listener->Start());
368 }
369 return absl::OkStatus();
370 }
371
ShutdownListeners()372 void WindowsEventEngineListener::ShutdownListeners() {
373 grpc_core::MutexLock lock(&port_listeners_mu_);
374 if (std::exchange(listeners_shutdown_, true)) return;
375 // Shut down each port listener before destroying this EventEngine::Listener
376 for (auto& port_listener : port_listeners_) {
377 port_listener.reset();
378 }
379 }
380
381 absl::StatusOr<WindowsEventEngineListener::SinglePortSocketListener*>
AddSinglePortSocketListener(SOCKET sock,EventEngine::ResolvedAddress addr)382 WindowsEventEngineListener::AddSinglePortSocketListener(
383 SOCKET sock, EventEngine::ResolvedAddress addr) {
384 auto single_port_listener =
385 SinglePortSocketListener::Create(this, sock, addr);
386 GRPC_RETURN_IF_ERROR(single_port_listener.status());
387 auto* single_port_listener_ptr = single_port_listener->get();
388 grpc_core::MutexLock lock(&port_listeners_mu_);
389 port_listeners_.emplace_back(std::move(*single_port_listener));
390 if (started_.load()) {
391 LOG(ERROR) << "WindowsEventEngineListener::" << this
392 << " Bind was called concurrently while the Listener was "
393 "starting. This is invalid usage, all ports must be bound "
394 "before the Listener is started.";
395 GRPC_RETURN_IF_ERROR(single_port_listener_ptr->Start());
396 }
397 return single_port_listener_ptr;
398 }
399
400 } // namespace experimental
401 } // namespace grpc_event_engine
402
403 #endif // GPR_WINDOWS
404