1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/grpc.h>
19 #include <grpc/impl/channel_arg_names.h>
20
21 #include <algorithm>
22 #include <chrono>
23 #include <list>
24 #include <memory>
25 #include <string>
26 #include <thread>
27 #include <type_traits>
28 #include <vector>
29
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_split.h"
35 #include "absl/strings/string_view.h"
36 #include "gtest/gtest.h"
37 #include "src/core/config/config_vars.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
40 #include "src/core/lib/event_engine/poller.h"
41 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
42 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
44 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
45 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
46 #include "src/core/lib/event_engine/tcp_socket_utils.h"
47 #include "src/core/lib/resource_quota/resource_quota.h"
48 #include "src/core/util/dual_ref_counted.h"
49 #include "src/core/util/notification.h"
50 #include "src/core/util/ref_counted_ptr.h"
51 #include "test/core/event_engine/event_engine_test_utils.h"
52 #include "test/core/event_engine/posix/posix_engine_test_utils.h"
53 #include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
54 #include "test/core/test_util/port.h"
55
56 namespace grpc_event_engine {
57 namespace experimental {
58
59 namespace {
60
61 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
62 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
63 using namespace std::chrono_literals;
64
65 constexpr int kMinMessageSize = 1024;
66 constexpr int kNumConnections = 10;
67 constexpr int kNumExchangedMessages = 100;
68 std::atomic<int> g_num_active_connections{0};
69
70 struct Connection {
71 std::unique_ptr<EventEngine::Endpoint> client_endpoint;
72 std::unique_ptr<EventEngine::Endpoint> server_endpoint;
73 };
74
CreateConnectedEndpoints(PosixEventPoller & poller,bool is_zero_copy_enabled,int num_connections,std::shared_ptr<EventEngine> posix_ee,std::shared_ptr<EventEngine> oracle_ee)75 std::list<Connection> CreateConnectedEndpoints(
76 PosixEventPoller& poller, bool is_zero_copy_enabled, int num_connections,
77 std::shared_ptr<EventEngine> posix_ee,
78 std::shared_ptr<EventEngine> oracle_ee) {
79 std::list<Connection> connections;
80 auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
81 std::string target_addr = absl::StrCat(
82 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
83 auto resolved_addr = URIToResolvedAddress(target_addr);
84 CHECK_OK(resolved_addr);
85 std::unique_ptr<EventEngine::Endpoint> server_endpoint;
86 grpc_core::Notification* server_signal = new grpc_core::Notification();
87
88 Listener::AcceptCallback accept_cb =
89 [&server_endpoint, &server_signal](
90 std::unique_ptr<Endpoint> ep,
91 grpc_core::MemoryAllocator /*memory_allocator*/) {
92 server_endpoint = std::move(ep);
93 server_signal->Notify();
94 };
95 grpc_core::ChannelArgs args;
96 auto quota = grpc_core::ResourceQuota::Default();
97 args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
98 if (is_zero_copy_enabled) {
99 args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1);
100 args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD,
101 kMinMessageSize);
102 }
103 ChannelArgsEndpointConfig config(args);
104 auto listener = oracle_ee->CreateListener(
105 std::move(accept_cb),
106 [](absl::Status status) { ASSERT_TRUE(status.ok()); }, config,
107 std::make_unique<grpc_core::MemoryQuota>("foo"));
108 CHECK_OK(listener);
109
110 EXPECT_TRUE((*listener)->Bind(*resolved_addr).ok());
111 EXPECT_TRUE((*listener)->Start().ok());
112
113 // Create client socket and connect to the target address.
114 for (int i = 0; i < num_connections; ++i) {
115 int client_fd = ConnectToServerOrDie(*resolved_addr);
116 EventHandle* handle =
117 poller.CreateHandle(client_fd, "test", poller.CanTrackErrors());
118 EXPECT_NE(handle, nullptr);
119 server_signal->WaitForNotification();
120 EXPECT_NE(server_endpoint, nullptr);
121 ++g_num_active_connections;
122 PosixTcpOptions options = TcpOptionsFromEndpointConfig(config);
123 connections.push_back(Connection{
124 CreatePosixEndpoint(
125 handle,
126 PosixEngineClosure::TestOnlyToClosure(
127 [&poller](absl::Status /*status*/) {
128 if (--g_num_active_connections == 0) {
129 poller.Kick();
130 }
131 }),
132 posix_ee,
133 options.resource_quota->memory_quota()->CreateMemoryAllocator(
134 "test"),
135 options),
136 std::move(server_endpoint)});
137 delete server_signal;
138 server_signal = new grpc_core::Notification();
139 }
140 delete server_signal;
141 return connections;
142 }
143
144 } // namespace
145
TestScenarioName(const::testing::TestParamInfo<bool> & info)146 std::string TestScenarioName(const ::testing::TestParamInfo<bool>& info) {
147 return absl::StrCat("is_zero_copy_enabled_", info.param);
148 }
149
150 // A helper class to drive the polling of Fds. It repeatedly calls the Work(..)
151 // method on the poller to get pet pending events, then schedules another
152 // parallel Work(..) instantiation and processes these pending events. This
153 // continues until all Fds have orphaned themselves.
154 class Worker : public grpc_core::DualRefCounted<Worker> {
155 public:
Worker(std::shared_ptr<EventEngine> engine,PosixEventPoller * poller)156 Worker(std::shared_ptr<EventEngine> engine, PosixEventPoller* poller)
157 : engine_(std::move(engine)), poller_(poller) {
158 WeakRef().release();
159 }
Orphaned()160 void Orphaned() override { signal.Notify(); }
Start()161 void Start() {
162 // Start executing Work(..).
163 engine_->Run([this]() { Work(); });
164 }
165
Wait()166 void Wait() {
167 signal.WaitForNotification();
168 WeakUnref();
169 }
170
171 private:
Work()172 void Work() {
173 auto result = poller_->Work(24h, [this]() {
174 // Schedule next work instantiation immediately and take a Ref for
175 // the next instantiation.
176 Ref().release();
177 engine_->Run([this]() { Work(); });
178 });
179 ASSERT_TRUE(result == Poller::WorkResult::kOk ||
180 result == Poller::WorkResult::kKicked);
181 // Corresponds to the Ref taken for the current instantiation. If the
182 // result was Poller::WorkResult::kKicked, then the next work instantiation
183 // would not have been scheduled and the poll_again callback would have
184 // been deleted.
185 Unref();
186 }
187 std::shared_ptr<EventEngine> engine_;
188 // The poller is not owned by the Worker. Rather it is owned by the test
189 // which creates the worker instance.
190 PosixEventPoller* poller_;
191 grpc_core::Notification signal;
192 };
193
194 class PosixEndpointTest : public ::testing::TestWithParam<bool> {
SetUp()195 void SetUp() override {
196 oracle_ee_ = std::make_shared<PosixOracleEventEngine>();
197 scheduler_ =
198 std::make_unique<grpc_event_engine::experimental::TestScheduler>(
199 posix_ee_.get());
200 EXPECT_NE(scheduler_, nullptr);
201 poller_ = MakeDefaultPoller(scheduler_.get());
202 posix_ee_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(poller_);
203 EXPECT_NE(posix_ee_, nullptr);
204 scheduler_->ChangeCurrentEventEngine(posix_ee_.get());
205 if (poller_ != nullptr) {
206 LOG(INFO) << "Using poller: " << poller_->Name();
207 }
208 }
209
TearDown()210 void TearDown() override {
211 if (poller_ != nullptr) {
212 poller_->Shutdown();
213 }
214 WaitForSingleOwner(std::move(posix_ee_));
215 WaitForSingleOwner(std::move(oracle_ee_));
216 }
217
218 public:
Scheduler()219 TestScheduler* Scheduler() { return scheduler_.get(); }
220
GetPosixEE()221 std::shared_ptr<EventEngine> GetPosixEE() { return posix_ee_; }
222
GetOracleEE()223 std::shared_ptr<EventEngine> GetOracleEE() { return oracle_ee_; }
224
PosixPoller()225 PosixEventPoller* PosixPoller() { return poller_.get(); }
226
227 private:
228 std::shared_ptr<PosixEventPoller> poller_;
229 std::unique_ptr<TestScheduler> scheduler_;
230 std::shared_ptr<EventEngine> posix_ee_;
231 std::shared_ptr<EventEngine> oracle_ee_;
232 };
233
TEST_P(PosixEndpointTest,ConnectExchangeBidiDataTransferTest)234 TEST_P(PosixEndpointTest, ConnectExchangeBidiDataTransferTest) {
235 if (PosixPoller() == nullptr) {
236 return;
237 }
238 Worker* worker = new Worker(GetPosixEE(), PosixPoller());
239 worker->Start();
240 {
241 auto connections = CreateConnectedEndpoints(*PosixPoller(), GetParam(), 1,
242 GetPosixEE(), GetOracleEE());
243 auto it = connections.begin();
244 auto client_endpoint = std::move((*it).client_endpoint);
245 auto server_endpoint = std::move((*it).server_endpoint);
246 EXPECT_NE(client_endpoint, nullptr);
247 EXPECT_NE(server_endpoint, nullptr);
248 connections.erase(it);
249
250 // Alternate message exchanges between client -- server and server --
251 // client.
252 for (int i = 0; i < kNumExchangedMessages; i++) {
253 // Send from client to server and verify data read at the server.
254 ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
255 client_endpoint.get(),
256 server_endpoint.get())
257 .ok());
258 // Send from server to client and verify data read at the client.
259 ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
260 server_endpoint.get(),
261 client_endpoint.get())
262 .ok());
263 }
264 }
265 worker->Wait();
266 }
267
268 // Create N connections and exchange and verify random number of messages over
269 // each connection in parallel.
TEST_P(PosixEndpointTest,MultipleIPv6ConnectionsToOneOracleListenerTest)270 TEST_P(PosixEndpointTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
271 if (PosixPoller() == nullptr) {
272 return;
273 }
274 Worker* worker = new Worker(GetPosixEE(), PosixPoller());
275 worker->Start();
276 auto connections = CreateConnectedEndpoints(
277 *PosixPoller(), GetParam(), kNumConnections, GetPosixEE(), GetOracleEE());
278 std::vector<std::thread> threads;
279 // Create one thread for each connection. For each connection, create
280 // 2 more worker threads: to exchange and verify bi-directional data transfer.
281 threads.reserve(kNumConnections);
282 for (int i = 0; i < kNumConnections; i++) {
283 // For each connection, simulate a parallel bi-directional data transfer.
284 // All bi-directional transfers are run in parallel across all connections.
285 auto it = connections.begin();
286 auto client_endpoint = std::move((*it).client_endpoint);
287 auto server_endpoint = std::move((*it).server_endpoint);
288 EXPECT_NE(client_endpoint, nullptr);
289 EXPECT_NE(server_endpoint, nullptr);
290 connections.erase(it);
291 threads.emplace_back([client_endpoint = std::move(client_endpoint),
292 server_endpoint = std::move(server_endpoint)]() {
293 std::vector<std::thread> workers;
294 workers.reserve(2);
295 auto worker = [client_endpoint = client_endpoint.get(),
296 server_endpoint =
297 server_endpoint.get()](bool client_to_server) {
298 for (int i = 0; i < kNumExchangedMessages; i++) {
299 // If client_to_server is true, send from client to server and
300 // verify data read at the server. Otherwise send data from server
301 // to client and verify data read at client.
302 if (client_to_server) {
303 EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
304 client_endpoint, server_endpoint)
305 .ok());
306 } else {
307 EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
308 server_endpoint, client_endpoint)
309 .ok());
310 }
311 }
312 };
313 // worker[0] simulates a flow from client to server endpoint
314 workers.emplace_back([&worker]() { worker(true); });
315 // worker[1] simulates a flow from server to client endpoint
316 workers.emplace_back([&worker]() { worker(false); });
317 workers[0].join();
318 workers[1].join();
319 });
320 }
321 for (auto& t : threads) {
322 t.join();
323 }
324 worker->Wait();
325 }
326
327 // Test with zero copy enabled and disabled.
328 INSTANTIATE_TEST_SUITE_P(PosixEndpoint, PosixEndpointTest,
329 ::testing::ValuesIn({false, true}), &TestScenarioName);
330
331 } // namespace experimental
332 } // namespace grpc_event_engine
333
main(int argc,char ** argv)334 int main(int argc, char** argv) {
335 ::testing::InitGoogleTest(&argc, argv);
336 auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
337 auto strings = absl::StrSplit(poll_strategy, ',');
338 if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
339 // Skip the test entirely if poll strategy is none.
340 return 0;
341 }
342 // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
343 // until we clear out the iomgr shutdown code.
344 grpc_init();
345 int r = RUN_ALL_TESTS();
346 grpc_shutdown();
347 return r;
348 }
349