1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17
18 #include <grpc/event_engine/memory_allocator.h>
19 #include <grpc/support/log_windows.h>
20
21 #include "absl/cleanup/cleanup.h"
22 #include "absl/functional/any_invocable.h"
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "absl/status/status.h"
26 #include "absl/strings/str_format.h"
27 #include "src/core/lib/event_engine/tcp_socket_utils.h"
28 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
29 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
30 #include "src/core/lib/iomgr/error.h"
31 #include "src/core/util/debug_location.h"
32 #include "src/core/util/status_helper.h"
33
34 namespace grpc_event_engine {
35 namespace experimental {
36
37 namespace {
38 constexpr size_t kDefaultTargetReadSize = 8192;
39 constexpr int kMaxWSABUFCount = 16;
40
DumpSliceBuffer(SliceBuffer * buffer,absl::string_view context_string)41 void DumpSliceBuffer(SliceBuffer* buffer, absl::string_view context_string) {
42 for (size_t i = 0; i < buffer->Count(); i++) {
43 auto str = buffer->MutableSliceAt(i).as_string_view();
44 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
45 << context_string << " [" << i + 1 << "/" << buffer->Count()
46 << "]: " << str;
47 }
48 }
49
50 } // namespace
51
WindowsEndpoint(const EventEngine::ResolvedAddress & peer_address,std::unique_ptr<WinSocket> socket,MemoryAllocator && allocator,const EndpointConfig &,ThreadPool * thread_pool,std::shared_ptr<EventEngine> engine)52 WindowsEndpoint::WindowsEndpoint(
53 const EventEngine::ResolvedAddress& peer_address,
54 std::unique_ptr<WinSocket> socket, MemoryAllocator&& allocator,
55 const EndpointConfig& /* config */, ThreadPool* thread_pool,
56 std::shared_ptr<EventEngine> engine)
57 : peer_address_(peer_address),
58 allocator_(std::move(allocator)),
59 io_state_(std::make_shared<AsyncIOState>(
60 this, std::move(socket), std::move(engine), thread_pool)) {
61 char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES];
62 int addr_len = sizeof(addr);
63 if (getsockname(io_state_->socket->raw_socket(),
64 reinterpret_cast<sockaddr*>(addr), &addr_len) < 0) {
65 grpc_core::Crash(absl::StrFormat(
66 "Unrecoverable error: Failed to get local socket name. %s",
67 GRPC_WSA_ERROR(WSAGetLastError(), "getsockname").ToString().c_str()));
68 }
69 local_address_ =
70 EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(addr), addr_len);
71 local_address_string_ = *ResolvedAddressToURI(local_address_);
72 peer_address_string_ = *ResolvedAddressToURI(peer_address_);
73 }
74
~WindowsEndpoint()75 WindowsEndpoint::~WindowsEndpoint() {
76 io_state_->socket->Shutdown(DEBUG_LOCATION, "~WindowsEndpoint");
77 GRPC_TRACE_LOG(event_engine_endpoint, INFO) << "~WindowsEndpoint::" << this;
78 }
79
DoTcpRead(SliceBuffer * buffer)80 void WindowsEndpoint::AsyncIOState::DoTcpRead(SliceBuffer* buffer) {
81 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
82 << "WindowsEndpoint::" << endpoint << " attempting a read";
83 if (socket->IsShutdown()) {
84 socket->read_info()->SetErrorStatus(
85 absl::InternalError("Socket is shutting down."));
86 thread_pool->Run(&handle_read_event);
87 return;
88 }
89 // Prepare the WSABUF struct
90 CHECK(buffer->Count() <= kMaxWSABUFCount);
91 WSABUF wsa_buffers[kMaxWSABUFCount];
92 for (size_t i = 0; i < buffer->Count(); i++) {
93 auto& slice = buffer->MutableSliceAt(i);
94 wsa_buffers[i].buf = (char*)slice.begin();
95 wsa_buffers[i].len = slice.size();
96 }
97 DWORD bytes_read = 0;
98 DWORD flags = 0;
99 // First try a synchronous, non-blocking read.
100 int status =
101 WSARecv(socket->raw_socket(), wsa_buffers, (DWORD)buffer->Count(),
102 &bytes_read, &flags, nullptr, nullptr);
103 int wsa_error = status == 0 ? 0 : WSAGetLastError();
104 if (wsa_error != WSAEWOULDBLOCK) {
105 // Data or some error was returned immediately.
106 socket->read_info()->SetResult(wsa_error, bytes_read, "WSARecv");
107 thread_pool->Run(&handle_read_event);
108 return;
109 }
110 // If the endpoint has already received some data, and the next call would
111 // block, return the data in case that is all the data the reader expects.
112 if (handle_read_event.MaybeFinishIfDataHasAlreadyBeenRead()) {
113 return;
114 }
115 // Otherwise, let's retry, by queuing a read.
116 socket->NotifyOnRead(&handle_read_event);
117 status = WSARecv(socket->raw_socket(), wsa_buffers, (DWORD)buffer->Count(),
118 nullptr, &flags, socket->read_info()->overlapped(), nullptr);
119 wsa_error = status == 0 ? 0 : WSAGetLastError();
120 if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
121 // The async read attempt returned an error immediately.
122 socket->UnregisterReadCallback();
123 socket->read_info()->SetResult(
124 wsa_error, 0, absl::StrFormat("WindowsEndpoint::%p Read failed", this));
125 thread_pool->Run(&handle_read_event);
126 }
127 }
128
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)129 bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
130 SliceBuffer* buffer, const ReadArgs* /* args */) {
131 if (io_state_->socket->IsShutdown()) {
132 io_state_->thread_pool->Run([on_read = std::move(on_read)]() mutable {
133 on_read(absl::InternalError("Socket is shutting down."));
134 });
135 return false;
136 }
137 buffer->Clear();
138 io_state_->handle_read_event.DonateSpareSlices(buffer);
139 // TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
140 // Choose an appropriate size.
141 size_t min_read_size = kDefaultTargetReadSize;
142 if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
143 buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
144 }
145 io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
146 io_state_->DoTcpRead(buffer);
147 return false;
148 }
149
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)150 bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
151 SliceBuffer* data, const WriteArgs* /* args */) {
152 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
153 << "WindowsEndpoint::" << this << " writing";
154 if (io_state_->socket->IsShutdown()) {
155 io_state_->thread_pool->Run(
156 [on_writable = std::move(on_writable)]() mutable {
157 on_writable(absl::InternalError("Socket is shutting down."));
158 });
159 return false;
160 }
161 if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
162 for (size_t i = 0; i < data->Count(); i++) {
163 auto str = data->RefSlice(i).as_string_view();
164 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
165 << "WindowsEndpoint::" << this
166 << " WRITE (peer=" << peer_address_string_ << "): " << str;
167 }
168 }
169 CHECK(data->Count() <= UINT_MAX);
170 absl::InlinedVector<WSABUF, kMaxWSABUFCount> buffers(data->Count());
171 for (size_t i = 0; i < data->Count(); i++) {
172 auto& slice = data->MutableSliceAt(i);
173 CHECK(slice.size() <= ULONG_MAX);
174 buffers[i].len = slice.size();
175 buffers[i].buf = (char*)slice.begin();
176 }
177 // First, let's try a synchronous, non-blocking write.
178 DWORD bytes_sent;
179 int status = WSASend(io_state_->socket->raw_socket(), buffers.data(),
180 (DWORD)buffers.size(), &bytes_sent, 0, nullptr, nullptr);
181 size_t async_buffers_offset = 0;
182 if (status == 0) {
183 if (bytes_sent == data->Length()) {
184 // Write completed, exiting early
185 io_state_->thread_pool->Run(
186 [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
187 return false;
188 }
189 // The data was not completely delivered, we should send the rest of it by
190 // doing an async write operation.
191 for (size_t i = 0; i < data->Count(); i++) {
192 if (buffers[i].len > bytes_sent) {
193 buffers[i].buf += bytes_sent;
194 buffers[i].len -= bytes_sent;
195 break;
196 }
197 bytes_sent -= buffers[i].len;
198 async_buffers_offset++;
199 }
200 } else {
201 // We would kind of expect to get a WSAEWOULDBLOCK here, especially on a
202 // busy connection that has its send queue filled up. But if we don't,
203 // then we can avoid doing an async write operation at all.
204 int wsa_error = WSAGetLastError();
205 if (wsa_error != WSAEWOULDBLOCK) {
206 io_state_->thread_pool->Run(
207 [cb = std::move(on_writable), wsa_error]() mutable {
208 cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
209 });
210 return false;
211 }
212 }
213 auto write_info = io_state_->socket->write_info();
214 io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
215 io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
216 status =
217 WSASend(io_state_->socket->raw_socket(), &buffers[async_buffers_offset],
218 (DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
219 write_info->overlapped(), nullptr);
220 if (status != 0) {
221 int wsa_error = WSAGetLastError();
222 if (wsa_error != WSA_IO_PENDING) {
223 io_state_->socket->UnregisterWriteCallback();
224 io_state_->socket->write_info()->SetResult(wsa_error, 0, "WSASend");
225 io_state_->thread_pool->Run(&io_state_->handle_write_event);
226 }
227 }
228 return false;
229 }
GetPeerAddress() const230 const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
231 return peer_address_;
232 }
GetLocalAddress() const233 const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
234 return local_address_;
235 }
236
237 // ---- Handle{Read|Write}Closure ----
238 namespace {
AbortOnEvent(absl::Status)239 void AbortOnEvent(absl::Status) {
240 grpc_core::Crash(
241 "INTERNAL ERROR: Asked to handle read/write event with an invalid "
242 "callback");
243 }
244 } // namespace
245
246 absl::AnyInvocable<void(absl::Status)>
ResetAndReturnCallback()247 WindowsEndpoint::HandleReadClosure::ResetAndReturnCallback() {
248 auto cb = std::move(cb_);
249 cb_ = &AbortOnEvent;
250 buffer_ = nullptr;
251 io_state_.reset();
252 return cb;
253 }
254
255 absl::AnyInvocable<void(absl::Status)>
ResetAndReturnCallback()256 WindowsEndpoint::HandleWriteClosure::ResetAndReturnCallback() {
257 auto cb = std::move(cb_);
258 cb_ = &AbortOnEvent;
259 buffer_ = nullptr;
260 io_state_.reset();
261 return cb;
262 }
263
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)264 void WindowsEndpoint::HandleReadClosure::Prime(
265 std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
266 absl::AnyInvocable<void(absl::Status)> cb) {
267 io_state_ = std::move(io_state);
268 cb_ = std::move(cb);
269 buffer_ = buffer;
270 }
271
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)272 void WindowsEndpoint::HandleWriteClosure::Prime(
273 std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
274 absl::AnyInvocable<void(absl::Status)> cb) {
275 io_state_ = std::move(io_state);
276 cb_ = std::move(cb);
277 buffer_ = buffer;
278 }
279
Run()280 void WindowsEndpoint::HandleReadClosure::Run() {
281 // Deletes the shared_ptr when this closure returns
282 // Note that the endpoint may have already been destroyed.
283 auto io_state = std::move(io_state_);
284 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
285 << "WindowsEndpoint::" << io_state->endpoint << " Handling Read Event";
286 const auto result = io_state->socket->read_info()->result();
287 if (!result.error_status.ok()) {
288 buffer_->Clear();
289 return ResetAndReturnCallback()(result.error_status);
290 }
291 absl::Status status;
292 if (result.wsa_error != 0) {
293 status = GRPC_WSA_ERROR(result.wsa_error, "Async Read Error");
294 buffer_->Clear();
295 return ResetAndReturnCallback()(status);
296 }
297 if (result.bytes_transferred == 0) {
298 DCHECK_GT(io_state.use_count(), 0);
299 // Either the endpoint is shut down or we've seen the end of the stream
300 if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
301 LOG(INFO) << "WindowsEndpoint::" << this << " read 0 bytes.";
302 DumpSliceBuffer(
303 &last_read_buffer_,
304 absl::StrFormat("WindowsEndpoint::%p READ last_read_buffer_: ",
305 io_state->endpoint));
306 }
307 buffer_->Swap(last_read_buffer_);
308 if (buffer_->Length() == 0) {
309 // Only send an error if there is no more data to consume. If the endpoint
310 // or socket is shut down, the next read will discover that.
311 status = absl::InternalError("End of TCP stream");
312 grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
313 GRPC_STATUS_UNAVAILABLE);
314 }
315 return ResetAndReturnCallback()(status);
316 }
317 DCHECK_GT(result.bytes_transferred, 0);
318 DCHECK(result.bytes_transferred <= buffer_->Length());
319 buffer_->MoveFirstNBytesIntoSliceBuffer(result.bytes_transferred,
320 last_read_buffer_);
321 if (buffer_->Length() == 0) {
322 buffer_->Swap(last_read_buffer_);
323 return ResetAndReturnCallback()(status);
324 }
325 // Doing another read. Let's keep the AsyncIOState alive a bit longer.
326 io_state_ = std::move(io_state);
327 io_state_->DoTcpRead(buffer_);
328 }
329
MaybeFinishIfDataHasAlreadyBeenRead()330 bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
331 if (last_read_buffer_.Length() > 0) {
332 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
333 << "WindowsEndpoint::" << io_state_->endpoint
334 << " finishing a synchronous read";
335 buffer_->Swap(last_read_buffer_);
336 if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
337 DumpSliceBuffer(buffer_, "finishing synchronous read");
338 }
339 io_state_->thread_pool->Run(
340 [cb = ResetAndReturnCallback()]() mutable { cb(absl::OkStatus()); });
341 return true;
342 }
343 return false;
344 }
345
DonateSpareSlices(SliceBuffer * buffer)346 void WindowsEndpoint::HandleReadClosure::DonateSpareSlices(
347 SliceBuffer* buffer) {
348 // Donee buffer must be empty.
349 CHECK_EQ(buffer->Length(), 0);
350 // HandleReadClosure must be in the reset state.
351 CHECK_EQ(buffer_, nullptr);
352 buffer->Swap(last_read_buffer_);
353 }
354
Run()355 void WindowsEndpoint::HandleWriteClosure::Run() {
356 // Deletes the shared_ptr when this closure returns
357 auto io_state = std::move(io_state_);
358 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
359 << "WindowsEndpoint::" << io_state->endpoint << " Handling Write Event";
360 const auto result = io_state->socket->write_info()->result();
361 if (!result.error_status.ok()) {
362 buffer_->Clear();
363 return ResetAndReturnCallback()(result.error_status);
364 }
365 absl::Status status;
366 if (result.wsa_error != 0) {
367 status = GRPC_WSA_ERROR(result.wsa_error, "WSASend");
368 } else {
369 CHECK(result.bytes_transferred == buffer_->Length());
370 }
371 return ResetAndReturnCallback()(status);
372 }
373
374 // ---- AsyncIOState ----
375
AsyncIOState(WindowsEndpoint * endpoint,std::unique_ptr<WinSocket> socket,std::shared_ptr<EventEngine> engine,ThreadPool * thread_pool)376 WindowsEndpoint::AsyncIOState::AsyncIOState(WindowsEndpoint* endpoint,
377 std::unique_ptr<WinSocket> socket,
378 std::shared_ptr<EventEngine> engine,
379 ThreadPool* thread_pool)
380 : endpoint(endpoint),
381 socket(std::move(socket)),
382 engine(std::move(engine)),
383 thread_pool(thread_pool) {}
384
~AsyncIOState()385 WindowsEndpoint::AsyncIOState::~AsyncIOState() {
386 socket->Shutdown(DEBUG_LOCATION, "~AsyncIOState");
387 }
388
389 } // namespace experimental
390 } // namespace grpc_event_engine
391
392 #endif // GPR_WINDOWS
393