• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 #include <grpc/support/alloc.h>
18 #include <grpc/support/log_windows.h>
19 
20 #include "absl/log/check.h"
21 #include "absl/log/log.h"
22 #include "src/core/lib/event_engine/tcp_socket_utils.h"
23 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
24 #include "src/core/lib/event_engine/windows/win_socket.h"
25 #include "src/core/lib/iomgr/error.h"
26 #include "src/core/util/debug_location.h"
27 #include "src/core/util/sync.h"
28 
29 #if defined(__MSYS__) && defined(GPR_ARCH_64)
30 // Nasty workaround for nasty bug when using the 64 bits msys compiler
31 // in conjunction with Microsoft Windows headers.
32 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
33 #else
34 #define GRPC_FIONBIO FIONBIO
35 #endif
36 
37 namespace grpc_event_engine {
38 namespace experimental {
39 
40 // ---- WinSocket ----
41 
WinSocket(SOCKET socket,ThreadPool * thread_pool)42 WinSocket::WinSocket(SOCKET socket, ThreadPool* thread_pool) noexcept
43     : socket_(socket),
44       thread_pool_(thread_pool),
45       read_info_(this),
46       write_info_(this) {}
47 
~WinSocket()48 WinSocket::~WinSocket() {
49   CHECK(is_shutdown_.load());
50   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
51       << "WinSocket::" << this << " destroyed";
52 }
53 
raw_socket()54 SOCKET WinSocket::raw_socket() { return socket_; }
55 
Shutdown()56 void WinSocket::Shutdown() {
57   // if already shutdown, return early. Otherwise, set the shutdown flag.
58   if (is_shutdown_.exchange(true)) {
59     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
60         << "WinSocket::" << this << " already shutting down";
61     return;
62   }
63   // Grab the function pointer for DisconnectEx for that specific socket.
64   // It may change depending on the interface.
65   GUID guid = WSAID_DISCONNECTEX;
66   LPFN_DISCONNECTEX DisconnectEx;
67   DWORD ioctl_num_bytes;
68   int status = WSAIoctl(socket_, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
69                         sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
70                         &ioctl_num_bytes, NULL, NULL);
71   if (status != 0) {
72     char* utf8_message = gpr_format_message(WSAGetLastError());
73     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
74         << "Unable to retrieve DisconnectEx pointer : " << utf8_message;
75     gpr_free(utf8_message);
76   } else if (DisconnectEx(socket_, NULL, 0, 0) == FALSE) {
77     auto last_error = WSAGetLastError();
78     // DisconnectEx may be called when the socket is not connected. Ignore that
79     // error, and log all others.
80     if (last_error != WSAENOTCONN) {
81       char* utf8_message = gpr_format_message(last_error);
82       GRPC_TRACE_LOG(event_engine_endpoint, INFO)
83           << "DisconnectEx failed: " << utf8_message;
84       gpr_free(utf8_message);
85     }
86   }
87   closesocket(socket_);
88   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
89       << "WinSocket::" << this << " socket closed";
90 }
91 
Shutdown(const grpc_core::DebugLocation & location,absl::string_view reason)92 void WinSocket::Shutdown(const grpc_core::DebugLocation& location,
93                          absl::string_view reason) {
94   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
95       << "WinSocket::" << this << " Shut down from " << location.file() << ":"
96       << location.line() << ". Reason: " << reason.data();
97   Shutdown();
98 }
99 
NotifyOnReady(OpState & info,EventEngine::Closure * closure)100 void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
101   if (IsShutdown()) {
102     info.SetResult(WSAESHUTDOWN, 0, "NotifyOnReady");
103     thread_pool_->Run(closure);
104     return;
105   };
106   // It is an error if any notification is already registered for this socket.
107   CHECK_EQ(std::exchange(info.closure_, closure), nullptr);
108 }
109 
NotifyOnRead(EventEngine::Closure * on_read)110 void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) {
111   NotifyOnReady(read_info_, on_read);
112 }
113 
NotifyOnWrite(EventEngine::Closure * on_write)114 void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
115   NotifyOnReady(write_info_, on_write);
116 }
117 
UnregisterReadCallback()118 void WinSocket::UnregisterReadCallback() {
119   CHECK_NE(std::exchange(read_info_.closure_, nullptr), nullptr);
120 }
121 
UnregisterWriteCallback()122 void WinSocket::UnregisterWriteCallback() {
123   CHECK_NE(std::exchange(write_info_.closure_, nullptr), nullptr);
124 }
125 
126 // ---- WinSocket::OpState ----
127 
OpState(WinSocket * win_socket)128 WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
129     : win_socket_(win_socket) {
130   memset(&overlapped_, 0, sizeof(OVERLAPPED));
131 }
132 
SetReady()133 void WinSocket::OpState::SetReady() {
134   auto* closure = std::exchange(closure_, nullptr);
135   // If an IOCP event is returned for a socket, and no callback has been
136   // registered for notification, this is invalid usage.
137   CHECK_NE(closure, nullptr);
138   win_socket_->thread_pool_->Run(closure);
139 }
140 
SetResult(int wsa_error,DWORD bytes,absl::string_view context)141 void WinSocket::OpState::SetResult(int wsa_error, DWORD bytes,
142                                    absl::string_view context) {
143   bytes = wsa_error == 0 ? bytes : 0;
144   result_ = OverlappedResult{
145       /*wsa_error=*/wsa_error, /*bytes_transferred=*/bytes,
146       /*error_status=*/wsa_error == 0 ? absl::OkStatus()
147                                       : GRPC_WSA_ERROR(wsa_error, context)};
148 }
149 
SetErrorStatus(absl::Status error_status)150 void WinSocket::OpState::SetErrorStatus(absl::Status error_status) {
151   result_ = OverlappedResult{/*wsa_error=*/0, /*bytes_transferred=*/0,
152                              /*error_status=*/error_status};
153 }
154 
GetOverlappedResult()155 void WinSocket::OpState::GetOverlappedResult() {
156   GetOverlappedResult(win_socket_->raw_socket());
157 }
158 
GetOverlappedResult(SOCKET sock)159 void WinSocket::OpState::GetOverlappedResult(SOCKET sock) {
160   if (win_socket_->IsShutdown()) {
161     SetResult(WSA_OPERATION_ABORTED, 0, "GetOverlappedResult");
162     return;
163   }
164   DWORD flags = 0;
165   DWORD bytes;
166   BOOL success =
167       WSAGetOverlappedResult(sock, &overlapped_, &bytes, FALSE, &flags);
168   auto wsa_error = success ? 0 : WSAGetLastError();
169   SetResult(wsa_error, bytes, "WSAGetOverlappedResult");
170 }
171 
IsShutdown()172 bool WinSocket::IsShutdown() { return is_shutdown_.load(); }
173 
GetOpInfoForOverlapped(OVERLAPPED * overlapped)174 WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) {
175   GRPC_TRACE_LOG(event_engine_poller, INFO)
176       << "WinSocket::" << this
177       << " looking for matching OVERLAPPED::" << overlapped << ". read("
178       << &read_info_.overlapped_ << ") write(" << &write_info_.overlapped_
179       << ")";
180   if (overlapped == &read_info_.overlapped_) return &read_info_;
181   if (overlapped == &write_info_.overlapped_) return &write_info_;
182   return nullptr;
183 }
184 
185 namespace {
186 
grpc_tcp_set_non_block(SOCKET sock)187 grpc_error_handle grpc_tcp_set_non_block(SOCKET sock) {
188   int status;
189   uint32_t param = 1;
190   DWORD ret;
191   status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
192                     NULL, NULL);
193   return status == 0
194              ? absl::OkStatus()
195              : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
196 }
197 
set_dualstack(SOCKET sock)198 static grpc_error_handle set_dualstack(SOCKET sock) {
199   int status;
200   DWORD param = 0;
201   status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&param,
202                       sizeof(param));
203   return status == 0
204              ? absl::OkStatus()
205              : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
206 }
207 
enable_socket_low_latency(SOCKET sock)208 static grpc_error_handle enable_socket_low_latency(SOCKET sock) {
209   int status;
210   BOOL param = TRUE;
211   status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
212                         reinterpret_cast<char*>(&param), sizeof(param));
213   if (status == SOCKET_ERROR) {
214     status = WSAGetLastError();
215   }
216   return status == 0 ? absl::OkStatus()
217                      : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
218 }
219 
220 }  // namespace
221 
SetSocketNonBlock(SOCKET sock)222 absl::Status SetSocketNonBlock(SOCKET sock) {
223   return grpc_tcp_set_non_block(sock);
224 }
225 
PrepareSocket(SOCKET sock)226 absl::Status PrepareSocket(SOCKET sock) {
227   absl::Status err;
228   err = grpc_tcp_set_non_block(sock);
229   if (!err.ok()) return err;
230   err = enable_socket_low_latency(sock);
231   if (!err.ok()) return err;
232   err = set_dualstack(sock);
233   if (!err.ok()) return err;
234   return absl::OkStatus();
235 }
236 
237 }  // namespace experimental
238 }  // namespace grpc_event_engine
239 
240 #endif  // GPR_WINDOWS
241