1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17
18 #include <grpc/support/alloc.h>
19 #include <grpc/support/log_windows.h>
20
21 #include <chrono>
22
23 #include "absl/log/check.h"
24 #include "absl/strings/str_format.h"
25 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
26 #include "src/core/lib/event_engine/time_util.h"
27 #include "src/core/lib/event_engine/windows/iocp.h"
28 #include "src/core/lib/event_engine/windows/win_socket.h"
29 #include "src/core/lib/iomgr/error.h"
30 #include "src/core/util/crash.h"
31
32 namespace grpc_event_engine {
33 namespace experimental {
34
IOCP(ThreadPool * thread_pool)35 IOCP::IOCP(ThreadPool* thread_pool) noexcept
36 : thread_pool_(thread_pool),
37 iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr,
38 (ULONG_PTR) nullptr, 0)) {
39 CHECK(iocp_handle_);
40 WSASocketFlagsInit();
41 }
42
43 // Shutdown must be called prior to deletion
~IOCP()44 IOCP::~IOCP() {}
45
Watch(SOCKET socket)46 std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
47 auto wrapped_socket = std::make_unique<WinSocket>(socket, thread_pool_);
48 HANDLE ret = CreateIoCompletionPort(
49 reinterpret_cast<HANDLE>(socket), iocp_handle_,
50 reinterpret_cast<uintptr_t>(wrapped_socket.get()), 0);
51 if (!ret) {
52 grpc_core::Crash(
53 GRPC_WSA_ERROR(WSAGetLastError(), "Unable to add socket to iocp")
54 .ToString());
55 }
56 CHECK(ret == iocp_handle_);
57 return wrapped_socket;
58 }
59
Shutdown()60 void IOCP::Shutdown() {
61 GRPC_TRACE_LOG(event_engine_poller, INFO)
62 << "IOCP::" << this
63 << " shutting down. Outstanding kicks: " << outstanding_kicks_.load();
64 while (outstanding_kicks_.load() > 0) {
65 Work(std::chrono::hours(42), []() {});
66 }
67 CHECK(CloseHandle(iocp_handle_));
68 }
69
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)70 Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
71 absl::FunctionRef<void()> schedule_poll_again) {
72 DWORD bytes = 0;
73 ULONG_PTR completion_key;
74 LPOVERLAPPED overlapped;
75 GRPC_TRACE_LOG(event_engine_poller, INFO)
76 << "IOCP::" << this << " doing work";
77 BOOL success = GetQueuedCompletionStatus(
78 iocp_handle_, &bytes, &completion_key, &overlapped,
79 static_cast<DWORD>(Milliseconds(timeout)));
80 if (success == 0 && overlapped == nullptr) {
81 GRPC_TRACE_LOG(event_engine_poller, INFO)
82 << "IOCP::" << this << " deadline exceeded";
83 return Poller::WorkResult::kDeadlineExceeded;
84 }
85 CHECK(completion_key);
86 CHECK(overlapped);
87 if (overlapped == &kick_overlap_) {
88 GRPC_TRACE_LOG(event_engine_poller, INFO) << "IOCP::" << this << " kicked";
89 outstanding_kicks_.fetch_sub(1);
90 if (completion_key == (ULONG_PTR)&kick_token_) {
91 return Poller::WorkResult::kKicked;
92 }
93 grpc_core::Crash(
94 absl::StrFormat("Unknown custom completion key: %lu", completion_key));
95 }
96 GRPC_TRACE_LOG(event_engine_poller, INFO)
97 << "IOCP::" << this << " got event on OVERLAPPED::" << overlapped;
98 // Safety note: socket is guaranteed to exist when managed by a
99 // WindowsEndpoint. If an overlapped event came in, then either a read event
100 // handler is registered, which keeps the socket alive, or the WindowsEndpoint
101 // (which keeps the socket alive) has done an asynchronous WSARecv and is
102 // about to register for notification of an overlapped event.
103 auto* socket = reinterpret_cast<WinSocket*>(completion_key);
104 WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped);
105 CHECK_NE(info, nullptr);
106 info->GetOverlappedResult();
107 info->SetReady();
108 schedule_poll_again();
109 return Poller::WorkResult::kOk;
110 }
111
Kick()112 void IOCP::Kick() {
113 outstanding_kicks_.fetch_add(1);
114 CHECK(PostQueuedCompletionStatus(iocp_handle_, 0,
115 reinterpret_cast<ULONG_PTR>(&kick_token_),
116 &kick_overlap_));
117 }
118
GetDefaultSocketFlags()119 DWORD IOCP::GetDefaultSocketFlags() {
120 static DWORD wsa_socket_flags = WSASocketFlagsInit();
121 return wsa_socket_flags;
122 }
123
WSASocketFlagsInit()124 DWORD IOCP::WSASocketFlagsInit() {
125 DWORD wsa_socket_flags = WSA_FLAG_OVERLAPPED;
126 // WSA_FLAG_NO_HANDLE_INHERIT may be not supported on the older Windows
127 // versions, see
128 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx
129 // for details.
130 SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
131 wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT);
132 if (sock != INVALID_SOCKET) {
133 // Windows 7, Windows 2008 R2 with SP1 or later
134 wsa_socket_flags |= WSA_FLAG_NO_HANDLE_INHERIT;
135 closesocket(sock);
136 }
137 return wsa_socket_flags;
138 }
139
140 } // namespace experimental
141 } // namespace grpc_event_engine
142
143 #endif // GPR_WINDOWS
144