1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17 #include <grpc/support/alloc.h>
18 #include <grpc/support/log_windows.h>
19
20 #include "src/core/lib/event_engine/tcp_socket_utils.h"
21 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
22 #include "src/core/lib/event_engine/trace.h"
23 #include "src/core/lib/event_engine/windows/win_socket.h"
24 #include "src/core/lib/gprpp/debug_location.h"
25 #include "src/core/lib/gprpp/sync.h"
26 #include "src/core/lib/iomgr/error.h"
27
28 #if defined(__MSYS__) && defined(GPR_ARCH_64)
29 // Nasty workaround for nasty bug when using the 64 bits msys compiler
30 // in conjunction with Microsoft Windows headers.
31 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
32 #else
33 #define GRPC_FIONBIO FIONBIO
34 #endif
35
36 namespace grpc_event_engine {
37 namespace experimental {
38
39 // ---- WinSocket ----
40
WinSocket(SOCKET socket,ThreadPool * thread_pool)41 WinSocket::WinSocket(SOCKET socket, ThreadPool* thread_pool) noexcept
42 : socket_(socket),
43 thread_pool_(thread_pool),
44 read_info_(this),
45 write_info_(this) {}
46
~WinSocket()47 WinSocket::~WinSocket() {
48 GPR_ASSERT(is_shutdown_.load());
49 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p destroyed", this);
50 }
51
raw_socket()52 SOCKET WinSocket::raw_socket() { return socket_; }
53
Shutdown()54 void WinSocket::Shutdown() {
55 // if already shutdown, return early. Otherwise, set the shutdown flag.
56 if (is_shutdown_.exchange(true)) {
57 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p already shutting down",
58 this);
59 return;
60 }
61 // Grab the function pointer for DisconnectEx for that specific socket.
62 // It may change depending on the interface.
63 GUID guid = WSAID_DISCONNECTEX;
64 LPFN_DISCONNECTEX DisconnectEx;
65 DWORD ioctl_num_bytes;
66 int status = WSAIoctl(socket_, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
67 sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
68 &ioctl_num_bytes, NULL, NULL);
69
70 if (status == 0) {
71 DisconnectEx(socket_, NULL, 0, 0);
72 } else {
73 char* utf8_message = gpr_format_message(WSAGetLastError());
74 gpr_log(GPR_INFO, "Unable to retrieve DisconnectEx pointer : %s",
75 utf8_message);
76 gpr_free(utf8_message);
77 }
78 closesocket(socket_);
79 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p socket closed", this);
80 }
81
Shutdown(const grpc_core::DebugLocation & location,absl::string_view reason)82 void WinSocket::Shutdown(const grpc_core::DebugLocation& location,
83 absl::string_view reason) {
84 GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
85 "WinSocket::%p Shut down from %s:%d. Reason: %s", this, location.file(),
86 location.line(), reason.data());
87 Shutdown();
88 }
89
NotifyOnReady(OpState & info,EventEngine::Closure * closure)90 void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
91 if (IsShutdown()) {
92 info.SetError(WSAESHUTDOWN);
93 thread_pool_->Run(closure);
94 return;
95 };
96 // It is an error if any notification is already registered for this socket.
97 GPR_ASSERT(std::exchange(info.closure_, closure) == nullptr);
98 }
99
NotifyOnRead(EventEngine::Closure * on_read)100 void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) {
101 NotifyOnReady(read_info_, on_read);
102 }
103
NotifyOnWrite(EventEngine::Closure * on_write)104 void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
105 NotifyOnReady(write_info_, on_write);
106 }
107
UnregisterReadCallback()108 void WinSocket::UnregisterReadCallback() {
109 GPR_ASSERT(std::exchange(read_info_.closure_, nullptr) != nullptr);
110 }
111
UnregisterWriteCallback()112 void WinSocket::UnregisterWriteCallback() {
113 GPR_ASSERT(std::exchange(write_info_.closure_, nullptr) != nullptr);
114 }
115
116 // ---- WinSocket::OpState ----
117
OpState(WinSocket * win_socket)118 WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
119 : win_socket_(win_socket) {
120 memset(&overlapped_, 0, sizeof(OVERLAPPED));
121 }
122
SetReady()123 void WinSocket::OpState::SetReady() {
124 auto* closure = std::exchange(closure_, nullptr);
125 // If an IOCP event is returned for a socket, and no callback has been
126 // registered for notification, this is invalid usage.
127 GPR_ASSERT(closure != nullptr);
128 win_socket_->thread_pool_->Run(closure);
129 }
130
SetError(int wsa_error)131 void WinSocket::OpState::SetError(int wsa_error) {
132 result_ = OverlappedResult{/*wsa_error=*/wsa_error, /*bytes_transferred=*/0};
133 }
134
SetResult(OverlappedResult result)135 void WinSocket::OpState::SetResult(OverlappedResult result) {
136 result_ = result;
137 }
138
SetErrorStatus(absl::Status error_status)139 void WinSocket::OpState::SetErrorStatus(absl::Status error_status) {
140 result_ = OverlappedResult{/*wsa_error=*/0, /*bytes_transferred=*/0,
141 /*error_status=*/error_status};
142 }
143
GetOverlappedResult()144 void WinSocket::OpState::GetOverlappedResult() {
145 GetOverlappedResult(win_socket_->raw_socket());
146 }
147
GetOverlappedResult(SOCKET sock)148 void WinSocket::OpState::GetOverlappedResult(SOCKET sock) {
149 if (win_socket_->IsShutdown()) {
150 result_ = OverlappedResult{/*wsa_error=*/WSA_OPERATION_ABORTED,
151 /*bytes_transferred=*/0};
152 return;
153 }
154 DWORD flags = 0;
155 DWORD bytes;
156 BOOL success =
157 WSAGetOverlappedResult(sock, &overlapped_, &bytes, FALSE, &flags);
158 result_ = OverlappedResult{/*wsa_error=*/success ? 0 : WSAGetLastError(),
159 /*bytes_transferred=*/bytes};
160 }
161
IsShutdown()162 bool WinSocket::IsShutdown() { return is_shutdown_.load(); }
163
GetOpInfoForOverlapped(OVERLAPPED * overlapped)164 WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) {
165 GRPC_EVENT_ENGINE_POLLER_TRACE(
166 "WinSocket::%p looking for matching OVERLAPPED::%p. "
167 "read(%p) write(%p)",
168 this, overlapped, &read_info_.overlapped_, &write_info_.overlapped_);
169 if (overlapped == &read_info_.overlapped_) return &read_info_;
170 if (overlapped == &write_info_.overlapped_) return &write_info_;
171 return nullptr;
172 }
173
174 namespace {
175
grpc_tcp_set_non_block(SOCKET sock)176 grpc_error_handle grpc_tcp_set_non_block(SOCKET sock) {
177 int status;
178 uint32_t param = 1;
179 DWORD ret;
180 status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret,
181 NULL, NULL);
182 return status == 0
183 ? absl::OkStatus()
184 : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
185 }
186
set_dualstack(SOCKET sock)187 static grpc_error_handle set_dualstack(SOCKET sock) {
188 int status;
189 DWORD param = 0;
190 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m,
191 sizeof(param));
192 return status == 0
193 ? absl::OkStatus()
194 : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
195 }
196
enable_socket_low_latency(SOCKET sock)197 static grpc_error_handle enable_socket_low_latency(SOCKET sock) {
198 int status;
199 BOOL param = TRUE;
200 status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
201 reinterpret_cast<char*>(¶m), sizeof(param));
202 if (status == SOCKET_ERROR) {
203 status = WSAGetLastError();
204 }
205 return status == 0 ? absl::OkStatus()
206 : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
207 }
208
209 } // namespace
210
SetSocketNonBlock(SOCKET sock)211 absl::Status SetSocketNonBlock(SOCKET sock) {
212 return grpc_tcp_set_non_block(sock);
213 }
214
PrepareSocket(SOCKET sock)215 absl::Status PrepareSocket(SOCKET sock) {
216 absl::Status err;
217 err = grpc_tcp_set_non_block(sock);
218 if (!err.ok()) return err;
219 err = enable_socket_low_latency(sock);
220 if (!err.ok()) return err;
221 err = set_dualstack(sock);
222 if (!err.ok()) return err;
223 return absl::OkStatus();
224 }
225
226 } // namespace experimental
227 } // namespace grpc_event_engine
228
229 #endif // GPR_WINDOWS
230