• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 #include <gmock/gmock.h>
18 #include <grpc/grpc.h>
19 #include <grpc/support/log_windows.h>
20 #include <gtest/gtest.h>
21 
22 #include <thread>
23 
24 #include "absl/log/log.h"
25 #include "absl/status/status.h"
26 #include "absl/time/time.h"
27 #include "absl/types/variant.h"
28 #include "src/core/lib/event_engine/common_closures.h"
29 #include "src/core/lib/event_engine/poller.h"
30 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
31 #include "src/core/lib/event_engine/windows/iocp.h"
32 #include "src/core/lib/event_engine/windows/win_socket.h"
33 #include "src/core/lib/iomgr/error.h"
34 #include "src/core/util/notification.h"
35 #include "test/core/event_engine/windows/create_sockpair.h"
36 
37 namespace {
38 using ::grpc_event_engine::experimental::AnyInvocableClosure;
39 using ::grpc_event_engine::experimental::CreateSockpair;
40 using ::grpc_event_engine::experimental::EventEngine;
41 using ::grpc_event_engine::experimental::IOCP;
42 using ::grpc_event_engine::experimental::Poller;
43 using ::grpc_event_engine::experimental::SelfDeletingClosure;
44 using ::grpc_event_engine::experimental::ThreadPool;
45 using ::grpc_event_engine::experimental::WinSocket;
46 
47 // TODO(hork): replace with logging mechanism that plays nicely with:
48 //   `ASSERT_OK(...) << GetErrorMessage(error, context);`
LogErrorMessage(int messageid,absl::string_view context)49 void LogErrorMessage(int messageid, absl::string_view context) {
50   char* utf8_message = gpr_format_message(messageid);
51   LOG(ERROR) << "Error in " << context << ": " << utf8_message;
52   gpr_free(utf8_message);
53 }
54 }  // namespace
55 
56 class IOCPTest : public testing::Test {};
57 
TEST_F(IOCPTest,ClientReceivesNotificationOfServerSend)58 TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
59   auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
60   IOCP iocp(thread_pool.get());
61   SOCKET sockpair[2];
62   CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
63   auto wrapped_client_socket = iocp.Watch(sockpair[0]);
64   auto wrapped_server_socket = iocp.Watch(sockpair[1]);
65   grpc_core::Notification read_called;
66   grpc_core::Notification write_called;
67   DWORD flags = 0;
68   AnyInvocableClosure* on_read;
69   AnyInvocableClosure* on_write;
70   WSABUF read_wsabuf;
71   DWORD bytes_rcvd;
72   read_wsabuf.len = 2048;
73   char read_char_buffer[2048];
74   read_wsabuf.buf = read_char_buffer;
75   {
76     // When the client gets some data, ensure it matches what we expect.
77     on_read = new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
78                                        &read_called, &read_wsabuf]() {
79       VLOG(2) << "Notified on read";
80       EXPECT_TRUE(win_socket->read_info()->result().error_status.ok())
81           << "Error on read: "
82           << win_socket->read_info()->result().error_status;
83       EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
84       EXPECT_STREQ(read_wsabuf.buf, "hello!");
85       read_called.Notify();
86     });
87     wrapped_client_socket->NotifyOnRead(on_read);
88     int status = WSARecv(
89         wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
90         &flags, wrapped_client_socket->read_info()->overlapped(), NULL);
91     // Expecting error 997, WSA_IO_PENDING
92     EXPECT_EQ(status, -1);
93     int last_error = WSAGetLastError();
94     EXPECT_EQ(last_error, WSA_IO_PENDING);
95     if (last_error != WSA_IO_PENDING) {
96       LogErrorMessage(last_error, "WSARecv");
97     }
98   }
99   {
100     on_write = new AnyInvocableClosure([&write_called] {
101       VLOG(2) << "Notified on write";
102       write_called.Notify();
103     });
104     wrapped_server_socket->NotifyOnWrite(on_write);
105     // Have the server send a message to the client
106     WSABUF write_wsabuf;
107     char write_char_buffer[2048] = "hello!";
108     write_wsabuf.len = 2048;
109     write_wsabuf.buf = write_char_buffer;
110     DWORD bytes_sent;
111     int status = WSASend(
112         wrapped_server_socket->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
113         wrapped_server_socket->write_info()->overlapped(), NULL);
114     EXPECT_EQ(status, 0);
115     if (status != 0) {
116       LogErrorMessage(WSAGetLastError(), "WSASend");
117     }
118   }
119   // Doing work for WSASend
120   bool cb_invoked = false;
121   auto work_result = iocp.Work(std::chrono::seconds(10),
122                                [&cb_invoked]() { cb_invoked = true; });
123   ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
124   ASSERT_TRUE(cb_invoked);
125   // Doing work for WSARecv
126   cb_invoked = false;
127   work_result = iocp.Work(std::chrono::seconds(10),
128                           [&cb_invoked]() { cb_invoked = true; });
129   ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
130   ASSERT_TRUE(cb_invoked);
131   // wait for the callbacks to run
132   read_called.WaitForNotification();
133   write_called.WaitForNotification();
134 
135   delete on_read;
136   delete on_write;
137   wrapped_client_socket->Shutdown();
138   wrapped_server_socket->Shutdown();
139   iocp.Shutdown();
140   thread_pool->Quiesce();
141 }
142 
TEST_F(IOCPTest,IocpWorkTimeoutDueToNoNotificationRegistered)143 TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
144   auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
145   IOCP iocp(thread_pool.get());
146   SOCKET sockpair[2];
147   CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
148   auto wrapped_client_socket = iocp.Watch(sockpair[0]);
149   grpc_core::Notification read_called;
150   DWORD flags = 0;
151   WSABUF read_wsabuf;
152   DWORD bytes_rcvd;
153   read_wsabuf.len = 2048;
154   char read_char_buffer[2048];
155   read_wsabuf.buf = read_char_buffer;
156   {
157     // Set the client to receive asynchronously
158     // Prepare a notification callback, but don't register it yet.
159     wrapped_client_socket->NotifyOnRead(
160         SelfDeletingClosure::Create([win_socket = wrapped_client_socket.get(),
161                                      &read_called, &read_wsabuf]() {
162           VLOG(2) << "Notified on read";
163           EXPECT_TRUE(win_socket->read_info()->result().error_status.ok())
164               << "Error on read: "
165               << win_socket->read_info()->result().error_status;
166           EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
167           EXPECT_STREQ(read_wsabuf.buf, "hello!");
168           read_called.Notify();
169         }));
170     int status = WSARecv(
171         wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
172         &flags, wrapped_client_socket->read_info()->overlapped(), NULL);
173     // Expecting error 997, WSA_IO_PENDING
174     EXPECT_EQ(status, -1);
175     int last_error = WSAGetLastError();
176     EXPECT_EQ(last_error, WSA_IO_PENDING);
177     if (last_error != WSA_IO_PENDING) {
178       LogErrorMessage(last_error, "WSARecv");
179     }
180   }
181   {
182     // Have the server send a message to the client. No need to track via IOCP
183     WSABUF write_wsabuf;
184     char write_char_buffer[2048] = "hello!";
185     write_wsabuf.len = 2048;
186     write_wsabuf.buf = write_char_buffer;
187     DWORD bytes_sent;
188     OVERLAPPED write_overlapped;
189     memset(&write_overlapped, 0, sizeof(OVERLAPPED));
190     int status = WSASend(sockpair[1], &write_wsabuf, 1, &bytes_sent, 0,
191                          &write_overlapped, NULL);
192     EXPECT_EQ(status, 0);
193     if (status != 0) {
194       LogErrorMessage(WSAGetLastError(), "WSASend");
195     }
196   }
197   // IOCP::Work without any notification callbacks should still return Ok.
198   bool cb_invoked = false;
199   auto work_result = iocp.Work(std::chrono::seconds(2),
200                                [&cb_invoked]() { cb_invoked = true; });
201   ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
202   ASSERT_TRUE(cb_invoked);
203   // wait for the callbacks to run
204   read_called.WaitForNotification();
205   wrapped_client_socket->Shutdown();
206   iocp.Shutdown();
207   thread_pool->Quiesce();
208 }
209 
// Verifies that IOCP::Kick wakes a thread blocked in IOCP::Work: the worker
// must see kKicked, and the callback passed to Work must not be invoked.
TEST_F(IOCPTest, KickWorks) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::Notification kicked;
  thread_pool->Run([&iocp, &kicked] {
    bool cb_invoked = false;
    Poller::WorkResult result = iocp.Work(
        std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; });
    // Use EXPECT_* here, not ASSERT_*: a fatal assertion would return from
    // this lambda before kicked.Notify(), leaving the test body blocked in
    // WaitForNotification() forever instead of reporting the failure.
    EXPECT_TRUE(result == Poller::WorkResult::kKicked);
    EXPECT_FALSE(cb_invoked);
    kicked.Notify();
  });
  thread_pool->Run([&iocp] {
    // give the worker thread a chance to start
    absl::SleepFor(absl::Milliseconds(42));
    iocp.Kick();
  });
  // wait for the callbacks to run
  kicked.WaitForNotification();
  thread_pool->Quiesce();
}
231 
// Kicks the poller twice before any Work call, then checks that the next two
// Work calls each report kKicked and the third reports kDeadlineExceeded. The
// callback passed to Work must not run in any of the three calls.
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
  // TODO(hork): evaluate if a kick count is going to be useful.
  // This documents the existing poller's behavior of maintaining a kick count,
  // but it's unclear if it's going to be needed.
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  constexpr int kKickCount = 2;
  for (int kick = 0; kick < kKickCount; ++kick) {
    iocp.Kick();
  }
  bool poll_cb_ran = false;
  // One short Work call that records whether its callback was invoked.
  const auto poll_once = [&iocp, &poll_cb_ran]() {
    return iocp.Work(std::chrono::milliseconds(1),
                     [&poll_cb_ran]() { poll_cb_ran = true; });
  };
  // Each queued kick is consumed by exactly one Work call.
  for (int kick = 0; kick < kKickCount; ++kick) {
    const auto kicked_result = poll_once();
    ASSERT_TRUE(kicked_result == Poller::WorkResult::kKicked);
    ASSERT_FALSE(poll_cb_ran);
  }
  // With no kicks left, the next call runs out its 1ms deadline.
  const auto final_result = poll_once();
  ASSERT_TRUE(final_result == Poller::WorkResult::kDeadlineExceeded);
  ASSERT_FALSE(poll_cb_ran);
  thread_pool->Quiesce();
}
258 
TEST_F(IOCPTest,StressTestThousandsOfSockets)259 TEST_F(IOCPTest, StressTestThousandsOfSockets) {
260   // Start 10 threads, each with their own IOCP
261   // On each thread, create 50 socket pairs (100 sockets) and have them exchange
262   // a message before shutting down.
263   int thread_count = 10;
264   int sockets_per_thread = 50;
265   std::atomic<int> read_count{0};
266   std::atomic<int> write_count{0};
267   std::vector<std::thread> threads;
268   threads.reserve(thread_count);
269   for (int thread_n = 0; thread_n < thread_count; thread_n++) {
270     threads.emplace_back([sockets_per_thread, &read_count, &write_count] {
271       auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
272       IOCP iocp(thread_pool.get());
273       // Start a looping worker thread with a moderate timeout
274       std::thread iocp_worker([&iocp] {
275         Poller::WorkResult result;
276         do {
277           result = iocp.Work(std::chrono::seconds(1), []() {});
278         } while (result != Poller::WorkResult::kDeadlineExceeded);
279       });
280       for (int i = 0; i < sockets_per_thread; i++) {
281         SOCKET sockpair[2];
282         CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
283         auto wrapped_client_socket = iocp.Watch(sockpair[0]);
284         auto wrapped_server_socket = iocp.Watch(sockpair[1]);
285         auto* pclient = wrapped_client_socket.get();
286         pclient->NotifyOnRead(SelfDeletingClosure::Create(
287             [&read_count,
288              win_socket = std::move(wrapped_client_socket)]() mutable {
289               read_count.fetch_add(1);
290               win_socket->Shutdown();
291             }));
292         auto* pserver = wrapped_server_socket.get();
293         pserver->NotifyOnWrite(SelfDeletingClosure::Create(
294             [&write_count,
295              win_socket = std::move(wrapped_server_socket)]() mutable {
296               write_count.fetch_add(1);
297               win_socket->Shutdown();
298             }));
299         {
300           // Set the client to receive
301           WSABUF read_wsabuf;
302           read_wsabuf.len = 20;
303           char read_char_buffer[20];
304           read_wsabuf.buf = read_char_buffer;
305           DWORD bytes_rcvd;
306           DWORD flags = 0;
307           int status =
308               WSARecv(pclient->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
309                       &flags, pclient->read_info()->overlapped(), NULL);
310           // Expecting error 997, WSA_IO_PENDING
311           EXPECT_EQ(status, -1);
312           int last_error = WSAGetLastError();
313           EXPECT_EQ(last_error, WSA_IO_PENDING);
314           if (last_error != WSA_IO_PENDING) {
315             LogErrorMessage(last_error, "WSARecv");
316           }
317         }
318         {
319           // Have the server send a message to the client.
320           WSABUF write_wsabuf;
321           char write_char_buffer[20] = "hello!";
322           write_wsabuf.len = 20;
323           write_wsabuf.buf = write_char_buffer;
324           DWORD bytes_sent;
325           int status =
326               WSASend(pserver->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
327                       pserver->write_info()->overlapped(), NULL);
328           if (status != 0) {
329             int wsa_error = WSAGetLastError();
330             if (wsa_error != WSA_IO_PENDING) {
331               LogErrorMessage(wsa_error, "WSASend");
332               FAIL() << "Error in WSASend. See logs";
333             }
334           }
335         }
336       }
337       iocp_worker.join();
338       thread_pool->Quiesce();
339     });
340   }
341   for (auto& t : threads) {
342     t.join();
343   }
344   absl::Time deadline = absl::Now() + absl::Seconds(30);
345   while (read_count.load() != thread_count * sockets_per_thread ||
346          write_count.load() != thread_count * sockets_per_thread) {
347     absl::SleepFor(absl::Milliseconds(50));
348     if (deadline < absl::Now()) {
349       FAIL() << "Deadline exceeded with " << read_count.load() << " reads and "
350              << write_count.load() << " writes";
351     }
352   }
353   ASSERT_EQ(read_count.load(), thread_count * sockets_per_thread);
354   ASSERT_EQ(write_count.load(), thread_count * sockets_per_thread);
355 }
356 
main(int argc,char ** argv)357 int main(int argc, char** argv) {
358   ::testing::InitGoogleTest(&argc, argv);
359   grpc_init();
360   int status = RUN_ALL_TESTS();
361   grpc_shutdown();
362   return status;
363 }
364 #else  // not GPR_WINDOWS
// On non-Windows builds there is nothing to test, so report success.
int main(int /* argc */, char** /* argv */) {
  return 0;
}
366 #endif
367