• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/event_engine/event_engine.h>
16 #include <grpc/event_engine/memory_allocator.h>
17 #include <grpc/impl/channel_arg_names.h>
18 
19 #include <algorithm>
20 #include <chrono>
21 #include <memory>
22 #include <string>
23 #include <thread>
24 #include <tuple>
25 #include <type_traits>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/log/check.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/time/clock.h"
34 #include "absl/time/time.h"
35 #include "gtest/gtest.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38 #include "src/core/lib/event_engine/tcp_socket_utils.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/lib/resource_quota/memory_quota.h"
41 #include "src/core/lib/resource_quota/resource_quota.h"
42 #include "src/core/util/notification.h"
43 #include "test/core/event_engine/event_engine_test_utils.h"
44 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
45 #include "test/core/test_util/port.h"
46 
47 namespace grpc_event_engine {
48 namespace experimental {
49 
InitServerTests()50 void InitServerTests() {}
51 
52 }  // namespace experimental
53 }  // namespace grpc_event_engine
54 
55 class EventEngineServerTest : public EventEngineTest {};
56 
57 using namespace std::chrono_literals;
58 
59 namespace {
60 
61 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
62 using ::grpc_event_engine::experimental::EventEngine;
63 using ::grpc_event_engine::experimental::URIToResolvedAddress;
64 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
65 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
66 using ::grpc_event_engine::experimental::GetNextSendMessage;
67 
68 constexpr int kNumExchangedMessages = 100;
69 
70 }  // namespace
71 
TEST_F(EventEngineServerTest,CannotBindAfterStarted)72 TEST_F(EventEngineServerTest, CannotBindAfterStarted) {
73   std::shared_ptr<EventEngine> engine(this->NewEventEngine());
74   ChannelArgsEndpointConfig config;
75   auto listener = engine->CreateListener(
76       [](std::unique_ptr<Endpoint>, grpc_core::MemoryAllocator) {},
77       [](absl::Status) {}, config,
78       std::make_unique<grpc_core::MemoryQuota>("foo"));
79   // Bind an initial port to ensure normal listener startup
80   auto resolved_addr = URIToResolvedAddress(absl::StrCat(
81       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
82   ASSERT_TRUE(resolved_addr.ok()) << resolved_addr.status();
83   auto bind_result = (*listener)->Bind(*resolved_addr);
84   ASSERT_TRUE(bind_result.ok()) << bind_result.status();
85   auto listen_result = (*listener)->Start();
86   ASSERT_TRUE(listen_result.ok()) << listen_result;
87   // A subsequent bind, which should fail
88   auto resolved_addr2 = URIToResolvedAddress(absl::StrCat(
89       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
90   ASSERT_TRUE(resolved_addr2.ok());
91   ASSERT_FALSE((*listener)->Bind(*resolved_addr2).ok());
92 }
93 
94 // Create a connection using the oracle EventEngine to a listener created
95 // by the Test EventEngine and exchange bi-di data over the connection.
96 // For each data transfer, verify that data written at one end of the stream
97 // equals data read at the other end of the stream.
TEST_F(EventEngineServerTest,ServerConnectExchangeBidiDataTransferTest)98 TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
99   grpc_core::ExecCtx ctx;
100   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
101   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
102   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
103   std::string target_addr = absl::StrCat(
104       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
105   auto resolved_addr = URIToResolvedAddress(target_addr);
106   CHECK_OK(resolved_addr);
107   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
108   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
109   grpc_core::Notification client_signal;
110   grpc_core::Notification server_signal;
111 
112   Listener::AcceptCallback accept_cb =
113       [&server_endpoint, &server_signal](
114           std::unique_ptr<Endpoint> ep,
115           grpc_core::MemoryAllocator /*memory_allocator*/) {
116         server_endpoint = std::move(ep);
117         server_signal.Notify();
118       };
119 
120   grpc_core::ChannelArgs args;
121   auto quota = grpc_core::ResourceQuota::Default();
122   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
123   ChannelArgsEndpointConfig config(args);
124   auto listener = *test_ee->CreateListener(
125       std::move(accept_cb),
126       [](absl::Status status) {
127         ASSERT_TRUE(status.ok()) << status.ToString();
128       },
129       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
130 
131   ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
132   ASSERT_TRUE(listener->Start().ok());
133 
134   oracle_ee->Connect(
135       [&client_endpoint,
136        &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
137         ASSERT_TRUE(endpoint.ok()) << endpoint.status();
138         client_endpoint = std::move(*endpoint);
139         client_signal.Notify();
140       },
141       *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
142       24h);
143 
144   client_signal.WaitForNotification();
145   server_signal.WaitForNotification();
146   ASSERT_NE(client_endpoint.get(), nullptr);
147   ASSERT_NE(server_endpoint.get(), nullptr);
148 
149   // Alternate message exchanges between client -- server and server --
150   // client.
151   for (int i = 0; i < kNumExchangedMessages; i++) {
152     // Send from client to server and verify data read at the server.
153     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
154                                     server_endpoint.get())
155                     .ok());
156 
157     // Send from server to client and verify data read at the client.
158     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
159                                     client_endpoint.get())
160                     .ok());
161   }
162   client_endpoint.reset();
163   server_endpoint.reset();
164   listener.reset();
165 }
166 
167 // Create 1 listener bound to N IPv6 addresses and M connections where M > N and
168 // exchange and verify random number of messages over each connection.
TEST_F(EventEngineServerTest,ServerMultipleIPv6ConnectionsToOneOracleListenerTest)169 TEST_F(EventEngineServerTest,
170        ServerMultipleIPv6ConnectionsToOneOracleListenerTest) {
171   grpc_core::ExecCtx ctx;
172   static constexpr int kNumListenerAddresses = 10;  // N
173   static constexpr int kNumConnections = 10;        // M
174   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
175   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
176   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
177   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
178   // Notifications can only be fired once, so they are newed every loop
179   grpc_core::Notification* server_signal = new grpc_core::Notification();
180   std::vector<std::string> target_addrs;
181   std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
182       connections;
183 
184   Listener::AcceptCallback accept_cb =
185       [&server_endpoint, &server_signal](
186           std::unique_ptr<Endpoint> ep,
187           grpc_core::MemoryAllocator /*memory_allocator*/) {
188         server_endpoint = std::move(ep);
189         server_signal->Notify();
190       };
191   grpc_core::ChannelArgs args;
192   auto quota = grpc_core::ResourceQuota::Default();
193   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
194   ChannelArgsEndpointConfig config(args);
195   auto listener = *test_ee->CreateListener(
196       std::move(accept_cb),
197       [](absl::Status status) {
198         ASSERT_TRUE(status.ok()) << status.ToString();
199       },
200       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
201 
202   target_addrs.reserve(kNumListenerAddresses);
203   for (int i = 0; i < kNumListenerAddresses; i++) {
204     std::string target_addr = absl::StrCat(
205         "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
206     ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok());
207     target_addrs.push_back(target_addr);
208   }
209   ASSERT_TRUE(listener->Start().ok());
210   absl::SleepFor(absl::Milliseconds(500));
211   for (int i = 0; i < kNumConnections; i++) {
212     std::unique_ptr<EventEngine::Endpoint> client_endpoint;
213     grpc_core::Notification client_signal;
214     // Create an oracle EventEngine client and connect to a one of the
215     // addresses bound to the test EventEngine listener. Verify that the
216     // connection succeeds.
217     grpc_core::ChannelArgs client_args;
218     auto client_quota = grpc_core::ResourceQuota::Default();
219     client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota);
220     ChannelArgsEndpointConfig client_config(client_args);
221     oracle_ee->Connect(
222         [&client_endpoint,
223          &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
224           ASSERT_TRUE(endpoint.ok());
225           client_endpoint = std::move(*endpoint);
226           client_signal.Notify();
227         },
228         *URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
229         client_config,
230         memory_quota->CreateMemoryAllocator(
231             absl::StrCat("conn-", std::to_string(i))),
232         24h);
233 
234     client_signal.WaitForNotification();
235     server_signal->WaitForNotification();
236     ASSERT_NE(client_endpoint.get(), nullptr);
237     ASSERT_NE(server_endpoint.get(), nullptr);
238     connections.push_back(std::make_tuple(std::move(client_endpoint),
239                                           std::move(server_endpoint)));
240     delete server_signal;
241     server_signal = new grpc_core::Notification();
242   }
243   delete server_signal;
244 
245   std::vector<std::thread> threads;
246   // Create one thread for each connection. For each connection, create
247   // 2 more worker threads: to exchange and verify bi-directional data
248   // transfer.
249   threads.reserve(kNumConnections);
250   for (int i = 0; i < kNumConnections; i++) {
251     // For each connection, simulate a parallel bi-directional data transfer.
252     // All bi-directional transfers are run in parallel across all
253     // connections. Each bi-directional data transfer uses a random number of
254     // messages.
255     threads.emplace_back([client_endpoint =
256                               std::move(std::get<0>(connections[i])),
257                           server_endpoint =
258                               std::move(std::get<1>(connections[i]))]() {
259       std::vector<std::thread> workers;
260       workers.reserve(2);
261       auto worker = [client_endpoint = client_endpoint.get(),
262                      server_endpoint =
263                          server_endpoint.get()](bool client_to_server) {
264         grpc_core::ExecCtx ctx;
265         for (int i = 0; i < kNumExchangedMessages; i++) {
266           // If client_to_server is true, send from client to server and
267           // verify data read at the server. Otherwise send data from server
268           // to client and verify data read at client.
269           if (client_to_server) {
270             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
271                                             client_endpoint, server_endpoint)
272                             .ok());
273           } else {
274             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
275                                             server_endpoint, client_endpoint)
276                             .ok());
277           }
278         }
279       };
280       // worker[0] simulates a flow from client to server endpoint
281       workers.emplace_back([&worker]() { worker(true); });
282       // worker[1] simulates a flow from server to client endpoint
283       workers.emplace_back([&worker]() { worker(false); });
284       workers[0].join();
285       workers[1].join();
286     });
287   }
288   for (auto& t : threads) {
289     t.join();
290   }
291   server_endpoint.reset();
292   listener.reset();
293 }
294 
295 // TODO(vigneshbabu): Add more tests which create listeners bound to a mix
296 // Ipv6 and other type of addresses (UDS) in the same test.
297