1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #ifdef GPR_WINDOWS
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <gtest/gtest.h>
22
23 #include "absl/status/status.h"
24 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
25 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
26 #include "src/core/lib/event_engine/windows/iocp.h"
27 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
28 #include "src/core/lib/event_engine/windows/windows_engine.h"
29 #include "src/core/lib/resource_quota/memory_quota.h"
30 #include "src/core/util/notification.h"
31 #include "test/core/event_engine/windows/create_sockpair.h"
32
33 namespace grpc_event_engine {
34 namespace experimental {
35
36 using namespace std::chrono_literals;
37
38 class WindowsEndpointTest : public testing::Test {};
39
// Writes one message on the client endpoint of a socket pair and checks that a
// read queued on the server endpoint receives exactly that message.
TEST_F(WindowsEndpointTest, BasicCommunication) {
  // TODO(hork): deduplicate against winsocket and iocp tests
  // Setup
  auto thread_pool = MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::MemoryQuota quota("endpoint_test");
  SOCKET sockpair[2];
  CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  auto wrapped_server_socket = iocp.Watch(sockpair[1]);
  sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
  auto engine = std::make_shared<WindowsEventEngine>();
  EventEngine::ResolvedAddress addr(
      reinterpret_cast<sockaddr*>(&loopback_addr), sizeof(loopback_addr));
  WindowsEndpoint client(addr, std::move(wrapped_client_socket),
                         quota.CreateMemoryAllocator("client"),
                         ChannelArgsEndpointConfig(), thread_pool.get(),
                         engine);
  WindowsEndpoint server(addr, std::move(wrapped_server_socket),
                         quota.CreateMemoryAllocator("server"),
                         ChannelArgsEndpointConfig(), thread_pool.get(),
                         engine);
  // Test
  std::string message = "0xDEADBEEF";
  grpc_core::Notification read_done;
  SliceBuffer read_buffer;
  EXPECT_FALSE(server.Read(
      [&read_done, &message, &read_buffer](absl::Status status) {
        // Only non-fatal checks are used here: a fatal ASSERT_* would return
        // from this lambda before Notify(), leaving the test thread blocked in
        // read_done.WaitForNotification() instead of reporting the failure.
        const size_t slice_count = read_buffer.Count();
        EXPECT_EQ(slice_count, 1u) << "read status: " << status;
        if (slice_count == 1u) {
          auto slice = read_buffer.TakeFirst();
          EXPECT_EQ(slice.as_string_view(), message);
        }
        read_done.Notify();
      },
      &read_buffer, nullptr));
  grpc_core::Notification write_done;
  SliceBuffer write_buffer;
  write_buffer.Append(Slice::FromCopiedString(message));
  EXPECT_FALSE(
      client.Write([&write_done](absl::Status) { write_done.Notify(); },
                   &write_buffer, nullptr));
  // Run the poller once so the queued I/O can make progress.
  iocp.Work(5s, []() {});
  // Cleanup
  write_done.WaitForNotification();
  read_done.WaitForNotification();
  thread_pool->Quiesce();
}
86
// Exchanges a fixed list of messages over a socket pair, alternating the
// direction of each message, and checks that every message arrives intact.
TEST_F(WindowsEndpointTest, Conversation) {
  // Setup
  auto thread_pool = MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::MemoryQuota quota("endpoint_test");
  SOCKET sockpair[2];
  CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
  sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
  EventEngine::ResolvedAddress addr(
      reinterpret_cast<sockaddr*>(&loopback_addr), sizeof(loopback_addr));
  // Test
  struct AppState {
    AppState(const EventEngine::ResolvedAddress& addr,
             std::unique_ptr<WinSocket> client,
             std::unique_ptr<WinSocket> server, grpc_core::MemoryQuota& quota,
             ThreadPool* thread_pool, std::shared_ptr<EventEngine> engine)
        : client(addr, std::move(client), quota.CreateMemoryAllocator("client"),
                 ChannelArgsEndpointConfig(), thread_pool, engine),
          server(addr, std::move(server), quota.CreateMemoryAllocator("server"),
                 ChannelArgsEndpointConfig(), thread_pool, engine) {}
    // Notified when the conversation finishes, either because every message
    // was exchanged or because a check failed.
    grpc_core::Notification done;
    WindowsEndpoint client;
    WindowsEndpoint server;
    SliceBuffer read_buffer;
    SliceBuffer write_buffer;
    const std::vector<std::string> messages{
        "Java is to Javascript what car is to carpet. -Heilmann",
        "Make it work, make it right, make it fast. -Beck",
        "First, solve the problem. Then write the code. -Johnson",
        "It works on my machine."};
    // incremented after a corresponding read of a previous write
    // if exchange%2 == 0, client -> server
    // if exchange%2 == 1, server -> client
    // if exchange == messages.size(), done
    std::atomic<size_t> exchange{0};

    // Initiates a Write and corresponding Read on two endpoints.
    void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) {
      write_buffer.Clear();
      write_buffer.Append(Slice::FromCopiedString(messages[exchange]));
      EXPECT_FALSE(
          writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr));
      auto cb = [this](absl::Status status) { ReadCB(status); };
      read_buffer.Clear();
      EXPECT_FALSE(reader->Read(cb, &read_buffer, /*args=*/nullptr));
    }

    // Checks that the received string matches the expected message, then
    // queues the next Write/Read pair.
    //
    // Only non-fatal checks are used, and `done` is notified on every exit
    // path that ends the conversation. A fatal ASSERT_* would return without
    // notifying `done`, and the polling loop in the test body would then spin
    // forever instead of reporting the failure.
    void ReadCB(absl::Status status) {
      const size_t current = exchange.load();
      const size_t slice_count = read_buffer.Count();
      EXPECT_EQ(slice_count, 1u) << "read status: " << status;
      if (slice_count != 1u) {
        done.Notify();
        return;
      }
      // Keep the slice alive while its string view is in use.
      auto slice = read_buffer.TakeFirst();
      const auto received = slice.as_string_view();
      EXPECT_EQ(received, messages[current]);
      if (received != messages[current]) {
        done.Notify();
        return;
      }
      const size_t next = ++exchange;
      if (next == messages.size()) {
        done.Notify();
        return;
      }
      if (next % 2 == 0) {
        WriteAndQueueReader(/*writer=*/&client, /*reader=*/&server);
      } else {
        WriteAndQueueReader(/*writer=*/&server, /*reader=*/&client);
      }
    }
  };
  auto engine = std::make_shared<WindowsEventEngine>();
  AppState state(addr, /*client=*/iocp.Watch(sockpair[0]),
                 /*server=*/iocp.Watch(sockpair[1]), quota, thread_pool.get(),
                 engine);
  state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server);
  // Keep polling while the poller reports kOk or the conversation has not yet
  // finished.
  while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk ||
         !state.done.HasBeenNotified()) {
  }
  // Cleanup
  state.done.WaitForNotification();
  thread_pool->Quiesce();
}
162
163 } // namespace experimental
164 } // namespace grpc_event_engine
165
main(int argc,char ** argv)166 int main(int argc, char** argv) {
167 ::testing::InitGoogleTest(&argc, argv);
168 grpc_init();
169 int status = RUN_ALL_TESTS();
170 grpc_shutdown();
171 return status;
172 }
173
174 #else // not GPR_WINDOWS
// GPR_WINDOWS is not defined: there is nothing to run, so exit successfully.
int main(int /* argc */, char** /* argv */) {
  return 0;
}
176 #endif
177