• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
16 
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/grpc.h>
19 #include <grpc/impl/channel_arg_names.h>
20 
21 #include <algorithm>
22 #include <chrono>
23 #include <list>
24 #include <memory>
25 #include <string>
26 #include <thread>
27 #include <type_traits>
28 #include <vector>
29 
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_split.h"
35 #include "absl/strings/string_view.h"
36 #include "gtest/gtest.h"
37 #include "src/core/config/config_vars.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
40 #include "src/core/lib/event_engine/poller.h"
41 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
42 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
44 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
45 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
46 #include "src/core/lib/event_engine/tcp_socket_utils.h"
47 #include "src/core/lib/resource_quota/resource_quota.h"
48 #include "src/core/util/dual_ref_counted.h"
49 #include "src/core/util/notification.h"
50 #include "src/core/util/ref_counted_ptr.h"
51 #include "test/core/event_engine/event_engine_test_utils.h"
52 #include "test/core/event_engine/posix/posix_engine_test_utils.h"
53 #include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
54 #include "test/core/test_util/port.h"
55 
56 namespace grpc_event_engine {
57 namespace experimental {
58 
59 namespace {
60 
61 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
62 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
63 using namespace std::chrono_literals;
64 
65 constexpr int kMinMessageSize = 1024;
66 constexpr int kNumConnections = 10;
67 constexpr int kNumExchangedMessages = 100;
68 std::atomic<int> g_num_active_connections{0};
69 
70 struct Connection {
71   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
72   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
73 };
74 
CreateConnectedEndpoints(PosixEventPoller & poller,bool is_zero_copy_enabled,int num_connections,std::shared_ptr<EventEngine> posix_ee,std::shared_ptr<EventEngine> oracle_ee)75 std::list<Connection> CreateConnectedEndpoints(
76     PosixEventPoller& poller, bool is_zero_copy_enabled, int num_connections,
77     std::shared_ptr<EventEngine> posix_ee,
78     std::shared_ptr<EventEngine> oracle_ee) {
79   std::list<Connection> connections;
80   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
81   std::string target_addr = absl::StrCat(
82       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
83   auto resolved_addr = URIToResolvedAddress(target_addr);
84   CHECK_OK(resolved_addr);
85   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
86   grpc_core::Notification* server_signal = new grpc_core::Notification();
87 
88   Listener::AcceptCallback accept_cb =
89       [&server_endpoint, &server_signal](
90           std::unique_ptr<Endpoint> ep,
91           grpc_core::MemoryAllocator /*memory_allocator*/) {
92         server_endpoint = std::move(ep);
93         server_signal->Notify();
94       };
95   grpc_core::ChannelArgs args;
96   auto quota = grpc_core::ResourceQuota::Default();
97   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
98   if (is_zero_copy_enabled) {
99     args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1);
100     args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD,
101                     kMinMessageSize);
102   }
103   ChannelArgsEndpointConfig config(args);
104   auto listener = oracle_ee->CreateListener(
105       std::move(accept_cb),
106       [](absl::Status status) { ASSERT_TRUE(status.ok()); }, config,
107       std::make_unique<grpc_core::MemoryQuota>("foo"));
108   CHECK_OK(listener);
109 
110   EXPECT_TRUE((*listener)->Bind(*resolved_addr).ok());
111   EXPECT_TRUE((*listener)->Start().ok());
112 
113   // Create client socket and connect to the target address.
114   for (int i = 0; i < num_connections; ++i) {
115     int client_fd = ConnectToServerOrDie(*resolved_addr);
116     EventHandle* handle =
117         poller.CreateHandle(client_fd, "test", poller.CanTrackErrors());
118     EXPECT_NE(handle, nullptr);
119     server_signal->WaitForNotification();
120     EXPECT_NE(server_endpoint, nullptr);
121     ++g_num_active_connections;
122     PosixTcpOptions options = TcpOptionsFromEndpointConfig(config);
123     connections.push_back(Connection{
124         CreatePosixEndpoint(
125             handle,
126             PosixEngineClosure::TestOnlyToClosure(
127                 [&poller](absl::Status /*status*/) {
128                   if (--g_num_active_connections == 0) {
129                     poller.Kick();
130                   }
131                 }),
132             posix_ee,
133             options.resource_quota->memory_quota()->CreateMemoryAllocator(
134                 "test"),
135             options),
136         std::move(server_endpoint)});
137     delete server_signal;
138     server_signal = new grpc_core::Notification();
139   }
140   delete server_signal;
141   return connections;
142 }
143 
144 }  // namespace
145 
TestScenarioName(const::testing::TestParamInfo<bool> & info)146 std::string TestScenarioName(const ::testing::TestParamInfo<bool>& info) {
147   return absl::StrCat("is_zero_copy_enabled_", info.param);
148 }
149 
150 // A helper class to drive the polling of Fds. It repeatedly calls the Work(..)
151 // method on the poller to get pet pending events, then schedules another
152 // parallel Work(..) instantiation and processes these pending events. This
153 // continues until all Fds have orphaned themselves.
154 class Worker : public grpc_core::DualRefCounted<Worker> {
155  public:
Worker(std::shared_ptr<EventEngine> engine,PosixEventPoller * poller)156   Worker(std::shared_ptr<EventEngine> engine, PosixEventPoller* poller)
157       : engine_(std::move(engine)), poller_(poller) {
158     WeakRef().release();
159   }
Orphaned()160   void Orphaned() override { signal.Notify(); }
Start()161   void Start() {
162     // Start executing Work(..).
163     engine_->Run([this]() { Work(); });
164   }
165 
Wait()166   void Wait() {
167     signal.WaitForNotification();
168     WeakUnref();
169   }
170 
171  private:
Work()172   void Work() {
173     auto result = poller_->Work(24h, [this]() {
174       // Schedule next work instantiation immediately and take a Ref for
175       // the next instantiation.
176       Ref().release();
177       engine_->Run([this]() { Work(); });
178     });
179     ASSERT_TRUE(result == Poller::WorkResult::kOk ||
180                 result == Poller::WorkResult::kKicked);
181     // Corresponds to the Ref taken for the current instantiation. If the
182     // result was Poller::WorkResult::kKicked, then the next work instantiation
183     // would not have been scheduled and the poll_again callback would have
184     // been deleted.
185     Unref();
186   }
187   std::shared_ptr<EventEngine> engine_;
188   // The poller is not owned by the Worker. Rather it is owned by the test
189   // which creates the worker instance.
190   PosixEventPoller* poller_;
191   grpc_core::Notification signal;
192 };
193 
194 class PosixEndpointTest : public ::testing::TestWithParam<bool> {
SetUp()195   void SetUp() override {
196     oracle_ee_ = std::make_shared<PosixOracleEventEngine>();
197     scheduler_ =
198         std::make_unique<grpc_event_engine::experimental::TestScheduler>(
199             posix_ee_.get());
200     EXPECT_NE(scheduler_, nullptr);
201     poller_ = MakeDefaultPoller(scheduler_.get());
202     posix_ee_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(poller_);
203     EXPECT_NE(posix_ee_, nullptr);
204     scheduler_->ChangeCurrentEventEngine(posix_ee_.get());
205     if (poller_ != nullptr) {
206       LOG(INFO) << "Using poller: " << poller_->Name();
207     }
208   }
209 
TearDown()210   void TearDown() override {
211     if (poller_ != nullptr) {
212       poller_->Shutdown();
213     }
214     WaitForSingleOwner(std::move(posix_ee_));
215     WaitForSingleOwner(std::move(oracle_ee_));
216   }
217 
218  public:
Scheduler()219   TestScheduler* Scheduler() { return scheduler_.get(); }
220 
GetPosixEE()221   std::shared_ptr<EventEngine> GetPosixEE() { return posix_ee_; }
222 
GetOracleEE()223   std::shared_ptr<EventEngine> GetOracleEE() { return oracle_ee_; }
224 
PosixPoller()225   PosixEventPoller* PosixPoller() { return poller_.get(); }
226 
227  private:
228   std::shared_ptr<PosixEventPoller> poller_;
229   std::unique_ptr<TestScheduler> scheduler_;
230   std::shared_ptr<EventEngine> posix_ee_;
231   std::shared_ptr<EventEngine> oracle_ee_;
232 };
233 
TEST_P(PosixEndpointTest,ConnectExchangeBidiDataTransferTest)234 TEST_P(PosixEndpointTest, ConnectExchangeBidiDataTransferTest) {
235   if (PosixPoller() == nullptr) {
236     return;
237   }
238   Worker* worker = new Worker(GetPosixEE(), PosixPoller());
239   worker->Start();
240   {
241     auto connections = CreateConnectedEndpoints(*PosixPoller(), GetParam(), 1,
242                                                 GetPosixEE(), GetOracleEE());
243     auto it = connections.begin();
244     auto client_endpoint = std::move((*it).client_endpoint);
245     auto server_endpoint = std::move((*it).server_endpoint);
246     EXPECT_NE(client_endpoint, nullptr);
247     EXPECT_NE(server_endpoint, nullptr);
248     connections.erase(it);
249 
250     // Alternate message exchanges between client -- server and server --
251     // client.
252     for (int i = 0; i < kNumExchangedMessages; i++) {
253       // Send from client to server and verify data read at the server.
254       ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
255                                       client_endpoint.get(),
256                                       server_endpoint.get())
257                       .ok());
258       // Send from server to client and verify data read at the client.
259       ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
260                                       server_endpoint.get(),
261                                       client_endpoint.get())
262                       .ok());
263     }
264   }
265   worker->Wait();
266 }
267 
268 // Create  N connections and exchange and verify random number of messages over
269 // each connection in parallel.
TEST_P(PosixEndpointTest,MultipleIPv6ConnectionsToOneOracleListenerTest)270 TEST_P(PosixEndpointTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
271   if (PosixPoller() == nullptr) {
272     return;
273   }
274   Worker* worker = new Worker(GetPosixEE(), PosixPoller());
275   worker->Start();
276   auto connections = CreateConnectedEndpoints(
277       *PosixPoller(), GetParam(), kNumConnections, GetPosixEE(), GetOracleEE());
278   std::vector<std::thread> threads;
279   // Create one thread for each connection. For each connection, create
280   // 2 more worker threads: to exchange and verify bi-directional data transfer.
281   threads.reserve(kNumConnections);
282   for (int i = 0; i < kNumConnections; i++) {
283     // For each connection, simulate a parallel bi-directional data transfer.
284     // All bi-directional transfers are run in parallel across all connections.
285     auto it = connections.begin();
286     auto client_endpoint = std::move((*it).client_endpoint);
287     auto server_endpoint = std::move((*it).server_endpoint);
288     EXPECT_NE(client_endpoint, nullptr);
289     EXPECT_NE(server_endpoint, nullptr);
290     connections.erase(it);
291     threads.emplace_back([client_endpoint = std::move(client_endpoint),
292                           server_endpoint = std::move(server_endpoint)]() {
293       std::vector<std::thread> workers;
294       workers.reserve(2);
295       auto worker = [client_endpoint = client_endpoint.get(),
296                      server_endpoint =
297                          server_endpoint.get()](bool client_to_server) {
298         for (int i = 0; i < kNumExchangedMessages; i++) {
299           // If client_to_server is true, send from client to server and
300           // verify data read at the server. Otherwise send data from server
301           // to client and verify data read at client.
302           if (client_to_server) {
303             EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
304                                             client_endpoint, server_endpoint)
305                             .ok());
306           } else {
307             EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
308                                             server_endpoint, client_endpoint)
309                             .ok());
310           }
311         }
312       };
313       // worker[0] simulates a flow from client to server endpoint
314       workers.emplace_back([&worker]() { worker(true); });
315       // worker[1] simulates a flow from server to client endpoint
316       workers.emplace_back([&worker]() { worker(false); });
317       workers[0].join();
318       workers[1].join();
319     });
320   }
321   for (auto& t : threads) {
322     t.join();
323   }
324   worker->Wait();
325 }
326 
327 // Test with zero copy enabled and disabled.
328 INSTANTIATE_TEST_SUITE_P(PosixEndpoint, PosixEndpointTest,
329                          ::testing::ValuesIn({false, true}), &TestScenarioName);
330 
331 }  // namespace experimental
332 }  // namespace grpc_event_engine
333 
main(int argc,char ** argv)334 int main(int argc, char** argv) {
335   ::testing::InitGoogleTest(&argc, argv);
336   auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
337   auto strings = absl::StrSplit(poll_strategy, ',');
338   if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
339     // Skip the test entirely if poll strategy is none.
340     return 0;
341   }
342   // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
343   // until we clear out the iomgr shutdown code.
344   grpc_init();
345   int r = RUN_ALL_TESTS();
346   grpc_shutdown();
347   return r;
348 }
349