1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17 #include <grpc/support/alloc.h>
18 #include <grpc/support/log_windows.h>
19
20 #include "absl/log/check.h"
21 #include "absl/log/log.h"
22 #include "src/core/lib/event_engine/tcp_socket_utils.h"
23 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
24 #include "src/core/lib/event_engine/windows/win_socket.h"
25 #include "src/core/lib/iomgr/error.h"
26 #include "src/core/util/debug_location.h"
27 #include "src/core/util/sync.h"
28
29 #if defined(__MSYS__) && defined(GPR_ARCH_64)
30 // Nasty workaround for nasty bug when using the 64 bits msys compiler
31 // in conjunction with Microsoft Windows headers.
32 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
33 #else
34 #define GRPC_FIONBIO FIONBIO
35 #endif
36
37 namespace grpc_event_engine {
38 namespace experimental {
39
40 // ---- WinSocket ----
41
WinSocket(SOCKET socket,ThreadPool * thread_pool)42 WinSocket::WinSocket(SOCKET socket, ThreadPool* thread_pool) noexcept
43 : socket_(socket),
44 thread_pool_(thread_pool),
45 read_info_(this),
46 write_info_(this) {}
47
~WinSocket()48 WinSocket::~WinSocket() {
49 CHECK(is_shutdown_.load());
50 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
51 << "WinSocket::" << this << " destroyed";
52 }
53
raw_socket()54 SOCKET WinSocket::raw_socket() { return socket_; }
55
Shutdown()56 void WinSocket::Shutdown() {
57 // if already shutdown, return early. Otherwise, set the shutdown flag.
58 if (is_shutdown_.exchange(true)) {
59 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
60 << "WinSocket::" << this << " already shutting down";
61 return;
62 }
63 // Grab the function pointer for DisconnectEx for that specific socket.
64 // It may change depending on the interface.
65 GUID guid = WSAID_DISCONNECTEX;
66 LPFN_DISCONNECTEX DisconnectEx;
67 DWORD ioctl_num_bytes;
68 int status = WSAIoctl(socket_, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
69 sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
70 &ioctl_num_bytes, NULL, NULL);
71 if (status != 0) {
72 char* utf8_message = gpr_format_message(WSAGetLastError());
73 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
74 << "Unable to retrieve DisconnectEx pointer : " << utf8_message;
75 gpr_free(utf8_message);
76 } else if (DisconnectEx(socket_, NULL, 0, 0) == FALSE) {
77 auto last_error = WSAGetLastError();
78 // DisconnectEx may be called when the socket is not connected. Ignore that
79 // error, and log all others.
80 if (last_error != WSAENOTCONN) {
81 char* utf8_message = gpr_format_message(last_error);
82 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
83 << "DisconnectEx failed: " << utf8_message;
84 gpr_free(utf8_message);
85 }
86 }
87 closesocket(socket_);
88 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
89 << "WinSocket::" << this << " socket closed";
90 }
91
Shutdown(const grpc_core::DebugLocation & location,absl::string_view reason)92 void WinSocket::Shutdown(const grpc_core::DebugLocation& location,
93 absl::string_view reason) {
94 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
95 << "WinSocket::" << this << " Shut down from " << location.file() << ":"
96 << location.line() << ". Reason: " << reason.data();
97 Shutdown();
98 }
99
NotifyOnReady(OpState & info,EventEngine::Closure * closure)100 void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
101 if (IsShutdown()) {
102 info.SetResult(WSAESHUTDOWN, 0, "NotifyOnReady");
103 thread_pool_->Run(closure);
104 return;
105 };
106 // It is an error if any notification is already registered for this socket.
107 CHECK_EQ(std::exchange(info.closure_, closure), nullptr);
108 }
109
NotifyOnRead(EventEngine::Closure * on_read)110 void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) {
111 NotifyOnReady(read_info_, on_read);
112 }
113
NotifyOnWrite(EventEngine::Closure * on_write)114 void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
115 NotifyOnReady(write_info_, on_write);
116 }
117
UnregisterReadCallback()118 void WinSocket::UnregisterReadCallback() {
119 CHECK_NE(std::exchange(read_info_.closure_, nullptr), nullptr);
120 }
121
UnregisterWriteCallback()122 void WinSocket::UnregisterWriteCallback() {
123 CHECK_NE(std::exchange(write_info_.closure_, nullptr), nullptr);
124 }
125
126 // ---- WinSocket::OpState ----
127
OpState(WinSocket * win_socket)128 WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
129 : win_socket_(win_socket) {
130 memset(&overlapped_, 0, sizeof(OVERLAPPED));
131 }
132
SetReady()133 void WinSocket::OpState::SetReady() {
134 auto* closure = std::exchange(closure_, nullptr);
135 // If an IOCP event is returned for a socket, and no callback has been
136 // registered for notification, this is invalid usage.
137 CHECK_NE(closure, nullptr);
138 win_socket_->thread_pool_->Run(closure);
139 }
140
SetResult(int wsa_error,DWORD bytes,absl::string_view context)141 void WinSocket::OpState::SetResult(int wsa_error, DWORD bytes,
142 absl::string_view context) {
143 bytes = wsa_error == 0 ? bytes : 0;
144 result_ = OverlappedResult{
145 /*wsa_error=*/wsa_error, /*bytes_transferred=*/bytes,
146 /*error_status=*/wsa_error == 0 ? absl::OkStatus()
147 : GRPC_WSA_ERROR(wsa_error, context)};
148 }
149
SetErrorStatus(absl::Status error_status)150 void WinSocket::OpState::SetErrorStatus(absl::Status error_status) {
151 result_ = OverlappedResult{/*wsa_error=*/0, /*bytes_transferred=*/0,
152 /*error_status=*/error_status};
153 }
154
GetOverlappedResult()155 void WinSocket::OpState::GetOverlappedResult() {
156 GetOverlappedResult(win_socket_->raw_socket());
157 }
158
GetOverlappedResult(SOCKET sock)159 void WinSocket::OpState::GetOverlappedResult(SOCKET sock) {
160 if (win_socket_->IsShutdown()) {
161 SetResult(WSA_OPERATION_ABORTED, 0, "GetOverlappedResult");
162 return;
163 }
164 DWORD flags = 0;
165 DWORD bytes;
166 BOOL success =
167 WSAGetOverlappedResult(sock, &overlapped_, &bytes, FALSE, &flags);
168 auto wsa_error = success ? 0 : WSAGetLastError();
169 SetResult(wsa_error, bytes, "WSAGetOverlappedResult");
170 }
171
IsShutdown()172 bool WinSocket::IsShutdown() { return is_shutdown_.load(); }
173
GetOpInfoForOverlapped(OVERLAPPED * overlapped)174 WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) {
175 GRPC_TRACE_LOG(event_engine_poller, INFO)
176 << "WinSocket::" << this
177 << " looking for matching OVERLAPPED::" << overlapped << ". read("
178 << &read_info_.overlapped_ << ") write(" << &write_info_.overlapped_
179 << ")";
180 if (overlapped == &read_info_.overlapped_) return &read_info_;
181 if (overlapped == &write_info_.overlapped_) return &write_info_;
182 return nullptr;
183 }
184
185 namespace {
186
grpc_tcp_set_non_block(SOCKET sock)187 grpc_error_handle grpc_tcp_set_non_block(SOCKET sock) {
188 int status;
189 uint32_t param = 1;
190 DWORD ret;
191 status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret,
192 NULL, NULL);
193 return status == 0
194 ? absl::OkStatus()
195 : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
196 }
197
set_dualstack(SOCKET sock)198 static grpc_error_handle set_dualstack(SOCKET sock) {
199 int status;
200 DWORD param = 0;
201 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m,
202 sizeof(param));
203 return status == 0
204 ? absl::OkStatus()
205 : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
206 }
207
enable_socket_low_latency(SOCKET sock)208 static grpc_error_handle enable_socket_low_latency(SOCKET sock) {
209 int status;
210 BOOL param = TRUE;
211 status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
212 reinterpret_cast<char*>(¶m), sizeof(param));
213 if (status == SOCKET_ERROR) {
214 status = WSAGetLastError();
215 }
216 return status == 0 ? absl::OkStatus()
217 : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
218 }
219
220 } // namespace
221
SetSocketNonBlock(SOCKET sock)222 absl::Status SetSocketNonBlock(SOCKET sock) {
223 return grpc_tcp_set_non_block(sock);
224 }
225
PrepareSocket(SOCKET sock)226 absl::Status PrepareSocket(SOCKET sock) {
227 absl::Status err;
228 err = grpc_tcp_set_non_block(sock);
229 if (!err.ok()) return err;
230 err = enable_socket_low_latency(sock);
231 if (!err.ok()) return err;
232 err = set_dualstack(sock);
233 if (!err.ok()) return err;
234 return absl::OkStatus();
235 }
236
237 } // namespace experimental
238 } // namespace grpc_event_engine
239
240 #endif // GPR_WINDOWS
241