1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17 #include <gmock/gmock.h>
18 #include <grpc/grpc.h>
19 #include <grpc/support/log_windows.h>
20 #include <gtest/gtest.h>
21
22 #include <thread>
23
24 #include "absl/log/log.h"
25 #include "absl/status/status.h"
26 #include "absl/time/time.h"
27 #include "absl/types/variant.h"
28 #include "src/core/lib/event_engine/common_closures.h"
29 #include "src/core/lib/event_engine/poller.h"
30 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
31 #include "src/core/lib/event_engine/windows/iocp.h"
32 #include "src/core/lib/event_engine/windows/win_socket.h"
33 #include "src/core/lib/iomgr/error.h"
34 #include "src/core/util/notification.h"
35 #include "test/core/event_engine/windows/create_sockpair.h"
36
37 namespace {
38 using ::grpc_event_engine::experimental::AnyInvocableClosure;
39 using ::grpc_event_engine::experimental::CreateSockpair;
40 using ::grpc_event_engine::experimental::EventEngine;
41 using ::grpc_event_engine::experimental::IOCP;
42 using ::grpc_event_engine::experimental::Poller;
43 using ::grpc_event_engine::experimental::SelfDeletingClosure;
44 using ::grpc_event_engine::experimental::ThreadPool;
45 using ::grpc_event_engine::experimental::WinSocket;
46
47 // TODO(hork): replace with logging mechanism that plays nicely with:
48 // `ASSERT_OK(...) << GetErrorMessage(error, context);`
LogErrorMessage(int messageid,absl::string_view context)49 void LogErrorMessage(int messageid, absl::string_view context) {
50 char* utf8_message = gpr_format_message(messageid);
51 LOG(ERROR) << "Error in " << context << ": " << utf8_message;
52 gpr_free(utf8_message);
53 }
54 } // namespace
55
56 class IOCPTest : public testing::Test {};
57
// Exercises a send/receive round trip through a single IOCP: the client posts
// an overlapped WSARecv, the server issues a WSASend, and each of the two
// following IOCP::Work calls must return kOk and invoke the callback handed to
// it. The read closure additionally checks the received payload.
TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  SOCKET sockpair[2];
  CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  auto wrapped_server_socket = iocp.Watch(sockpair[1]);
  grpc_core::Notification read_called;
  grpc_core::Notification write_called;

  // ---- Client side: register the read closure, then post the receive. ----
  // The receive buffer is declared at function scope so it is still alive when
  // the read closure inspects it.
  char read_char_buffer[2048];
  WSABUF read_wsabuf;
  read_wsabuf.buf = read_char_buffer;
  read_wsabuf.len = 2048;
  DWORD recv_flags = 0;
  DWORD bytes_rcvd;
  // When the client gets some data, ensure it matches what we expect.
  AnyInvocableClosure* const on_read = new AnyInvocableClosure(
      [client = wrapped_client_socket.get(), &read_called, &read_wsabuf]() {
        VLOG(2) << "Notified on read";
        const auto& read_result = client->read_info()->result();
        EXPECT_TRUE(read_result.error_status.ok())
            << "Error on read: " << read_result.error_status;
        EXPECT_GE(read_result.bytes_transferred, 10u);
        EXPECT_STREQ(read_wsabuf.buf, "hello!");
        read_called.Notify();
      });
  wrapped_client_socket->NotifyOnRead(on_read);
  const int recv_status = WSARecv(
      wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
      &recv_flags, wrapped_client_socket->read_info()->overlapped(), nullptr);
  // Nothing has been sent yet, so the receive is expected to be left pending:
  // error 997, WSA_IO_PENDING.
  EXPECT_EQ(recv_status, -1);
  const int recv_error = WSAGetLastError();
  EXPECT_EQ(recv_error, WSA_IO_PENDING);
  if (recv_error != WSA_IO_PENDING) {
    LogErrorMessage(recv_error, "WSARecv");
  }

  // ---- Server side: register the write closure, then send the message. ----
  AnyInvocableClosure* const on_write =
      new AnyInvocableClosure([&write_called] {
        VLOG(2) << "Notified on write";
        write_called.Notify();
      });
  wrapped_server_socket->NotifyOnWrite(on_write);
  // The whole 2048-byte buffer is sent: "hello!" followed by zero bytes.
  char write_char_buffer[2048] = "hello!";
  WSABUF write_wsabuf;
  write_wsabuf.buf = write_char_buffer;
  write_wsabuf.len = 2048;
  DWORD bytes_sent;
  const int send_status = WSASend(
      wrapped_server_socket->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
      wrapped_server_socket->write_info()->overlapped(), nullptr);
  EXPECT_EQ(send_status, 0);
  if (send_status != 0) {
    LogErrorMessage(WSAGetLastError(), "WSASend");
  }

  // ---- Drive the poller: one Work call per outstanding operation. ----
  bool cb_invoked = false;
  const auto mark_invoked = [&cb_invoked]() { cb_invoked = true; };
  // Doing work for WSASend
  auto work_result = iocp.Work(std::chrono::seconds(10), mark_invoked);
  ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
  ASSERT_TRUE(cb_invoked);
  // Doing work for WSARecv
  cb_invoked = false;
  work_result = iocp.Work(std::chrono::seconds(10), mark_invoked);
  ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
  ASSERT_TRUE(cb_invoked);

  // Block until both closures have actually run before tearing anything down.
  read_called.WaitForNotification();
  write_called.WaitForNotification();

  // AnyInvocableClosure instances are not self-deleting; free them by hand.
  delete on_read;
  delete on_write;
  wrapped_client_socket->Shutdown();
  wrapped_server_socket->Shutdown();
  iocp.Shutdown();
  thread_pool->Quiesce();
}
142
// Posts an overlapped receive on a watched client socket, has the peer send a
// message through its raw (unwatched) socket handle, and checks that a single
// IOCP::Work call returns kOk, invokes its callback, and that the read closure
// subsequently runs.
// NOTE(review): despite the test name, the read closure is registered with
// NotifyOnRead before the receive is posted - confirm whether the name or the
// flow reflects the intended scenario.
TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  SOCKET sockpair[2];
  CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  grpc_core::Notification read_called;

  // ---- Client side: register a self-deleting read closure, then receive. ----
  // The receive buffer is declared at function scope so it is still alive when
  // the read closure inspects it.
  char read_char_buffer[2048];
  WSABUF read_wsabuf;
  read_wsabuf.buf = read_char_buffer;
  read_wsabuf.len = 2048;
  DWORD recv_flags = 0;
  DWORD bytes_rcvd;
  auto* read_closure = SelfDeletingClosure::Create(
      [client = wrapped_client_socket.get(), &read_called, &read_wsabuf]() {
        VLOG(2) << "Notified on read";
        const auto& read_result = client->read_info()->result();
        EXPECT_TRUE(read_result.error_status.ok())
            << "Error on read: " << read_result.error_status;
        EXPECT_GE(read_result.bytes_transferred, 10u);
        EXPECT_STREQ(read_wsabuf.buf, "hello!");
        read_called.Notify();
      });
  wrapped_client_socket->NotifyOnRead(read_closure);
  // Set the client to receive asynchronously.
  const int recv_status = WSARecv(
      wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
      &recv_flags, wrapped_client_socket->read_info()->overlapped(), nullptr);
  // Expecting error 997, WSA_IO_PENDING
  EXPECT_EQ(recv_status, -1);
  const int recv_error = WSAGetLastError();
  EXPECT_EQ(recv_error, WSA_IO_PENDING);
  if (recv_error != WSA_IO_PENDING) {
    LogErrorMessage(recv_error, "WSARecv");
  }

  // ---- Server side: send directly on the raw socket. ----
  // Have the server send a message to the client. No need to track via IOCP,
  // so a local zeroed OVERLAPPED is used instead of a watched socket's.
  char write_char_buffer[2048] = "hello!";
  WSABUF write_wsabuf;
  write_wsabuf.buf = write_char_buffer;
  write_wsabuf.len = 2048;
  DWORD bytes_sent;
  OVERLAPPED write_overlapped;
  memset(&write_overlapped, 0, sizeof(write_overlapped));
  const int send_status = WSASend(sockpair[1], &write_wsabuf, 1, &bytes_sent,
                                  0, &write_overlapped, nullptr);
  EXPECT_EQ(send_status, 0);
  if (send_status != 0) {
    LogErrorMessage(WSAGetLastError(), "WSASend");
  }

  // A single Work call is expected to report kOk and invoke its callback.
  bool cb_invoked = false;
  const auto work_result = iocp.Work(std::chrono::seconds(2),
                                     [&cb_invoked]() { cb_invoked = true; });
  ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
  ASSERT_TRUE(cb_invoked);

  // Wait for the read closure to run before tearing down.
  read_called.WaitForNotification();
  wrapped_client_socket->Shutdown();
  iocp.Shutdown();
  thread_pool->Quiesce();
}
209
// Verifies that IOCP::Kick wakes a thread blocked in IOCP::Work: one pool task
// blocks in Work with a long timeout, a second task kicks the poller shortly
// afterwards, and the first task must observe kKicked without its callback
// having been invoked.
TEST_F(IOCPTest, KickWorks) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::Notification kicked;
  thread_pool->Run([&iocp, &kicked] {
    bool cb_invoked = false;
    Poller::WorkResult result = iocp.Work(
        std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; });
    // Use EXPECT_* here, not ASSERT_*: a fatal assertion would return from
    // this lambda before kicked.Notify(), leaving the test body blocked in
    // WaitForNotification() forever instead of reporting the failure.
    EXPECT_TRUE(result == Poller::WorkResult::kKicked);
    EXPECT_FALSE(cb_invoked);
    kicked.Notify();
  });
  thread_pool->Run([&iocp] {
    // give the worker thread a chance to start
    absl::SleepFor(absl::Milliseconds(42));
    iocp.Kick();
  });
  // wait for the callbacks to run
  kicked.WaitForNotification();
  thread_pool->Quiesce();
}
231
// Checks that kicks issued while no worker is polling are remembered: after
// two Kick calls, the next two Work calls must each return kKicked and the
// third must return kDeadlineExceeded. The callback passed to Work must never
// be invoked.
// TODO(hork): evaluate if a kick count is going to be useful.
// This documents the existing poller's behavior of maintaining a kick count,
// but it's unclear if it's going to be needed.
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  bool cb_invoked = false;
  const auto mark_invoked = [&cb_invoked]() { cb_invoked = true; };
  constexpr int kKickCount = 2;
  // Queue up the kicks before anyone is polling.
  for (int i = 0; i < kKickCount; ++i) {
    iocp.Kick();
  }
  // Each queued kick should be consumed by exactly one Work call.
  for (int i = 0; i < kKickCount; ++i) {
    const auto kicked_result =
        iocp.Work(std::chrono::milliseconds(1), mark_invoked);
    ASSERT_TRUE(kicked_result == Poller::WorkResult::kKicked);
    ASSERT_FALSE(cb_invoked);
  }
  // With the kicks drained, the next poll has nothing to report and times out.
  const auto final_result =
      iocp.Work(std::chrono::milliseconds(1), mark_invoked);
  ASSERT_TRUE(final_result == Poller::WorkResult::kDeadlineExceeded);
  ASSERT_FALSE(cb_invoked);
  thread_pool->Quiesce();
}
258
// Stress test: starts 10 threads, each with its own thread pool and IOCP. On
// each thread, 50 socket pairs (100 sockets) are created and each pair
// exchanges one message; the read and write closures count completions and
// shut their socket down. The test passes once every read and write has been
// counted.
TEST_F(IOCPTest, StressTestThousandsOfSockets) {
  int thread_count = 10;
  int sockets_per_thread = 50;
  std::atomic<int> read_count{0};
  std::atomic<int> write_count{0};
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (int thread_n = 0; thread_n < thread_count; thread_n++) {
    threads.emplace_back([sockets_per_thread, &read_count, &write_count] {
      auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
      IOCP iocp(thread_pool.get());
      // Start a looping worker thread with a moderate timeout. It keeps
      // polling until a Work call times out with nothing to report.
      std::thread iocp_worker([&iocp] {
        Poller::WorkResult result;
        do {
          result = iocp.Work(std::chrono::seconds(1), []() {});
        } while (result != Poller::WorkResult::kDeadlineExceeded);
      });
      for (int i = 0; i < sockets_per_thread; i++) {
        SOCKET sockpair[2];
        CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
        auto wrapped_client_socket = iocp.Watch(sockpair[0]);
        auto wrapped_server_socket = iocp.Watch(sockpair[1]);
        // The I/O buffers must stay valid until the overlapped operations
        // complete, which may happen after this loop iteration has ended.
        // They are therefore heap-backed and owned by the completion closures.
        // Moving a std::vector keeps its storage, so the data pointers taken
        // here remain valid after the move into the closure.
        std::vector<char> read_buffer(20);
        char* const read_data = read_buffer.data();
        const char message[20] = "hello!";
        std::vector<char> write_buffer(message, message + sizeof(message));
        char* const write_data = write_buffer.data();
        auto* pclient = wrapped_client_socket.get();
        pclient->NotifyOnRead(SelfDeletingClosure::Create(
            [&read_count, win_socket = std::move(wrapped_client_socket),
             read_buffer = std::move(read_buffer)]() mutable {
              read_count.fetch_add(1);
              win_socket->Shutdown();
            }));
        auto* pserver = wrapped_server_socket.get();
        pserver->NotifyOnWrite(SelfDeletingClosure::Create(
            [&write_count, win_socket = std::move(wrapped_server_socket),
             write_buffer = std::move(write_buffer)]() mutable {
              write_count.fetch_add(1);
              win_socket->Shutdown();
            }));
        {
          // Set the client to receive
          WSABUF read_wsabuf;
          read_wsabuf.len = 20;
          read_wsabuf.buf = read_data;
          DWORD bytes_rcvd;
          DWORD flags = 0;
          int status =
              WSARecv(pclient->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
                      &flags, pclient->read_info()->overlapped(), nullptr);
          // Expecting error 997, WSA_IO_PENDING
          EXPECT_EQ(status, -1);
          int last_error = WSAGetLastError();
          EXPECT_EQ(last_error, WSA_IO_PENDING);
          if (last_error != WSA_IO_PENDING) {
            LogErrorMessage(last_error, "WSARecv");
          }
        }
        {
          // Have the server send a message to the client. The send may either
          // complete immediately or be left pending; both are acceptable.
          WSABUF write_wsabuf;
          write_wsabuf.len = 20;
          write_wsabuf.buf = write_data;
          DWORD bytes_sent;
          int status =
              WSASend(pserver->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
                      pserver->write_info()->overlapped(), nullptr);
          if (status != 0) {
            int wsa_error = WSAGetLastError();
            if (wsa_error != WSA_IO_PENDING) {
              LogErrorMessage(wsa_error, "WSASend");
              // Record a non-fatal failure and stop creating sockets. FAIL()
              // would return from this lambda with iocp_worker still
              // joinable, and destroying a joinable std::thread calls
              // std::terminate.
              ADD_FAILURE() << "Error in WSASend. See logs";
              break;
            }
          }
        }
      }
      iocp_worker.join();
      thread_pool->Quiesce();
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  // Poll the counters for up to 30 seconds before declaring failure.
  absl::Time deadline = absl::Now() + absl::Seconds(30);
  while (read_count.load() != thread_count * sockets_per_thread ||
         write_count.load() != thread_count * sockets_per_thread) {
    absl::SleepFor(absl::Milliseconds(50));
    if (deadline < absl::Now()) {
      FAIL() << "Deadline exceeded with " << read_count.load() << " reads and "
             << write_count.load() << " writes";
    }
  }
  ASSERT_EQ(read_count.load(), thread_count * sockets_per_thread);
  ASSERT_EQ(write_count.load(), thread_count * sockets_per_thread);
}
356
main(int argc,char ** argv)357 int main(int argc, char** argv) {
358 ::testing::InitGoogleTest(&argc, argv);
359 grpc_init();
360 int status = RUN_ALL_TESTS();
361 grpc_shutdown();
362 return status;
363 }
364 #else // not GPR_WINDOWS
// Non-Windows builds compile none of the IOCP tests above; the binary simply
// exits with status 0.
int main(int /* argc */, char** /* argv */) {
  return 0;
}
366 #endif
367