• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include <grpc/event_engine/memory_allocator.h>
19 #include <grpc/support/log_windows.h>
20 
21 #include "absl/cleanup/cleanup.h"
22 #include "absl/functional/any_invocable.h"
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "absl/status/status.h"
26 #include "absl/strings/str_format.h"
27 #include "src/core/lib/event_engine/tcp_socket_utils.h"
28 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
29 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
30 #include "src/core/lib/iomgr/error.h"
31 #include "src/core/util/debug_location.h"
32 #include "src/core/util/status_helper.h"
33 
34 namespace grpc_event_engine {
35 namespace experimental {
36 
37 namespace {
38 constexpr size_t kDefaultTargetReadSize = 8192;
39 constexpr int kMaxWSABUFCount = 16;
40 
DumpSliceBuffer(SliceBuffer * buffer,absl::string_view context_string)41 void DumpSliceBuffer(SliceBuffer* buffer, absl::string_view context_string) {
42   for (size_t i = 0; i < buffer->Count(); i++) {
43     auto str = buffer->MutableSliceAt(i).as_string_view();
44     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
45         << context_string << " [" << i + 1 << "/" << buffer->Count()
46         << "]: " << str;
47   }
48 }
49 
50 }  // namespace
51 
WindowsEndpoint(const EventEngine::ResolvedAddress & peer_address,std::unique_ptr<WinSocket> socket,MemoryAllocator && allocator,const EndpointConfig &,ThreadPool * thread_pool,std::shared_ptr<EventEngine> engine)52 WindowsEndpoint::WindowsEndpoint(
53     const EventEngine::ResolvedAddress& peer_address,
54     std::unique_ptr<WinSocket> socket, MemoryAllocator&& allocator,
55     const EndpointConfig& /* config */, ThreadPool* thread_pool,
56     std::shared_ptr<EventEngine> engine)
57     : peer_address_(peer_address),
58       allocator_(std::move(allocator)),
59       io_state_(std::make_shared<AsyncIOState>(
60           this, std::move(socket), std::move(engine), thread_pool)) {
61   char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES];
62   int addr_len = sizeof(addr);
63   if (getsockname(io_state_->socket->raw_socket(),
64                   reinterpret_cast<sockaddr*>(addr), &addr_len) < 0) {
65     grpc_core::Crash(absl::StrFormat(
66         "Unrecoverable error: Failed to get local socket name. %s",
67         GRPC_WSA_ERROR(WSAGetLastError(), "getsockname").ToString().c_str()));
68   }
69   local_address_ =
70       EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(addr), addr_len);
71   local_address_string_ = *ResolvedAddressToURI(local_address_);
72   peer_address_string_ = *ResolvedAddressToURI(peer_address_);
73 }
74 
~WindowsEndpoint()75 WindowsEndpoint::~WindowsEndpoint() {
76   io_state_->socket->Shutdown(DEBUG_LOCATION, "~WindowsEndpoint");
77   GRPC_TRACE_LOG(event_engine_endpoint, INFO) << "~WindowsEndpoint::" << this;
78 }
79 
DoTcpRead(SliceBuffer * buffer)80 void WindowsEndpoint::AsyncIOState::DoTcpRead(SliceBuffer* buffer) {
81   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
82       << "WindowsEndpoint::" << endpoint << " attempting a read";
83   if (socket->IsShutdown()) {
84     socket->read_info()->SetErrorStatus(
85         absl::InternalError("Socket is shutting down."));
86     thread_pool->Run(&handle_read_event);
87     return;
88   }
89   // Prepare the WSABUF struct
90   CHECK(buffer->Count() <= kMaxWSABUFCount);
91   WSABUF wsa_buffers[kMaxWSABUFCount];
92   for (size_t i = 0; i < buffer->Count(); i++) {
93     auto& slice = buffer->MutableSliceAt(i);
94     wsa_buffers[i].buf = (char*)slice.begin();
95     wsa_buffers[i].len = slice.size();
96   }
97   DWORD bytes_read = 0;
98   DWORD flags = 0;
99   // First try a synchronous, non-blocking read.
100   int status =
101       WSARecv(socket->raw_socket(), wsa_buffers, (DWORD)buffer->Count(),
102               &bytes_read, &flags, nullptr, nullptr);
103   int wsa_error = status == 0 ? 0 : WSAGetLastError();
104   if (wsa_error != WSAEWOULDBLOCK) {
105     // Data or some error was returned immediately.
106     socket->read_info()->SetResult(wsa_error, bytes_read, "WSARecv");
107     thread_pool->Run(&handle_read_event);
108     return;
109   }
110   // If the endpoint has already received some data, and the next call would
111   // block, return the data in case that is all the data the reader expects.
112   if (handle_read_event.MaybeFinishIfDataHasAlreadyBeenRead()) {
113     return;
114   }
115   // Otherwise, let's retry, by queuing a read.
116   socket->NotifyOnRead(&handle_read_event);
117   status = WSARecv(socket->raw_socket(), wsa_buffers, (DWORD)buffer->Count(),
118                    nullptr, &flags, socket->read_info()->overlapped(), nullptr);
119   wsa_error = status == 0 ? 0 : WSAGetLastError();
120   if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
121     // The async read attempt returned an error immediately.
122     socket->UnregisterReadCallback();
123     socket->read_info()->SetResult(
124         wsa_error, 0, absl::StrFormat("WindowsEndpoint::%p Read failed", this));
125     thread_pool->Run(&handle_read_event);
126   }
127 }
128 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)129 bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
130                            SliceBuffer* buffer, const ReadArgs* /* args */) {
131   if (io_state_->socket->IsShutdown()) {
132     io_state_->thread_pool->Run([on_read = std::move(on_read)]() mutable {
133       on_read(absl::InternalError("Socket is shutting down."));
134     });
135     return false;
136   }
137   buffer->Clear();
138   io_state_->handle_read_event.DonateSpareSlices(buffer);
139   // TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
140   // Choose an appropriate size.
141   size_t min_read_size = kDefaultTargetReadSize;
142   if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
143     buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
144   }
145   io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
146   io_state_->DoTcpRead(buffer);
147   return false;
148 }
149 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)150 bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
151                             SliceBuffer* data, const WriteArgs* /* args */) {
152   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
153       << "WindowsEndpoint::" << this << " writing";
154   if (io_state_->socket->IsShutdown()) {
155     io_state_->thread_pool->Run(
156         [on_writable = std::move(on_writable)]() mutable {
157           on_writable(absl::InternalError("Socket is shutting down."));
158         });
159     return false;
160   }
161   if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
162     for (size_t i = 0; i < data->Count(); i++) {
163       auto str = data->RefSlice(i).as_string_view();
164       GRPC_TRACE_LOG(event_engine_endpoint, INFO)
165           << "WindowsEndpoint::" << this
166           << " WRITE (peer=" << peer_address_string_ << "): " << str;
167     }
168   }
169   CHECK(data->Count() <= UINT_MAX);
170   absl::InlinedVector<WSABUF, kMaxWSABUFCount> buffers(data->Count());
171   for (size_t i = 0; i < data->Count(); i++) {
172     auto& slice = data->MutableSliceAt(i);
173     CHECK(slice.size() <= ULONG_MAX);
174     buffers[i].len = slice.size();
175     buffers[i].buf = (char*)slice.begin();
176   }
177   // First, let's try a synchronous, non-blocking write.
178   DWORD bytes_sent;
179   int status = WSASend(io_state_->socket->raw_socket(), buffers.data(),
180                        (DWORD)buffers.size(), &bytes_sent, 0, nullptr, nullptr);
181   size_t async_buffers_offset = 0;
182   if (status == 0) {
183     if (bytes_sent == data->Length()) {
184       // Write completed, exiting early
185       io_state_->thread_pool->Run(
186           [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
187       return false;
188     }
189     // The data was not completely delivered, we should send the rest of it by
190     // doing an async write operation.
191     for (size_t i = 0; i < data->Count(); i++) {
192       if (buffers[i].len > bytes_sent) {
193         buffers[i].buf += bytes_sent;
194         buffers[i].len -= bytes_sent;
195         break;
196       }
197       bytes_sent -= buffers[i].len;
198       async_buffers_offset++;
199     }
200   } else {
201     // We would kind of expect to get a WSAEWOULDBLOCK here, especially on a
202     // busy connection that has its send queue filled up. But if we don't,
203     // then we can avoid doing an async write operation at all.
204     int wsa_error = WSAGetLastError();
205     if (wsa_error != WSAEWOULDBLOCK) {
206       io_state_->thread_pool->Run(
207           [cb = std::move(on_writable), wsa_error]() mutable {
208             cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
209           });
210       return false;
211     }
212   }
213   auto write_info = io_state_->socket->write_info();
214   io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
215   io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
216   status =
217       WSASend(io_state_->socket->raw_socket(), &buffers[async_buffers_offset],
218               (DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
219               write_info->overlapped(), nullptr);
220   if (status != 0) {
221     int wsa_error = WSAGetLastError();
222     if (wsa_error != WSA_IO_PENDING) {
223       io_state_->socket->UnregisterWriteCallback();
224       io_state_->socket->write_info()->SetResult(wsa_error, 0, "WSASend");
225       io_state_->thread_pool->Run(&io_state_->handle_write_event);
226     }
227   }
228   return false;
229 }
GetPeerAddress() const230 const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
231   return peer_address_;
232 }
GetLocalAddress() const233 const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
234   return local_address_;
235 }
236 
237 // ---- Handle{Read|Write}Closure ----
238 namespace {
AbortOnEvent(absl::Status)239 void AbortOnEvent(absl::Status) {
240   grpc_core::Crash(
241       "INTERNAL ERROR: Asked to handle read/write event with an invalid "
242       "callback");
243 }
244 }  // namespace
245 
246 absl::AnyInvocable<void(absl::Status)>
ResetAndReturnCallback()247 WindowsEndpoint::HandleReadClosure::ResetAndReturnCallback() {
248   auto cb = std::move(cb_);
249   cb_ = &AbortOnEvent;
250   buffer_ = nullptr;
251   io_state_.reset();
252   return cb;
253 }
254 
255 absl::AnyInvocable<void(absl::Status)>
ResetAndReturnCallback()256 WindowsEndpoint::HandleWriteClosure::ResetAndReturnCallback() {
257   auto cb = std::move(cb_);
258   cb_ = &AbortOnEvent;
259   buffer_ = nullptr;
260   io_state_.reset();
261   return cb;
262 }
263 
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)264 void WindowsEndpoint::HandleReadClosure::Prime(
265     std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
266     absl::AnyInvocable<void(absl::Status)> cb) {
267   io_state_ = std::move(io_state);
268   cb_ = std::move(cb);
269   buffer_ = buffer;
270 }
271 
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)272 void WindowsEndpoint::HandleWriteClosure::Prime(
273     std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
274     absl::AnyInvocable<void(absl::Status)> cb) {
275   io_state_ = std::move(io_state);
276   cb_ = std::move(cb);
277   buffer_ = buffer;
278 }
279 
Run()280 void WindowsEndpoint::HandleReadClosure::Run() {
281   // Deletes the shared_ptr when this closure returns
282   // Note that the endpoint may have already been destroyed.
283   auto io_state = std::move(io_state_);
284   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
285       << "WindowsEndpoint::" << io_state->endpoint << " Handling Read Event";
286   const auto result = io_state->socket->read_info()->result();
287   if (!result.error_status.ok()) {
288     buffer_->Clear();
289     return ResetAndReturnCallback()(result.error_status);
290   }
291   absl::Status status;
292   if (result.wsa_error != 0) {
293     status = GRPC_WSA_ERROR(result.wsa_error, "Async Read Error");
294     buffer_->Clear();
295     return ResetAndReturnCallback()(status);
296   }
297   if (result.bytes_transferred == 0) {
298     DCHECK_GT(io_state.use_count(), 0);
299     // Either the endpoint is shut down or we've seen the end of the stream
300     if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
301       LOG(INFO) << "WindowsEndpoint::" << this << " read 0 bytes.";
302       DumpSliceBuffer(
303           &last_read_buffer_,
304           absl::StrFormat("WindowsEndpoint::%p READ last_read_buffer_: ",
305                           io_state->endpoint));
306     }
307     buffer_->Swap(last_read_buffer_);
308     if (buffer_->Length() == 0) {
309       // Only send an error if there is no more data to consume. If the endpoint
310       // or socket is shut down, the next read will discover that.
311       status = absl::InternalError("End of TCP stream");
312       grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
313                               GRPC_STATUS_UNAVAILABLE);
314     }
315     return ResetAndReturnCallback()(status);
316   }
317   DCHECK_GT(result.bytes_transferred, 0);
318   DCHECK(result.bytes_transferred <= buffer_->Length());
319   buffer_->MoveFirstNBytesIntoSliceBuffer(result.bytes_transferred,
320                                           last_read_buffer_);
321   if (buffer_->Length() == 0) {
322     buffer_->Swap(last_read_buffer_);
323     return ResetAndReturnCallback()(status);
324   }
325   // Doing another read. Let's keep the AsyncIOState alive a bit longer.
326   io_state_ = std::move(io_state);
327   io_state_->DoTcpRead(buffer_);
328 }
329 
MaybeFinishIfDataHasAlreadyBeenRead()330 bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
331   if (last_read_buffer_.Length() > 0) {
332     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
333         << "WindowsEndpoint::" << io_state_->endpoint
334         << " finishing a synchronous read";
335     buffer_->Swap(last_read_buffer_);
336     if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
337       DumpSliceBuffer(buffer_, "finishing synchronous read");
338     }
339     io_state_->thread_pool->Run(
340         [cb = ResetAndReturnCallback()]() mutable { cb(absl::OkStatus()); });
341     return true;
342   }
343   return false;
344 }
345 
DonateSpareSlices(SliceBuffer * buffer)346 void WindowsEndpoint::HandleReadClosure::DonateSpareSlices(
347     SliceBuffer* buffer) {
348   // Donee buffer must be empty.
349   CHECK_EQ(buffer->Length(), 0);
350   // HandleReadClosure must be in the reset state.
351   CHECK_EQ(buffer_, nullptr);
352   buffer->Swap(last_read_buffer_);
353 }
354 
Run()355 void WindowsEndpoint::HandleWriteClosure::Run() {
356   // Deletes the shared_ptr when this closure returns
357   auto io_state = std::move(io_state_);
358   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
359       << "WindowsEndpoint::" << io_state->endpoint << " Handling Write Event";
360   const auto result = io_state->socket->write_info()->result();
361   if (!result.error_status.ok()) {
362     buffer_->Clear();
363     return ResetAndReturnCallback()(result.error_status);
364   }
365   absl::Status status;
366   if (result.wsa_error != 0) {
367     status = GRPC_WSA_ERROR(result.wsa_error, "WSASend");
368   } else {
369     CHECK(result.bytes_transferred == buffer_->Length());
370   }
371   return ResetAndReturnCallback()(status);
372 }
373 
374 // ---- AsyncIOState ----
375 
AsyncIOState(WindowsEndpoint * endpoint,std::unique_ptr<WinSocket> socket,std::shared_ptr<EventEngine> engine,ThreadPool * thread_pool)376 WindowsEndpoint::AsyncIOState::AsyncIOState(WindowsEndpoint* endpoint,
377                                             std::unique_ptr<WinSocket> socket,
378                                             std::shared_ptr<EventEngine> engine,
379                                             ThreadPool* thread_pool)
380     : endpoint(endpoint),
381       socket(std::move(socket)),
382       engine(std::move(engine)),
383       thread_pool(thread_pool) {}
384 
~AsyncIOState()385 WindowsEndpoint::AsyncIOState::~AsyncIOState() {
386   socket->Shutdown(DEBUG_LOCATION, "~AsyncIOState");
387 }
388 
389 }  // namespace experimental
390 }  // namespace grpc_event_engine
391 
392 #endif  // GPR_WINDOWS
393