1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <chrono>
17 #include <memory>
18 #include <string>
19 #include <thread>
20 #include <tuple>
21 #include <type_traits>
22 #include <utility>
23 #include <vector>
24 
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 #include "gtest/gtest.h"
31 
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/event_engine/memory_allocator.h>
34 #include <grpc/impl/channel_arg_names.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/tcp_socket_utils.h"
40 #include "src/core/lib/gprpp/notification.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43 #include "src/core/lib/resource_quota/resource_quota.h"
44 #include "test/core/event_engine/event_engine_test_utils.h"
45 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
46 #include "test/core/util/port.h"
47 
48 namespace grpc_event_engine {
49 namespace experimental {
50 
InitClientTests()51 void InitClientTests() {}
52 
53 }  // namespace experimental
54 }  // namespace grpc_event_engine
55 
56 class EventEngineClientTest : public EventEngineTest {};
57 
58 using namespace std::chrono_literals;
59 
60 namespace {
61 
62 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
63 using ::grpc_event_engine::experimental::EventEngine;
64 using ::grpc_event_engine::experimental::URIToResolvedAddress;
65 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
66 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
67 using ::grpc_event_engine::experimental::GetNextSendMessage;
68 using ::grpc_event_engine::experimental::NotifyOnDelete;
69 
70 constexpr int kNumExchangedMessages = 100;
71 
72 }  // namespace
73 
74 // Create a connection using the test EventEngine to a non-existent listener
75 // and verify that the connection fails.
TEST_F(EventEngineClientTest,ConnectToNonExistentListenerTest)76 TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
77   grpc_core::ExecCtx ctx;
78   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
79   grpc_core::Notification signal;
80   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
81   std::string target_addr = absl::StrCat(
82       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
83   // Create a test EventEngine client endpoint and connect to a non existent
84   // listener.
85   ChannelArgsEndpointConfig config;
86   test_ee->Connect(
87       [_ = NotifyOnDelete(&signal)](
88           absl::StatusOr<std::unique_ptr<Endpoint>> status) {
89         // Connect should fail.
90         EXPECT_FALSE(status.ok());
91       },
92       *URIToResolvedAddress(target_addr), config,
93       memory_quota->CreateMemoryAllocator("conn-1"), 24h);
94   signal.WaitForNotification();
95 }
96 
97 // Create a connection using the test EventEngine to a listener created
98 // by the oracle EventEngine and exchange bi-di data over the connection.
99 // For each data transfer, verify that data written at one end of the stream
100 // equals data read at the other end of the stream.
101 
TEST_F(EventEngineClientTest,ConnectExchangeBidiDataTransferTest)102 TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
103   grpc_core::ExecCtx ctx;
104   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
105   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
106   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
107   std::string target_addr = absl::StrCat(
108       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
109   auto resolved_addr = URIToResolvedAddress(target_addr);
110   GPR_ASSERT(resolved_addr.ok());
111   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
112   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
113   grpc_core::Notification client_signal;
114   grpc_core::Notification server_signal;
115 
116   Listener::AcceptCallback accept_cb =
117       [&server_endpoint, &server_signal](
118           std::unique_ptr<Endpoint> ep,
119           grpc_core::MemoryAllocator /*memory_allocator*/) {
120         server_endpoint = std::move(ep);
121         server_signal.Notify();
122       };
123 
124   grpc_core::ChannelArgs args;
125   auto quota = grpc_core::ResourceQuota::Default();
126   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
127   ChannelArgsEndpointConfig config(args);
128   auto listener = *oracle_ee->CreateListener(
129       std::move(accept_cb),
130       [](absl::Status status) {
131         ASSERT_TRUE(status.ok()) << status.ToString();
132       },
133       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
134 
135   ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
136   ASSERT_TRUE(listener->Start().ok());
137 
138   test_ee->Connect(
139       [&client_endpoint,
140        &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
141         ASSERT_TRUE(endpoint.ok());
142         client_endpoint = std::move(*endpoint);
143         client_signal.Notify();
144       },
145       *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
146       24h);
147 
148   client_signal.WaitForNotification();
149   server_signal.WaitForNotification();
150   ASSERT_NE(client_endpoint.get(), nullptr);
151   ASSERT_NE(server_endpoint.get(), nullptr);
152 
153   // Alternate message exchanges between client -- server and server --
154   // client.
155   for (int i = 0; i < kNumExchangedMessages; i++) {
156     // Send from client to server and verify data read at the server.
157     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
158                                     server_endpoint.get())
159                     .ok());
160 
161     // Send from server to client and verify data read at the client.
162     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
163                                     client_endpoint.get())
164                     .ok());
165   }
166   client_endpoint.reset();
167   server_endpoint.reset();
168   listener.reset();
169 }
170 
171 // Create 1 listener bound to N IPv6 addresses and M connections where M > N and
172 // exchange and verify random number of messages over each connection.
TEST_F(EventEngineClientTest,MultipleIPv6ConnectionsToOneOracleListenerTest)173 TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
174   grpc_core::ExecCtx ctx;
175   static constexpr int kNumListenerAddresses = 10;  // N
176   static constexpr int kNumConnections = 10;        // M
177   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
178   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
179   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
180   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
181   // Notifications can only be fired once, so they are newed every loop
182   grpc_core::Notification* server_signal = new grpc_core::Notification();
183   std::vector<std::string> target_addrs;
184   std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
185       connections;
186 
187   Listener::AcceptCallback accept_cb =
188       [&server_endpoint, &server_signal](
189           std::unique_ptr<Endpoint> ep,
190           grpc_core::MemoryAllocator /*memory_allocator*/) {
191         server_endpoint = std::move(ep);
192         server_signal->Notify();
193       };
194   grpc_core::ChannelArgs args;
195   auto quota = grpc_core::ResourceQuota::Default();
196   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
197   ChannelArgsEndpointConfig config(args);
198   auto listener = *oracle_ee->CreateListener(
199       std::move(accept_cb),
200       [](absl::Status status) {
201         ASSERT_TRUE(status.ok()) << status.ToString();
202       },
203       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
204 
205   target_addrs.reserve(kNumListenerAddresses);
206   for (int i = 0; i < kNumListenerAddresses; i++) {
207     std::string target_addr = absl::StrCat(
208         "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
209     ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok());
210     target_addrs.push_back(target_addr);
211   }
212   ASSERT_TRUE(listener->Start().ok());
213   absl::SleepFor(absl::Milliseconds(500));
214   for (int i = 0; i < kNumConnections; i++) {
215     std::unique_ptr<EventEngine::Endpoint> client_endpoint;
216     grpc_core::Notification client_signal;
217     // Create a test EventEngine client endpoint and connect to a one of the
218     // addresses bound to the oracle listener. Verify that the connection
219     // succeeds.
220     grpc_core::ChannelArgs client_args;
221     auto client_quota = grpc_core::ResourceQuota::Default();
222     client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota);
223     ChannelArgsEndpointConfig client_config(client_args);
224     test_ee->Connect(
225         [&client_endpoint,
226          &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
227           ASSERT_TRUE(endpoint.ok());
228           client_endpoint = std::move(*endpoint);
229           client_signal.Notify();
230         },
231         *URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
232         client_config,
233         memory_quota->CreateMemoryAllocator(
234             absl::StrCat("conn-", std::to_string(i))),
235         24h);
236 
237     client_signal.WaitForNotification();
238     server_signal->WaitForNotification();
239     ASSERT_NE(client_endpoint.get(), nullptr);
240     ASSERT_NE(server_endpoint.get(), nullptr);
241     connections.push_back(std::make_tuple(std::move(client_endpoint),
242                                           std::move(server_endpoint)));
243     delete server_signal;
244     server_signal = new grpc_core::Notification();
245   }
246   delete server_signal;
247 
248   std::vector<std::thread> threads;
249   // Create one thread for each connection. For each connection, create
250   // 2 more worker threads: to exchange and verify bi-directional data
251   // transfer.
252   threads.reserve(kNumConnections);
253   for (int i = 0; i < kNumConnections; i++) {
254     // For each connection, simulate a parallel bi-directional data transfer.
255     // All bi-directional transfers are run in parallel across all
256     // connections. Each bi-directional data transfer uses a random number of
257     // messages.
258     threads.emplace_back([client_endpoint =
259                               std::move(std::get<0>(connections[i])),
260                           server_endpoint =
261                               std::move(std::get<1>(connections[i]))]() {
262       std::vector<std::thread> workers;
263       workers.reserve(2);
264       auto worker = [client_endpoint = client_endpoint.get(),
265                      server_endpoint =
266                          server_endpoint.get()](bool client_to_server) {
267         grpc_core::ExecCtx ctx;
268         for (int i = 0; i < kNumExchangedMessages; i++) {
269           // If client_to_server is true, send from client to server and
270           // verify data read at the server. Otherwise send data from server
271           // to client and verify data read at client.
272           if (client_to_server) {
273             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
274                                             client_endpoint, server_endpoint)
275                             .ok());
276           } else {
277             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
278                                             server_endpoint, client_endpoint)
279                             .ok());
280           }
281         }
282       };
283       // worker[0] simulates a flow from client to server endpoint
284       workers.emplace_back([&worker]() { worker(true); });
285       // worker[1] simulates a flow from server to client endpoint
286       workers.emplace_back([&worker]() { worker(false); });
287       workers[0].join();
288       workers[1].join();
289     });
290   }
291   for (auto& t : threads) {
292     t.join();
293   }
294   server_endpoint.reset();
295 }
296 
297 // TODO(vigneshbabu): Add more tests which create listeners bound to a mix
298 // Ipv6 and other type of addresses (UDS) in the same test.
299