• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/event_engine/event_engine.h>
16 #include <grpc/event_engine/memory_allocator.h>
17 #include <grpc/impl/channel_arg_names.h>
18 
19 #include <algorithm>
20 #include <chrono>
21 #include <memory>
22 #include <string>
23 #include <thread>
24 #include <tuple>
25 #include <type_traits>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/log/check.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/time/clock.h"
34 #include "absl/time/time.h"
35 #include "gtest/gtest.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38 #include "src/core/lib/event_engine/tcp_socket_utils.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/lib/resource_quota/memory_quota.h"
41 #include "src/core/lib/resource_quota/resource_quota.h"
42 #include "src/core/util/notification.h"
43 #include "test/core/event_engine/event_engine_test_utils.h"
44 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
45 #include "test/core/test_util/port.h"
46 
47 namespace grpc_event_engine {
48 namespace experimental {
49 
InitClientTests()50 void InitClientTests() {}
51 
52 }  // namespace experimental
53 }  // namespace grpc_event_engine
54 
55 class EventEngineClientTest : public EventEngineTest {};
56 
57 using namespace std::chrono_literals;
58 
59 namespace {
60 
61 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
62 using ::grpc_event_engine::experimental::EventEngine;
63 using ::grpc_event_engine::experimental::URIToResolvedAddress;
64 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
65 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
66 using ::grpc_event_engine::experimental::GetNextSendMessage;
67 using ::grpc_event_engine::experimental::NotifyOnDelete;
68 
69 constexpr int kNumExchangedMessages = 100;
70 
71 }  // namespace
72 
73 // Create a connection using the test EventEngine to a non-existent listener
74 // and verify that the connection fails.
TEST_F(EventEngineClientTest,ConnectToNonExistentListenerTest)75 TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
76   grpc_core::ExecCtx ctx;
77   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
78   grpc_core::Notification signal;
79   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
80   std::string target_addr = absl::StrCat(
81       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
82   // Create a test EventEngine client endpoint and connect to a non existent
83   // listener.
84   ChannelArgsEndpointConfig config;
85   test_ee->Connect(
86       [_ = NotifyOnDelete(&signal)](
87           absl::StatusOr<std::unique_ptr<Endpoint>> status) {
88         // Connect should fail.
89         EXPECT_FALSE(status.ok());
90       },
91       *URIToResolvedAddress(target_addr), config,
92       memory_quota->CreateMemoryAllocator("conn-1"), 24h);
93   signal.WaitForNotification();
94 }
95 
96 // Create a connection using the test EventEngine to a listener created
97 // by the oracle EventEngine and exchange bi-di data over the connection.
98 // For each data transfer, verify that data written at one end of the stream
99 // equals data read at the other end of the stream.
100 
TEST_F(EventEngineClientTest,ConnectExchangeBidiDataTransferTest)101 TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
102   grpc_core::ExecCtx ctx;
103   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
104   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
105   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
106   std::string target_addr = absl::StrCat(
107       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
108   auto resolved_addr = URIToResolvedAddress(target_addr);
109   CHECK_OK(resolved_addr);
110   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
111   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
112   grpc_core::Notification client_signal;
113   grpc_core::Notification server_signal;
114 
115   Listener::AcceptCallback accept_cb =
116       [&server_endpoint, &server_signal](
117           std::unique_ptr<Endpoint> ep,
118           grpc_core::MemoryAllocator /*memory_allocator*/) {
119         server_endpoint = std::move(ep);
120         server_signal.Notify();
121       };
122 
123   grpc_core::ChannelArgs args;
124   auto quota = grpc_core::ResourceQuota::Default();
125   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
126   ChannelArgsEndpointConfig config(args);
127   auto listener = *oracle_ee->CreateListener(
128       std::move(accept_cb),
129       [](absl::Status status) {
130         ASSERT_TRUE(status.ok()) << status.ToString();
131       },
132       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
133 
134   ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
135   ASSERT_TRUE(listener->Start().ok());
136 
137   test_ee->Connect(
138       [&client_endpoint,
139        &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
140         ASSERT_TRUE(endpoint.ok());
141         client_endpoint = std::move(*endpoint);
142         client_signal.Notify();
143       },
144       *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
145       24h);
146 
147   client_signal.WaitForNotification();
148   server_signal.WaitForNotification();
149   ASSERT_NE(client_endpoint.get(), nullptr);
150   ASSERT_NE(server_endpoint.get(), nullptr);
151 
152   // Alternate message exchanges between client -- server and server --
153   // client.
154   for (int i = 0; i < kNumExchangedMessages; i++) {
155     // Send from client to server and verify data read at the server.
156     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
157                                     server_endpoint.get())
158                     .ok());
159 
160     // Send from server to client and verify data read at the client.
161     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
162                                     client_endpoint.get())
163                     .ok());
164   }
165   client_endpoint.reset();
166   server_endpoint.reset();
167   listener.reset();
168 }
169 
170 // Create 1 listener bound to N IPv6 addresses and M connections where M > N and
171 // exchange and verify random number of messages over each connection.
TEST_F(EventEngineClientTest,MultipleIPv6ConnectionsToOneOracleListenerTest)172 TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
173   grpc_core::ExecCtx ctx;
174   static constexpr int kNumListenerAddresses = 10;  // N
175   static constexpr int kNumConnections = 10;        // M
176   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
177   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
178   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
179   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
180   // Notifications can only be fired once, so they are newed every loop
181   grpc_core::Notification* server_signal = new grpc_core::Notification();
182   std::vector<std::string> target_addrs;
183   std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
184       connections;
185 
186   Listener::AcceptCallback accept_cb =
187       [&server_endpoint, &server_signal](
188           std::unique_ptr<Endpoint> ep,
189           grpc_core::MemoryAllocator /*memory_allocator*/) {
190         server_endpoint = std::move(ep);
191         server_signal->Notify();
192       };
193   grpc_core::ChannelArgs args;
194   auto quota = grpc_core::ResourceQuota::Default();
195   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
196   ChannelArgsEndpointConfig config(args);
197   auto listener = *oracle_ee->CreateListener(
198       std::move(accept_cb),
199       [](absl::Status status) {
200         ASSERT_TRUE(status.ok()) << status.ToString();
201       },
202       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
203 
204   target_addrs.reserve(kNumListenerAddresses);
205   for (int i = 0; i < kNumListenerAddresses; i++) {
206     std::string target_addr = absl::StrCat(
207         "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
208     ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok());
209     target_addrs.push_back(target_addr);
210   }
211   ASSERT_TRUE(listener->Start().ok());
212   absl::SleepFor(absl::Milliseconds(500));
213   for (int i = 0; i < kNumConnections; i++) {
214     std::unique_ptr<EventEngine::Endpoint> client_endpoint;
215     grpc_core::Notification client_signal;
216     // Create a test EventEngine client endpoint and connect to a one of the
217     // addresses bound to the oracle listener. Verify that the connection
218     // succeeds.
219     grpc_core::ChannelArgs client_args;
220     auto client_quota = grpc_core::ResourceQuota::Default();
221     client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota);
222     ChannelArgsEndpointConfig client_config(client_args);
223     test_ee->Connect(
224         [&client_endpoint,
225          &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
226           ASSERT_TRUE(endpoint.ok());
227           client_endpoint = std::move(*endpoint);
228           client_signal.Notify();
229         },
230         *URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
231         client_config,
232         memory_quota->CreateMemoryAllocator(
233             absl::StrCat("conn-", std::to_string(i))),
234         24h);
235 
236     client_signal.WaitForNotification();
237     server_signal->WaitForNotification();
238     ASSERT_NE(client_endpoint.get(), nullptr);
239     ASSERT_NE(server_endpoint.get(), nullptr);
240     connections.push_back(std::make_tuple(std::move(client_endpoint),
241                                           std::move(server_endpoint)));
242     delete server_signal;
243     server_signal = new grpc_core::Notification();
244   }
245   delete server_signal;
246 
247   std::vector<std::thread> threads;
248   // Create one thread for each connection. For each connection, create
249   // 2 more worker threads: to exchange and verify bi-directional data
250   // transfer.
251   threads.reserve(kNumConnections);
252   for (int i = 0; i < kNumConnections; i++) {
253     // For each connection, simulate a parallel bi-directional data transfer.
254     // All bi-directional transfers are run in parallel across all
255     // connections. Each bi-directional data transfer uses a random number of
256     // messages.
257     threads.emplace_back([client_endpoint =
258                               std::move(std::get<0>(connections[i])),
259                           server_endpoint =
260                               std::move(std::get<1>(connections[i]))]() {
261       std::vector<std::thread> workers;
262       workers.reserve(2);
263       auto worker = [client_endpoint = client_endpoint.get(),
264                      server_endpoint =
265                          server_endpoint.get()](bool client_to_server) {
266         grpc_core::ExecCtx ctx;
267         for (int i = 0; i < kNumExchangedMessages; i++) {
268           // If client_to_server is true, send from client to server and
269           // verify data read at the server. Otherwise send data from server
270           // to client and verify data read at client.
271           if (client_to_server) {
272             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
273                                             client_endpoint, server_endpoint)
274                             .ok());
275           } else {
276             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
277                                             server_endpoint, client_endpoint)
278                             .ok());
279           }
280         }
281       };
282       // worker[0] simulates a flow from client to server endpoint
283       workers.emplace_back([&worker]() { worker(true); });
284       // worker[1] simulates a flow from server to client endpoint
285       workers.emplace_back([&worker]() { worker(false); });
286       workers[0].join();
287       workers[1].join();
288     });
289   }
290   for (auto& t : threads) {
291     t.join();
292   }
293   server_endpoint.reset();
294 }
295 
296 // TODO(vigneshbabu): Add more tests which create listeners bound to a mix
297 // Ipv6 and other type of addresses (UDS) in the same test.
298