• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include <grpc/support/alloc.h>
19 #include <grpc/support/log_windows.h>
20 
21 #include <chrono>
22 
23 #include "absl/log/check.h"
24 #include "absl/strings/str_format.h"
25 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
26 #include "src/core/lib/event_engine/time_util.h"
27 #include "src/core/lib/event_engine/windows/iocp.h"
28 #include "src/core/lib/event_engine/windows/win_socket.h"
29 #include "src/core/lib/iomgr/error.h"
30 #include "src/core/util/crash.h"
31 
32 namespace grpc_event_engine {
33 namespace experimental {
34 
IOCP(ThreadPool * thread_pool)35 IOCP::IOCP(ThreadPool* thread_pool) noexcept
36     : thread_pool_(thread_pool),
37       iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr,
38                                           (ULONG_PTR) nullptr, 0)) {
39   CHECK(iocp_handle_);
40   WSASocketFlagsInit();
41 }
42 
43 // Shutdown must be called prior to deletion
~IOCP()44 IOCP::~IOCP() {}
45 
Watch(SOCKET socket)46 std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
47   auto wrapped_socket = std::make_unique<WinSocket>(socket, thread_pool_);
48   HANDLE ret = CreateIoCompletionPort(
49       reinterpret_cast<HANDLE>(socket), iocp_handle_,
50       reinterpret_cast<uintptr_t>(wrapped_socket.get()), 0);
51   if (!ret) {
52     grpc_core::Crash(
53         GRPC_WSA_ERROR(WSAGetLastError(), "Unable to add socket to iocp")
54             .ToString());
55   }
56   CHECK(ret == iocp_handle_);
57   return wrapped_socket;
58 }
59 
Shutdown()60 void IOCP::Shutdown() {
61   GRPC_TRACE_LOG(event_engine_poller, INFO)
62       << "IOCP::" << this
63       << " shutting down. Outstanding kicks: " << outstanding_kicks_.load();
64   while (outstanding_kicks_.load() > 0) {
65     Work(std::chrono::hours(42), []() {});
66   }
67   CHECK(CloseHandle(iocp_handle_));
68 }
69 
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)70 Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
71                               absl::FunctionRef<void()> schedule_poll_again) {
72   DWORD bytes = 0;
73   ULONG_PTR completion_key;
74   LPOVERLAPPED overlapped;
75   GRPC_TRACE_LOG(event_engine_poller, INFO)
76       << "IOCP::" << this << " doing work";
77   BOOL success = GetQueuedCompletionStatus(
78       iocp_handle_, &bytes, &completion_key, &overlapped,
79       static_cast<DWORD>(Milliseconds(timeout)));
80   if (success == 0 && overlapped == nullptr) {
81     GRPC_TRACE_LOG(event_engine_poller, INFO)
82         << "IOCP::" << this << " deadline exceeded";
83     return Poller::WorkResult::kDeadlineExceeded;
84   }
85   CHECK(completion_key);
86   CHECK(overlapped);
87   if (overlapped == &kick_overlap_) {
88     GRPC_TRACE_LOG(event_engine_poller, INFO) << "IOCP::" << this << " kicked";
89     outstanding_kicks_.fetch_sub(1);
90     if (completion_key == (ULONG_PTR)&kick_token_) {
91       return Poller::WorkResult::kKicked;
92     }
93     grpc_core::Crash(
94         absl::StrFormat("Unknown custom completion key: %lu", completion_key));
95   }
96   GRPC_TRACE_LOG(event_engine_poller, INFO)
97       << "IOCP::" << this << " got event on OVERLAPPED::" << overlapped;
98   // Safety note: socket is guaranteed to exist when managed by a
99   // WindowsEndpoint. If an overlapped event came in, then either a read event
100   // handler is registered, which keeps the socket alive, or the WindowsEndpoint
101   // (which keeps the socket alive) has done an asynchronous WSARecv and is
102   // about to register for notification of an overlapped event.
103   auto* socket = reinterpret_cast<WinSocket*>(completion_key);
104   WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped);
105   CHECK_NE(info, nullptr);
106   info->GetOverlappedResult();
107   info->SetReady();
108   schedule_poll_again();
109   return Poller::WorkResult::kOk;
110 }
111 
Kick()112 void IOCP::Kick() {
113   outstanding_kicks_.fetch_add(1);
114   CHECK(PostQueuedCompletionStatus(iocp_handle_, 0,
115                                    reinterpret_cast<ULONG_PTR>(&kick_token_),
116                                    &kick_overlap_));
117 }
118 
GetDefaultSocketFlags()119 DWORD IOCP::GetDefaultSocketFlags() {
120   static DWORD wsa_socket_flags = WSASocketFlagsInit();
121   return wsa_socket_flags;
122 }
123 
WSASocketFlagsInit()124 DWORD IOCP::WSASocketFlagsInit() {
125   DWORD wsa_socket_flags = WSA_FLAG_OVERLAPPED;
126   // WSA_FLAG_NO_HANDLE_INHERIT may be not supported on the older Windows
127   // versions, see
128   // https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx
129   // for details.
130   SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
131                           wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT);
132   if (sock != INVALID_SOCKET) {
133     // Windows 7, Windows 2008 R2 with SP1 or later
134     wsa_socket_flags |= WSA_FLAG_NO_HANDLE_INHERIT;
135     closesocket(sock);
136   }
137   return wsa_socket_flags;
138 }
139 
140 }  // namespace experimental
141 }  // namespace grpc_event_engine
142 
143 #endif  // GPR_WINDOWS
144