// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/grpc.h>
#include <stdint.h>
#include <sys/select.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
#include "src/core/config/config_vars.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/util/ref_counted_ptr.h"

// IWYU pragma: no_include <arpa/inet.h>
// IWYU pragma: no_include <ratio>

// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_EV

#include <errno.h>
#include <fcntl.h>
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <unistd.h>

#include "absl/log/log.h"
#include "absl/status/status.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/util/crash.h"
#include "src/core/util/dual_ref_counted.h"
#include "src/core/util/notification.h"
#include "src/core/util/strerror.h"
#include "test/core/event_engine/posix/posix_engine_test_utils.h"
#include "test/core/test_util/port.h"

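// g_mu guards the cross-thread test state below (the servers'/clients' `done`
// flags and the `cb_that_ran` bookkeeping); g_event_poller is the poller
// shared by every test in this file.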
static gpr_mu g_mu;
static std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
    g_event_poller;

// Buffer size used to send and receive data.
// 1024 is the minimum size to which the TCP send and receive buffers can be
// set.
#define BUF_SIZE 1024
// Max number of connections pending to be accepted by listen().
#define MAX_NUM_FD 1024
// Client write buffer size
#define CLIENT_WRITE_BUF_SIZE 10
// Total number of times that the client fills up the write buffer
#define CLIENT_TOTAL_WRITE_CNT 3

namespace grpc_event_engine {
namespace experimental {

using namespace std::chrono_literals;

namespace {

absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) {
  return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
                         sizeof(buffer_size_bytes))
             ? absl::OkStatus()
             : absl::Status(absl::StatusCode::kInternal,
                            grpc_core::StrError(errno).c_str());
}

// Create a test socket with the right properties for testing.
// port is the TCP port to listen on or connect to.
// Returns a socket FD and a sockaddr_in6 through the output parameters.
void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) {
  int fd;
  int one = 1;
  int buffer_size_bytes = BUF_SIZE;
  int flags;

  fd = socket(AF_INET6, SOCK_STREAM, 0);
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
  // Shrink the socket send buffer to the minimum so that it fills up quickly
  // and triggers notify_on_write.
  EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
  EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
  // Make fd non-blocking.
  flags = fcntl(fd, F_GETFL, 0);
  EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
  *socket_fd = fd;

  // Use local address for test.
  memset(sin, 0, sizeof(struct sockaddr_in6));
  sin->sin6_family = AF_INET6;
  (reinterpret_cast<char*>(&sin->sin6_addr))[15] = 1;
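  // Setting the last byte of the zeroed IPv6 address to 1 produces the
  // loopback address ::1.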
  EXPECT_TRUE(port >= 0 && port < 65536);
  sin->sin6_port = htons(static_cast<uint16_t>(port));
}

// =======An upload server to test notify_on_read===========
// The server simply reads and counts a stream of bytes.

// An upload server.
typedef struct {
  EventHandle* em_fd;        // listening fd
  ssize_t read_bytes_total;  // total number of received bytes
  int done;                  // set to 1 when a server finishes serving
  PosixEngineClosure* listen_closure;
} server;

void ServerInit(server* sv) {
  sv->read_bytes_total = 0;
  sv->done = 0;
}

// An upload session.
// Created when a new upload request arrives in the server.
typedef struct {
  server* sv;               // not owned by a single session
  EventHandle* em_fd;       // fd to read upload bytes
  char read_buf[BUF_SIZE];  // buffer to store upload bytes
  PosixEngineClosure* session_read_closure;
} session;

// Called when an upload session can be safely shut down.
// Closes the session FD and starts shutting down the listen FD.
void SessionShutdownCb(session* se, bool /*success*/) {
  server* sv = se->sv;
  se->em_fd->OrphanHandle(nullptr, nullptr, "a");
  gpr_free(se);
  // Start shutting down the listen fd.
  sv->em_fd->ShutdownHandle(
      absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb"));
}

// Called when data becomes readable in a session.
void SessionReadCb(session* se, absl::Status status) {
  int fd = se->em_fd->WrappedFd();

  ssize_t read_once = 0;
  ssize_t read_total = 0;

  if (!status.ok()) {
    SessionShutdownCb(se, true);
    return;
  }

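  // Drain the socket: with edge-triggered notification we must keep reading
  // until the kernel reports EAGAIN, otherwise a later edge may never fire.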
  do {
    read_once = read(fd, se->read_buf, BUF_SIZE);
    if (read_once > 0) read_total += read_once;
  } while (read_once > 0);
  se->sv->read_bytes_total += read_total;

  // read() returns 0 to indicate that the TCP connection was closed by the
  // client. read(fd, read_buf, 0) also returns 0, but we never call it that
  // way. It is also possible to read nothing because of a spurious edge event
  // or because the data has already been drained; in that case read() returns
  // -1 and sets errno to EAGAIN.
  if (read_once == 0) {
    SessionShutdownCb(se, true);
  } else if (read_once == -1) {
    EXPECT_EQ(errno, EAGAIN);
    // An edge-triggered event is cached in the kernel until the next poll.
    // In the current single-threaded implementation, SessionReadCb is called
    // on the polling thread, so polling only happens after this callback
    // returns; it will then catch the read edge event if data becomes
    // available again before notify_on_read.
    se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
        [se](absl::Status status) { SessionReadCb(se, status); });
    se->em_fd->NotifyOnRead(se->session_read_closure);
  }
}

// Called when the listen FD can be safely shut down. Closes the listen FD and
// signals that the server can be shut down.
void ListenShutdownCb(server* sv) {
  sv->em_fd->OrphanHandle(nullptr, nullptr, "b");
  gpr_mu_lock(&g_mu);
  sv->done = 1;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

// Called when a new TCP connection request arrives on the listening port.
void ListenCb(server* sv, absl::Status status) {
  int fd;
  int flags;
  session* se;
  struct sockaddr_storage ss;
  socklen_t slen = sizeof(ss);
  EventHandle* listen_em_fd = sv->em_fd;

  if (!status.ok()) {
    ListenShutdownCb(sv);
    return;
  }

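  // accept() may be interrupted by a signal before a connection arrives;
  // retry on EINTR.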
  do {
    fd = accept(listen_em_fd->WrappedFd(),
                reinterpret_cast<struct sockaddr*>(&ss), &slen);
  } while (fd < 0 && errno == EINTR);
  if (fd < 0 && errno == EAGAIN) {
    sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
        [sv](absl::Status status) { ListenCb(sv, status); });
    listen_em_fd->NotifyOnRead(sv->listen_closure);
    return;
  } else if (fd < 0) {
    LOG(ERROR) << "Failed to accept a connection, returned error: "
               << grpc_core::StrError(errno);
  }
  EXPECT_GE(fd, 0);
  EXPECT_LT(fd, FD_SETSIZE);
  flags = fcntl(fd, F_GETFL, 0);
  fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  se = static_cast<session*>(gpr_malloc(sizeof(*se)));
  se->sv = sv;
  se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
  se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
      [se](absl::Status status) { SessionReadCb(se, status); });
  se->em_fd->NotifyOnRead(se->session_read_closure);
  sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
      [sv](absl::Status status) { ListenCb(sv, status); });
  listen_em_fd->NotifyOnRead(sv->listen_closure);
}

// Start a test server; return the TCP listening port bound to listen_fd.
// ListenCb() is registered to be interested in reading from listen_fd.
// When a connection request arrives, ListenCb() is called to accept it.
int ServerStart(server* sv) {
  int port = grpc_pick_unused_port_or_die();
  int fd;
  struct sockaddr_in6 sin;
  socklen_t addr_len;

  CreateTestSocket(port, &fd, &sin);
  addr_len = sizeof(sin);
  EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
  EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
  port = ntohs(sin.sin6_port);
  EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);

  sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
  sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
      [sv](absl::Status status) { ListenCb(sv, status); });
  sv->em_fd->NotifyOnRead(sv->listen_closure);
  return port;
}

// ===An upload client to test notify_on_write===

// An upload client.
typedef struct {
  EventHandle* em_fd;
  char write_buf[CLIENT_WRITE_BUF_SIZE];
  ssize_t write_bytes_total;
  // Number of times that the client fills up the write buffer and calls
  // notify_on_write to schedule another write.
  int client_write_cnt;
  int done;
  PosixEngineClosure* write_closure;
} client;

void ClientInit(client* cl) {
  memset(cl->write_buf, 0, sizeof(cl->write_buf));
  cl->write_bytes_total = 0;
  cl->client_write_cnt = 0;
  cl->done = 0;
}

// Called when a client upload session is ready to shut down.
void ClientSessionShutdownCb(client* cl) {
  cl->em_fd->OrphanHandle(nullptr, nullptr, "c");
  gpr_mu_lock(&g_mu);
  cl->done = 1;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

// Write as much as possible, then register notify_on_write.
void ClientSessionWrite(client* cl, absl::Status status) {
  int fd = cl->em_fd->WrappedFd();
  ssize_t write_once = 0;

  if (!status.ok()) {
    ClientSessionShutdownCb(cl);
    return;
  }

  do {
    write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
    if (write_once > 0) cl->write_bytes_total += write_once;
  } while (write_once > 0);

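  // The non-blocking write loop above exits only when the kernel send buffer
  // is full, so the final write must have failed with EAGAIN.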
  EXPECT_EQ(errno, EAGAIN);
  gpr_mu_lock(&g_mu);
  if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
    cl->write_closure = PosixEngineClosure::TestOnlyToClosure(
        [cl](absl::Status status) { ClientSessionWrite(cl, status); });
    cl->client_write_cnt++;
    gpr_mu_unlock(&g_mu);
    cl->em_fd->NotifyOnWrite(cl->write_closure);
  } else {
    gpr_mu_unlock(&g_mu);
    ClientSessionShutdownCb(cl);
  }
}

// Start a client to send a stream of bytes.
void ClientStart(client* cl, int port) {
  int fd;
  struct sockaddr_in6 sin;
  CreateTestSocket(port, &fd, &sin);
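  // A non-blocking connect() normally returns -1 with errno == EINPROGRESS;
  // in that case, poll() for writability to wait until the handshake
  // completes before handing the fd to the poller.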
  if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
      -1) {
    if (errno == EINPROGRESS) {
      struct pollfd pfd;
      pfd.fd = fd;
      pfd.events = POLLOUT;
      pfd.revents = 0;
      if (poll(&pfd, 1, -1) == -1) {
        LOG(ERROR) << "poll() failed during connect; errno=" << errno;
        abort();
      }
    } else {
      grpc_core::Crash(
          absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
    }
  }

  cl->em_fd = g_event_poller->CreateHandle(fd, "client", false);
  ClientSessionWrite(cl, absl::OkStatus());
}

// Wait for the signal to shut down the client and server.
void WaitAndShutdown(server* sv, client* cl) {
  Poller::WorkResult result;
  gpr_mu_lock(&g_mu);
  while (!sv->done || !cl->done) {
    gpr_mu_unlock(&g_mu);
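    // Work() runs ready event callbacks on this thread; a Kick() issued from
    // one of those callbacks makes it return so the done flags can be
    // re-checked.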
    result = g_event_poller->Work(24h, []() {});
    ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
    gpr_mu_lock(&g_mu);
  }
  gpr_mu_unlock(&g_mu);
}

class EventPollerTest : public ::testing::Test {
  void SetUp() override {
    engine_ =
        std::make_unique<grpc_event_engine::experimental::PosixEventEngine>();
    EXPECT_NE(engine_, nullptr);
    scheduler_ =
        std::make_unique<grpc_event_engine::experimental::TestScheduler>(
            engine_.get());
    EXPECT_NE(scheduler_, nullptr);
    g_event_poller = MakeDefaultPoller(scheduler_.get());
    engine_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(g_event_poller);
    EXPECT_NE(engine_, nullptr);
    scheduler_->ChangeCurrentEventEngine(engine_.get());
    if (g_event_poller != nullptr) {
      LOG(INFO) << "Using poller: " << g_event_poller->Name();
    }
  }

  void TearDown() override {
    if (g_event_poller != nullptr) {
      g_event_poller->Shutdown();
    }
  }

 public:
  TestScheduler* Scheduler() { return scheduler_.get(); }

 private:
  std::shared_ptr<grpc_event_engine::experimental::PosixEventEngine> engine_;
  std::unique_ptr<grpc_event_engine::experimental::TestScheduler> scheduler_;
};

// Test grpc_fd. Start an upload server and client, upload a stream of bytes
// from the client to the server, and verify that the total number of sent
// bytes is equal to the total number of received bytes.
TEST_F(EventPollerTest, TestEventPollerHandle) {
  server sv;
  client cl;
  int port;
  if (g_event_poller == nullptr) {
    return;
  }
  ServerInit(&sv);
  port = ServerStart(&sv);
  ClientInit(&cl);
  ClientStart(&cl, port);

  WaitAndShutdown(&sv, &cl);
  EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total);
}

typedef struct FdChangeData {
  void (*cb_that_ran)(struct FdChangeData*, absl::Status);
} FdChangeData;

void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; }

void DestroyChangeData(FdChangeData* /*fdc*/) {}

void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
  gpr_mu_lock(&g_mu);
  fdc->cb_that_ran = FirstReadCallback;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
  gpr_mu_lock(&g_mu);
  fdc->cb_that_ran = SecondReadCallback;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

// Test that changing the callback we use for notify_on_read actually works.
// Note that we have two different but almost identical callbacks above -- the
// point is to have two different function pointers and two different data
// pointers and make sure that changing both really works.
TEST_F(EventPollerTest, TestEventPollerHandleChange) {
  EventHandle* em_fd;
  FdChangeData a, b;
  int flags;
  int sv[2];
  char data;
  ssize_t result;
  if (g_event_poller == nullptr) {
    return;
  }
  PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure(
      [a = &a](absl::Status status) { FirstReadCallback(a, status); });
  PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure(
      [b = &b](absl::Status status) { SecondReadCallback(b, status); });
  InitChangeData(&a);
  InitChangeData(&b);

  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
  flags = fcntl(sv[0], F_GETFL, 0);
  EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
  flags = fcntl(sv[1], F_GETFL, 0);
  EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);

  em_fd =
      g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false);
  EXPECT_NE(em_fd, nullptr);
  // Register the first callback, then make its FD readable
  em_fd->NotifyOnRead(first_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  EXPECT_EQ(result, 1);

  // And now wait for it to run.
  auto poller_work = [](FdChangeData* fdc) {
    Poller::WorkResult result;
    gpr_mu_lock(&g_mu);
    while (fdc->cb_that_ran == nullptr) {
      gpr_mu_unlock(&g_mu);
      result = g_event_poller->Work(24h, []() {});
      ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
      gpr_mu_lock(&g_mu);
    }
  };
  poller_work(&a);
  EXPECT_EQ(a.cb_that_ran, FirstReadCallback);
  gpr_mu_unlock(&g_mu);

  // And drain the socket so we can generate a new read edge
  result = read(sv[0], &data, 1);
  EXPECT_EQ(result, 1);

  // Now register a second callback with distinct change data, and do the same
  // thing again.
  em_fd->NotifyOnRead(second_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  EXPECT_EQ(result, 1);

  // And now wait for it to run.
  poller_work(&b);
  // Except now we verify that SecondReadCallback ran instead.
  EXPECT_EQ(b.cb_that_ran, SecondReadCallback);
  gpr_mu_unlock(&g_mu);

  em_fd->OrphanHandle(nullptr, nullptr, "d");
  DestroyChangeData(&a);
  DestroyChangeData(&b);
  close(sv[1]);
}

std::atomic<int> kTotalActiveWakeupFdHandles{0};

// A helper class representing one file descriptor. It is implemented using
// a WakeupFd. It registers itself with the poller and waits to be notified
// of read events. Upon receiving a read event, it (1) processes it,
// (2) registers to be notified of the next read event, and (3) schedules
// generation of the next read event. The Fd orphans itself after processing
// a specified number of read events.
class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
 public:
  WakeupFdHandle(int num_wakeups, Scheduler* scheduler,
                 PosixEventPoller* poller)
      : num_wakeups_(num_wakeups),
        scheduler_(scheduler),
        poller_(poller),
        on_read_(
            PosixEngineClosure::ToPermanentClosure([this](absl::Status status) {
              EXPECT_TRUE(status.ok());
              status = ReadPipe();
              if (!status.ok()) {
                // The epoll1 poller may rarely generate an EPOLLHUP, which is
                // a spurious wakeup. The poll-based poller is also likely to
                // generate many spurious wakeups because of the
                // level-triggered nature of poll. In such cases, do not change
                // the number of wakeups received.
                EXPECT_EQ(status, absl::InternalError("Spurious Wakeup"));
                handle_->NotifyOnRead(on_read_);
                return;
              }
              if (--num_wakeups_ == 0) {
                // This should invoke the registered NotifyOnRead callbacks with
                // the shutdown error. When those callbacks call Unref(), the
                // WakeupFdHandle should call OrphanHandle in the Unref() method
                // implementation.
                handle_->ShutdownHandle(absl::InternalError("Shutting down"));
                Unref();
              } else {
                handle_->NotifyOnRead(on_read_);
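                // Take an extra ref that keeps this handle alive until the
                // closure scheduled below runs and releases it.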
                Ref().release();
                // Schedule next wakeup to trigger the registered NotifyOnRead
                // callback.
                scheduler_->Run(SelfDeletingClosure::Create([this]() {
                  // Send next wakeup.
                  EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
                  Unref();
                }));
              }
            })) {
    WeakRef().release();
    ++kTotalActiveWakeupFdHandles;
    EXPECT_GT(num_wakeups_, 0);
    EXPECT_NE(scheduler_, nullptr);
    EXPECT_NE(poller_, nullptr);
    wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd();
    handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false);
    EXPECT_NE(handle_, nullptr);
    handle_->NotifyOnRead(on_read_);
    // Send a wakeup initially.
    EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
  }

  ~WakeupFdHandle() override { delete on_read_; }

  void Orphaned() override {
    // Once the handle has orphaned itself, decrement
    // kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
    // send a Kick to the poller.
    handle_->OrphanHandle(
        PosixEngineClosure::TestOnlyToClosure(
            [poller = poller_, wakeupfd_handle = this](absl::Status status) {
              EXPECT_TRUE(status.ok());
              if (--kTotalActiveWakeupFdHandles == 0) {
                poller->Kick();
              }
              wakeupfd_handle->WeakUnref();
            }),
        nullptr, "");
  }

 private:
  absl::Status ReadPipe() {
    char buf[128];
    ssize_t r;
    int total_bytes_read = 0;
    for (;;) {
      r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf));
      if (r > 0) {
        total_bytes_read += r;
        continue;
      }
      if (r == 0) return absl::OkStatus();
      switch (errno) {
        case EAGAIN:
          return total_bytes_read > 0 ? absl::OkStatus()
                                      : absl::InternalError("Spurious Wakeup");
        case EINTR:
          continue;
        default:
          return absl::Status(
              absl::StatusCode::kInternal,
              absl::StrCat("read: ", grpc_core::StrError(errno)));
      }
    }
  }
  int num_wakeups_;
  Scheduler* scheduler_;
  PosixEventPoller* poller_;
  PosixEngineClosure* on_read_;
  std::unique_ptr<WakeupFd> wakeup_fd_;
  EventHandle* handle_;
};

// A helper class to create Fds and drive the polling for these Fds. It
// repeatedly calls the Work(..) method on the poller to get pending events,
// then schedules another parallel Work(..) instantiation and processes these
// pending events. This continues until all Fds have orphaned themselves.
class Worker : public grpc_core::DualRefCounted<Worker> {
 public:
  Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles,
         int num_wakeups_per_handle)
      : scheduler_(scheduler), poller_(poller) {
    handles_.reserve(num_handles);
    for (int i = 0; i < num_handles; i++) {
      handles_.push_back(
          new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_));
    }
    WeakRef().release();
  }
  void Orphaned() override { signal.Notify(); }
  void Start() {
    // Start executing Work(..).
    scheduler_->Run([this]() { Work(); });
  }

  void Wait() {
    signal.WaitForNotification();
    WeakUnref();
  }

 private:
  void Work() {
    auto result = g_event_poller->Work(24h, [this]() {
      // Schedule next work instantiation immediately and take a Ref for
      // the next instantiation.
      Ref().release();
      scheduler_->Run([this]() { Work(); });
    });
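    // Work() returns kOk when it processed events and kKicked when the poller
    // was kicked, which in this test happens only after the last handle has
    // orphaned itself.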
    ASSERT_TRUE(result == Poller::WorkResult::kOk ||
                result == Poller::WorkResult::kKicked);
    // Corresponds to the Ref taken for the current instantiation. If the
    // result was Poller::WorkResult::kKicked, then the next work instantiation
    // would not have been scheduled and the poll_again callback should have
    // been deleted.
    Unref();
  }
  Scheduler* scheduler_;
  PosixEventPoller* poller_;
  grpc_core::Notification signal;
  std::vector<WakeupFdHandle*> handles_;
};

// This test creates kNumHandles file descriptors and sends
// kNumWakeupsPerHandle separate read events to each of the created Fds. The
// Fds use the NotifyOnRead API to wait for a read event; upon receiving a
// read event they process it immediately and schedule the wait for the next
// read event. A new read event is also generated for each fd in parallel
// after the previous one is processed.
TEST_F(EventPollerTest, TestMultipleHandles) {
  static constexpr int kNumHandles = 100;
  static constexpr int kNumWakeupsPerHandle = 100;
  if (g_event_poller == nullptr) {
    return;
  }
  Worker* worker = new Worker(Scheduler(), g_event_poller.get(), kNumHandles,
                              kNumWakeupsPerHandle);
  worker->Start();
  worker->Wait();
}

}  // namespace
}  // namespace experimental
}  // namespace grpc_event_engine

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  gpr_mu_init(&g_mu);
  auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
  auto strings = absl::StrSplit(poll_strategy, ',');
  if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
    // Skip the test entirely if poll strategy is none.
    return 0;
  }
  // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
  // until we clear out the iomgr shutdown code.
  grpc_init();
  int r = RUN_ALL_TESTS();
  grpc_shutdown();
  return r;
}

#else  // GRPC_POSIX_SOCKET_EV

int main(int argc, char** argv) { return 1; }

#endif  // GRPC_POSIX_SOCKET_EV