• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #ifdef GPR_WINDOWS
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <gtest/gtest.h>
22 
23 #include "absl/status/status.h"
24 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
25 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
26 #include "src/core/lib/event_engine/windows/iocp.h"
27 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
28 #include "src/core/lib/event_engine/windows/windows_engine.h"
29 #include "src/core/lib/resource_quota/memory_quota.h"
30 #include "src/core/util/notification.h"
31 #include "test/core/event_engine/windows/create_sockpair.h"
32 
33 namespace grpc_event_engine {
34 namespace experimental {
35 
36 using namespace std::chrono_literals;
37 
38 class WindowsEndpointTest : public testing::Test {};
39 
TEST_F(WindowsEndpointTest,BasicCommunication)40 TEST_F(WindowsEndpointTest, BasicCommunication) {
41   // TODO(hork): deduplicate against winsocket and iocp tests
42   // Setup
43   auto thread_pool = MakeThreadPool(8);
44   IOCP iocp(thread_pool.get());
45   grpc_core::MemoryQuota quota("endpoint_test");
46   SOCKET sockpair[2];
47   CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
48   auto wrapped_client_socket = iocp.Watch(sockpair[0]);
49   auto wrapped_server_socket = iocp.Watch(sockpair[1]);
50   sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
51   auto engine = std::make_shared<WindowsEventEngine>();
52   EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr,
53                                     sizeof(loopback_addr));
54   WindowsEndpoint client(addr, std::move(wrapped_client_socket),
55                          quota.CreateMemoryAllocator("client"),
56                          ChannelArgsEndpointConfig(), thread_pool.get(),
57                          engine);
58   WindowsEndpoint server(addr, std::move(wrapped_server_socket),
59                          quota.CreateMemoryAllocator("server"),
60                          ChannelArgsEndpointConfig(), thread_pool.get(),
61                          engine);
62   // Test
63   std::string message = "0xDEADBEEF";
64   grpc_core::Notification read_done;
65   SliceBuffer read_buffer;
66   EXPECT_FALSE(server.Read(
67       [&read_done, &message, &read_buffer](absl::Status) {
68         ASSERT_EQ(read_buffer.Count(), 1u);
69         auto slice = read_buffer.TakeFirst();
70         EXPECT_EQ(slice.as_string_view(), message);
71         read_done.Notify();
72       },
73       &read_buffer, nullptr));
74   grpc_core::Notification write_done;
75   SliceBuffer write_buffer;
76   write_buffer.Append(Slice::FromCopiedString(message));
77   EXPECT_FALSE(
78       client.Write([&write_done](absl::Status) { write_done.Notify(); },
79                    &write_buffer, nullptr));
80   iocp.Work(5s, []() {});
81   // Cleanup
82   write_done.WaitForNotification();
83   read_done.WaitForNotification();
84   thread_pool->Quiesce();
85 }
86 
TEST_F(WindowsEndpointTest,Conversation)87 TEST_F(WindowsEndpointTest, Conversation) {
88   // Setup
89   auto thread_pool = MakeThreadPool(8);
90   IOCP iocp(thread_pool.get());
91   grpc_core::MemoryQuota quota("endpoint_test");
92   SOCKET sockpair[2];
93   CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
94   sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
95   EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr,
96                                     sizeof(loopback_addr));
97   // Test
98   struct AppState {
99     AppState(const EventEngine::ResolvedAddress& addr,
100              std::unique_ptr<WinSocket> client,
101              std::unique_ptr<WinSocket> server, grpc_core::MemoryQuota& quota,
102              ThreadPool* thread_pool, std::shared_ptr<EventEngine> engine)
103         : client(addr, std::move(client), quota.CreateMemoryAllocator("client"),
104                  ChannelArgsEndpointConfig(), thread_pool, engine),
105           server(addr, std::move(server), quota.CreateMemoryAllocator("server"),
106                  ChannelArgsEndpointConfig(), thread_pool, engine) {}
107     grpc_core::Notification done;
108     WindowsEndpoint client;
109     WindowsEndpoint server;
110     SliceBuffer read_buffer;
111     SliceBuffer write_buffer;
112     const std::vector<std::string> messages{
113         "Java is to Javascript what car is to carpet. -Heilmann",
114         "Make it work, make it right, make it fast. -Beck",
115         "First, solve the problem. Then write the code. -Johnson",
116         "It works on my machine."};
117     // incremented after a corresponding read of a previous write
118     // if exchange%2 == 0, client -> server
119     // if exchange%2 == 1, server -> client
120     // if exchange == messages.length, done
121     std::atomic<size_t> exchange{0};
122 
123     // Initiates a Write and corresponding Read on two endpoints.
124     void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) {
125       write_buffer.Clear();
126       write_buffer.Append(Slice::FromCopiedString(messages[exchange]));
127       EXPECT_FALSE(
128           writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr));
129       auto cb = [this](absl::Status status) { ReadCB(status); };
130       read_buffer.Clear();
131       EXPECT_FALSE(reader->Read(cb, &read_buffer, /*args=*/nullptr));
132     }
133 
134     // Asserts that the received string matches, then queues the next Write/Read
135     // pair
136     void ReadCB(absl::Status) {
137       ASSERT_EQ(read_buffer.Count(), 1u);
138       ASSERT_EQ(read_buffer.TakeFirst().as_string_view(), messages[exchange]);
139       if (++exchange == messages.size()) {
140         done.Notify();
141         return;
142       }
143       if (exchange % 2 == 0) {
144         WriteAndQueueReader(/*writer=*/&client, /*reader=*/&server);
145       } else {
146         WriteAndQueueReader(/*writer=*/&server, /*reader=*/&client);
147       }
148     }
149   };
150   auto engine = std::make_shared<WindowsEventEngine>();
151   AppState state(addr, /*client=*/iocp.Watch(sockpair[0]),
152                  /*server=*/iocp.Watch(sockpair[1]), quota, thread_pool.get(),
153                  engine);
154   state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server);
155   while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk ||
156          !state.done.HasBeenNotified()) {
157   }
158   // Cleanup
159   state.done.WaitForNotification();
160   thread_pool->Quiesce();
161 }
162 
163 }  // namespace experimental
164 }  // namespace grpc_event_engine
165 
main(int argc,char ** argv)166 int main(int argc, char** argv) {
167   ::testing::InitGoogleTest(&argc, argv);
168   grpc_init();
169   int status = RUN_ALL_TESTS();
170   grpc_shutdown();
171   return status;
172 }
173 
174 #else  // not GPR_WINDOWS
main(int,char **)175 int main(int /* argc */, char** /* argv */) { return 0; }
176 #endif
177