• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 #include <grpc/support/alloc.h>
18 #include <grpc/support/log_windows.h>
19 
20 #include "src/core/lib/event_engine/tcp_socket_utils.h"
21 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
22 #include "src/core/lib/event_engine/trace.h"
23 #include "src/core/lib/event_engine/windows/win_socket.h"
24 #include "src/core/lib/gprpp/debug_location.h"
25 #include "src/core/lib/gprpp/sync.h"
26 #include "src/core/lib/iomgr/error.h"
27 
28 #if defined(__MSYS__) && defined(GPR_ARCH_64)
29 // Nasty workaround for nasty bug when using the 64 bits msys compiler
30 // in conjunction with Microsoft Windows headers.
31 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
32 #else
33 #define GRPC_FIONBIO FIONBIO
34 #endif
35 
36 namespace grpc_event_engine {
37 namespace experimental {
38 
39 // ---- WinSocket ----
40 
WinSocket(SOCKET socket,ThreadPool * thread_pool)41 WinSocket::WinSocket(SOCKET socket, ThreadPool* thread_pool) noexcept
42     : socket_(socket),
43       thread_pool_(thread_pool),
44       read_info_(this),
45       write_info_(this) {}
46 
~WinSocket()47 WinSocket::~WinSocket() {
48   GPR_ASSERT(is_shutdown_.load());
49   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p destroyed", this);
50 }
51 
raw_socket()52 SOCKET WinSocket::raw_socket() { return socket_; }
53 
Shutdown()54 void WinSocket::Shutdown() {
55   // if already shutdown, return early. Otherwise, set the shutdown flag.
56   if (is_shutdown_.exchange(true)) {
57     GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p already shutting down",
58                                      this);
59     return;
60   }
61   // Grab the function pointer for DisconnectEx for that specific socket.
62   // It may change depending on the interface.
63   GUID guid = WSAID_DISCONNECTEX;
64   LPFN_DISCONNECTEX DisconnectEx;
65   DWORD ioctl_num_bytes;
66   int status = WSAIoctl(socket_, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
67                         sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
68                         &ioctl_num_bytes, NULL, NULL);
69 
70   if (status == 0) {
71     DisconnectEx(socket_, NULL, 0, 0);
72   } else {
73     char* utf8_message = gpr_format_message(WSAGetLastError());
74     gpr_log(GPR_INFO, "Unable to retrieve DisconnectEx pointer : %s",
75             utf8_message);
76     gpr_free(utf8_message);
77   }
78   closesocket(socket_);
79   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p socket closed", this);
80 }
81 
Shutdown(const grpc_core::DebugLocation & location,absl::string_view reason)82 void WinSocket::Shutdown(const grpc_core::DebugLocation& location,
83                          absl::string_view reason) {
84   GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
85       "WinSocket::%p Shut down from %s:%d. Reason: %s", this, location.file(),
86       location.line(), reason.data());
87   Shutdown();
88 }
89 
NotifyOnReady(OpState & info,EventEngine::Closure * closure)90 void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
91   if (IsShutdown()) {
92     info.SetError(WSAESHUTDOWN);
93     thread_pool_->Run(closure);
94     return;
95   };
96   // It is an error if any notification is already registered for this socket.
97   GPR_ASSERT(std::exchange(info.closure_, closure) == nullptr);
98 }
99 
NotifyOnRead(EventEngine::Closure * on_read)100 void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) {
101   NotifyOnReady(read_info_, on_read);
102 }
103 
NotifyOnWrite(EventEngine::Closure * on_write)104 void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
105   NotifyOnReady(write_info_, on_write);
106 }
107 
UnregisterReadCallback()108 void WinSocket::UnregisterReadCallback() {
109   GPR_ASSERT(std::exchange(read_info_.closure_, nullptr) != nullptr);
110 }
111 
UnregisterWriteCallback()112 void WinSocket::UnregisterWriteCallback() {
113   GPR_ASSERT(std::exchange(write_info_.closure_, nullptr) != nullptr);
114 }
115 
116 // ---- WinSocket::OpState ----
117 
OpState(WinSocket * win_socket)118 WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
119     : win_socket_(win_socket) {
120   memset(&overlapped_, 0, sizeof(OVERLAPPED));
121 }
122 
SetReady()123 void WinSocket::OpState::SetReady() {
124   auto* closure = std::exchange(closure_, nullptr);
125   // If an IOCP event is returned for a socket, and no callback has been
126   // registered for notification, this is invalid usage.
127   GPR_ASSERT(closure != nullptr);
128   win_socket_->thread_pool_->Run(closure);
129 }
130 
SetError(int wsa_error)131 void WinSocket::OpState::SetError(int wsa_error) {
132   result_ = OverlappedResult{/*wsa_error=*/wsa_error, /*bytes_transferred=*/0};
133 }
134 
SetResult(OverlappedResult result)135 void WinSocket::OpState::SetResult(OverlappedResult result) {
136   result_ = result;
137 }
138 
SetErrorStatus(absl::Status error_status)139 void WinSocket::OpState::SetErrorStatus(absl::Status error_status) {
140   result_ = OverlappedResult{/*wsa_error=*/0, /*bytes_transferred=*/0,
141                              /*error_status=*/error_status};
142 }
143 
GetOverlappedResult()144 void WinSocket::OpState::GetOverlappedResult() {
145   GetOverlappedResult(win_socket_->raw_socket());
146 }
147 
GetOverlappedResult(SOCKET sock)148 void WinSocket::OpState::GetOverlappedResult(SOCKET sock) {
149   if (win_socket_->IsShutdown()) {
150     result_ = OverlappedResult{/*wsa_error=*/WSA_OPERATION_ABORTED,
151                                /*bytes_transferred=*/0};
152     return;
153   }
154   DWORD flags = 0;
155   DWORD bytes;
156   BOOL success =
157       WSAGetOverlappedResult(sock, &overlapped_, &bytes, FALSE, &flags);
158   result_ = OverlappedResult{/*wsa_error=*/success ? 0 : WSAGetLastError(),
159                              /*bytes_transferred=*/bytes};
160 }
161 
IsShutdown()162 bool WinSocket::IsShutdown() { return is_shutdown_.load(); }
163 
GetOpInfoForOverlapped(OVERLAPPED * overlapped)164 WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) {
165   GRPC_EVENT_ENGINE_POLLER_TRACE(
166       "WinSocket::%p looking for matching OVERLAPPED::%p. "
167       "read(%p) write(%p)",
168       this, overlapped, &read_info_.overlapped_, &write_info_.overlapped_);
169   if (overlapped == &read_info_.overlapped_) return &read_info_;
170   if (overlapped == &write_info_.overlapped_) return &write_info_;
171   return nullptr;
172 }
173 
174 namespace {
175 
grpc_tcp_set_non_block(SOCKET sock)176 grpc_error_handle grpc_tcp_set_non_block(SOCKET sock) {
177   int status;
178   uint32_t param = 1;
179   DWORD ret;
180   status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
181                     NULL, NULL);
182   return status == 0
183              ? absl::OkStatus()
184              : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
185 }
186 
set_dualstack(SOCKET sock)187 static grpc_error_handle set_dualstack(SOCKET sock) {
188   int status;
189   DWORD param = 0;
190   status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&param,
191                       sizeof(param));
192   return status == 0
193              ? absl::OkStatus()
194              : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
195 }
196 
enable_socket_low_latency(SOCKET sock)197 static grpc_error_handle enable_socket_low_latency(SOCKET sock) {
198   int status;
199   BOOL param = TRUE;
200   status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
201                         reinterpret_cast<char*>(&param), sizeof(param));
202   if (status == SOCKET_ERROR) {
203     status = WSAGetLastError();
204   }
205   return status == 0 ? absl::OkStatus()
206                      : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
207 }
208 
209 }  // namespace
210 
SetSocketNonBlock(SOCKET sock)211 absl::Status SetSocketNonBlock(SOCKET sock) {
212   return grpc_tcp_set_non_block(sock);
213 }
214 
PrepareSocket(SOCKET sock)215 absl::Status PrepareSocket(SOCKET sock) {
216   absl::Status err;
217   err = grpc_tcp_set_non_block(sock);
218   if (!err.ok()) return err;
219   err = enable_socket_low_latency(sock);
220   if (!err.ok()) return err;
221   err = set_dualstack(sock);
222   if (!err.ok()) return err;
223   return absl::OkStatus();
224 }
225 
226 }  // namespace experimental
227 }  // namespace grpc_event_engine
228 
229 #endif  // GPR_WINDOWS
230