• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/grpc.h>
16 #include <stdint.h>
17 #include <sys/select.h>
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <chrono>
22 #include <cstring>
23 #include <memory>
24 #include <vector>
25 
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/str_split.h"
30 #include "absl/strings/string_view.h"
31 #include "gtest/gtest.h"
32 #include "src/core/config/config_vars.h"
33 #include "src/core/lib/event_engine/poller.h"
34 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
35 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
36 #include "src/core/lib/iomgr/port.h"
37 #include "src/core/util/ref_counted_ptr.h"
38 
39 // IWYU pragma: no_include <arpa/inet.h>
40 // IWYU pragma: no_include <ratio>
41 
42 // This test won't work except with posix sockets enabled
43 #ifdef GRPC_POSIX_SOCKET_EV
44 
45 #include <errno.h>
46 #include <fcntl.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/sync.h>
49 #include <netinet/in.h>
50 #include <poll.h>
51 #include <stdlib.h>
52 #include <sys/socket.h>
53 #include <unistd.h>
54 
55 #include "absl/log/log.h"
56 #include "absl/status/status.h"
57 #include "src/core/lib/event_engine/common_closures.h"
58 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
59 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
60 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
61 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
62 #include "src/core/util/crash.h"
63 #include "src/core/util/dual_ref_counted.h"
64 #include "src/core/util/notification.h"
65 #include "src/core/util/strerror.h"
66 #include "test/core/event_engine/posix/posix_engine_test_utils.h"
67 #include "test/core/test_util/port.h"
68 
// Mutex shared by all tests in this file. The callbacks below take it while
// updating the `done` flags of server/client, the client's write counter and
// FdChangeData::cb_that_ran; the polling loops take it while reading them.
// Initialized in main().
static gpr_mu g_mu;
// The poller under test. Assigned in EventPollerTest::SetUp(); tests return
// early when it is nullptr.
static std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
    g_event_poller;

// buffer size used to send and receive data.
// 1024 is the minimal value to set TCP send and receive buffer.
#define BUF_SIZE 1024
// Max number of connections pending to be accepted by listen().
#define MAX_NUM_FD 1024
// Client write buffer size
#define CLIENT_WRITE_BUF_SIZE 10
// Total number of times that the client fills up the write buffer
#define CLIENT_TOTAL_WRITE_CNT 3
82 
83 namespace grpc_event_engine {
84 namespace experimental {
85 
86 using namespace std::chrono_literals;
87 
88 namespace {
89 
SetSocketSendBuf(int fd,int buffer_size_bytes)90 absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) {
91   return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
92                          sizeof(buffer_size_bytes))
93              ? absl::OkStatus()
94              : absl::Status(absl::StatusCode::kInternal,
95                             grpc_core::StrError(errno).c_str());
96 }
97 
98 // Create a test socket with the right properties for testing.
99 // port is the TCP port to listen or connect to.
100 // Return a socket FD and sockaddr_in.
CreateTestSocket(int port,int * socket_fd,struct sockaddr_in6 * sin)101 void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) {
102   int fd;
103   int one = 1;
104   int buffer_size_bytes = BUF_SIZE;
105   int flags;
106 
107   fd = socket(AF_INET6, SOCK_STREAM, 0);
108   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
109   // Reset the size of socket send buffer to the minimal value to facilitate
110   // buffer filling up and triggering notify_on_write
111   EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
112   EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
113   // Make fd non-blocking.
114   flags = fcntl(fd, F_GETFL, 0);
115   EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
116   *socket_fd = fd;
117 
118   // Use local address for test.
119   memset(sin, 0, sizeof(struct sockaddr_in6));
120   sin->sin6_family = AF_INET6;
121   (reinterpret_cast<char*>(&sin->sin6_addr))[15] = 1;
122   EXPECT_TRUE(port >= 0 && port < 65536);
123   sin->sin6_port = htons(static_cast<uint16_t>(port));
124 }
125 
126 //  =======An upload server to test notify_on_read===========
127 //    The server simply reads and counts a stream of bytes.
128 
129 // An upload server.
130 typedef struct {
131   EventHandle* em_fd;        // listening fd
132   ssize_t read_bytes_total;  // total number of received bytes
133   int done;                  // set to 1 when a server finishes serving
134   PosixEngineClosure* listen_closure;
135 } server;
136 
ServerInit(server * sv)137 void ServerInit(server* sv) {
138   sv->read_bytes_total = 0;
139   sv->done = 0;
140 }
141 
142 // An upload session.
143 // Created when a new upload request arrives in the server.
144 typedef struct {
145   server* sv;               // not owned by a single session
146   EventHandle* em_fd;       // fd to read upload bytes
147   char read_buf[BUF_SIZE];  // buffer to store upload bytes
148   PosixEngineClosure* session_read_closure;
149 } session;
150 
151 // Called when an upload session can be safely shutdown.
152 // Close session FD and start to shutdown listen FD.
SessionShutdownCb(session * se,bool)153 void SessionShutdownCb(session* se, bool /*success*/) {
154   server* sv = se->sv;
155   se->em_fd->OrphanHandle(nullptr, nullptr, "a");
156   gpr_free(se);
157   // Start to shutdown listen fd.
158   sv->em_fd->ShutdownHandle(
159       absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb"));
160 }
161 
162 // Called when data become readable in a session.
SessionReadCb(session * se,absl::Status status)163 void SessionReadCb(session* se, absl::Status status) {
164   int fd = se->em_fd->WrappedFd();
165 
166   ssize_t read_once = 0;
167   ssize_t read_total = 0;
168 
169   if (!status.ok()) {
170     SessionShutdownCb(se, true);
171     return;
172   }
173 
174   do {
175     read_once = read(fd, se->read_buf, BUF_SIZE);
176     if (read_once > 0) read_total += read_once;
177   } while (read_once > 0);
178   se->sv->read_bytes_total += read_total;
179 
180   // read() returns 0 to indicate the TCP connection was closed by the
181   // client read(fd, read_buf, 0) also returns 0 which should never be called as
182   // such. It is possible to read nothing due to spurious edge event or data has
183   // been drained, In such a case, read() returns -1 and set errno to
184   // EAGAIN.
185   if (read_once == 0) {
186     SessionShutdownCb(se, true);
187   } else if (read_once == -1) {
188     EXPECT_EQ(errno, EAGAIN);
189     // An edge triggered event is cached in the kernel until next poll.
190     // In the current single thread implementation, SessionReadCb is called
191     // in the polling thread, such that polling only happens after this
192     // callback, and will catch read edge event if data is available again
193     // before notify_on_read.
194     se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
195         [se](absl::Status status) { SessionReadCb(se, status); });
196     se->em_fd->NotifyOnRead(se->session_read_closure);
197   }
198 }
199 
200 // Called when the listen FD can be safely shutdown. Close listen FD and
201 // signal that server can be shutdown.
ListenShutdownCb(server * sv)202 void ListenShutdownCb(server* sv) {
203   sv->em_fd->OrphanHandle(nullptr, nullptr, "b");
204   gpr_mu_lock(&g_mu);
205   sv->done = 1;
206   g_event_poller->Kick();
207   gpr_mu_unlock(&g_mu);
208 }
209 
210 // Called when a new TCP connection request arrives in the listening port.
ListenCb(server * sv,absl::Status status)211 void ListenCb(server* sv, absl::Status status) {
212   int fd;
213   int flags;
214   session* se;
215   struct sockaddr_storage ss;
216   socklen_t slen = sizeof(ss);
217   EventHandle* listen_em_fd = sv->em_fd;
218 
219   if (!status.ok()) {
220     ListenShutdownCb(sv);
221     return;
222   }
223 
224   do {
225     fd = accept(listen_em_fd->WrappedFd(),
226                 reinterpret_cast<struct sockaddr*>(&ss), &slen);
227   } while (fd < 0 && errno == EINTR);
228   if (fd < 0 && errno == EAGAIN) {
229     sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
230         [sv](absl::Status status) { ListenCb(sv, status); });
231     listen_em_fd->NotifyOnRead(sv->listen_closure);
232     return;
233   } else if (fd < 0) {
234     LOG(ERROR) << "Failed to accept a connection, returned error: "
235                << grpc_core::StrError(errno);
236   }
237   EXPECT_GE(fd, 0);
238   EXPECT_LT(fd, FD_SETSIZE);
239   flags = fcntl(fd, F_GETFL, 0);
240   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
241   se = static_cast<session*>(gpr_malloc(sizeof(*se)));
242   se->sv = sv;
243   se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
244   se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
245       [se](absl::Status status) { SessionReadCb(se, status); });
246   se->em_fd->NotifyOnRead(se->session_read_closure);
247   sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
248       [sv](absl::Status status) { ListenCb(sv, status); });
249   listen_em_fd->NotifyOnRead(sv->listen_closure);
250 }
251 
252 // Start a test server, return the TCP listening port bound to listen_fd.
253 // ListenCb() is registered to be interested in reading from listen_fd.
254 // When connection request arrives, ListenCb() is called to accept the
255 // connection request.
ServerStart(server * sv)256 int ServerStart(server* sv) {
257   int port = grpc_pick_unused_port_or_die();
258   int fd;
259   struct sockaddr_in6 sin;
260   socklen_t addr_len;
261 
262   CreateTestSocket(port, &fd, &sin);
263   addr_len = sizeof(sin);
264   EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
265   EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
266   port = ntohs(sin.sin6_port);
267   EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
268 
269   sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
270   sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
271       [sv](absl::Status status) { ListenCb(sv, status); });
272   sv->em_fd->NotifyOnRead(sv->listen_closure);
273   return port;
274 }
275 
276 // ===An upload client to test notify_on_write===
277 
278 // An upload client.
279 typedef struct {
280   EventHandle* em_fd;
281   char write_buf[CLIENT_WRITE_BUF_SIZE];
282   ssize_t write_bytes_total;
283   // Number of times that the client fills up the write buffer and calls
284   // notify_on_write to schedule another write.
285   int client_write_cnt;
286   int done;
287   PosixEngineClosure* write_closure;
288 } client;
289 
ClientInit(client * cl)290 void ClientInit(client* cl) {
291   memset(cl->write_buf, 0, sizeof(cl->write_buf));
292   cl->write_bytes_total = 0;
293   cl->client_write_cnt = 0;
294   cl->done = 0;
295 }
296 
297 // Called when a client upload session is ready to shutdown.
ClientSessionShutdownCb(client * cl)298 void ClientSessionShutdownCb(client* cl) {
299   cl->em_fd->OrphanHandle(nullptr, nullptr, "c");
300   gpr_mu_lock(&g_mu);
301   cl->done = 1;
302   g_event_poller->Kick();
303   gpr_mu_unlock(&g_mu);
304 }
305 
306 // Write as much as possible, then register notify_on_write.
ClientSessionWrite(client * cl,absl::Status status)307 void ClientSessionWrite(client* cl, absl::Status status) {
308   int fd = cl->em_fd->WrappedFd();
309   ssize_t write_once = 0;
310 
311   if (!status.ok()) {
312     ClientSessionShutdownCb(cl);
313     return;
314   }
315 
316   do {
317     write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
318     if (write_once > 0) cl->write_bytes_total += write_once;
319   } while (write_once > 0);
320 
321   EXPECT_EQ(errno, EAGAIN);
322   gpr_mu_lock(&g_mu);
323   if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
324     cl->write_closure = PosixEngineClosure::TestOnlyToClosure(
325         [cl](absl::Status status) { ClientSessionWrite(cl, status); });
326     cl->client_write_cnt++;
327     gpr_mu_unlock(&g_mu);
328     cl->em_fd->NotifyOnWrite(cl->write_closure);
329   } else {
330     gpr_mu_unlock(&g_mu);
331     ClientSessionShutdownCb(cl);
332   }
333 }
334 
335 // Start a client to send a stream of bytes.
ClientStart(client * cl,int port)336 void ClientStart(client* cl, int port) {
337   int fd;
338   struct sockaddr_in6 sin;
339   CreateTestSocket(port, &fd, &sin);
340   if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
341       -1) {
342     if (errno == EINPROGRESS) {
343       struct pollfd pfd;
344       pfd.fd = fd;
345       pfd.events = POLLOUT;
346       pfd.revents = 0;
347       if (poll(&pfd, 1, -1) == -1) {
348         LOG(ERROR) << "poll() failed during connect; errno=" << errno;
349         abort();
350       }
351     } else {
352       grpc_core::Crash(
353           absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
354     }
355   }
356 
357   cl->em_fd = g_event_poller->CreateHandle(fd, "client", false);
358   ClientSessionWrite(cl, absl::OkStatus());
359 }
360 
361 // Wait for the signal to shutdown client and server.
WaitAndShutdown(server * sv,client * cl)362 void WaitAndShutdown(server* sv, client* cl) {
363   Poller::WorkResult result;
364   gpr_mu_lock(&g_mu);
365   while (!sv->done || !cl->done) {
366     gpr_mu_unlock(&g_mu);
367     result = g_event_poller->Work(24h, []() {});
368     ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
369     gpr_mu_lock(&g_mu);
370   }
371   gpr_mu_unlock(&g_mu);
372 }
373 
// Test fixture that builds the global poller (g_event_poller) together with
// the engine and scheduler objects it is wired to, and shuts the poller down
// after each test.
class EventPollerTest : public ::testing::Test {
  // Creates an engine and a scheduler bound to it, builds the default poller
  // on top of that scheduler, then replaces the engine with a test-only one
  // constructed around the poller and points the scheduler at the new engine.
  // g_event_poller may end up nullptr; in that case tests return early.
  void SetUp() override {
    engine_ =
        std::make_unique<grpc_event_engine::experimental::PosixEventEngine>();
    EXPECT_NE(engine_, nullptr);
    scheduler_ =
        std::make_unique<grpc_event_engine::experimental::TestScheduler>(
            engine_.get());
    EXPECT_NE(scheduler_, nullptr);
    g_event_poller = MakeDefaultPoller(scheduler_.get());
    // Swap in an engine that uses the poller under test.
    engine_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(g_event_poller);
    EXPECT_NE(engine_, nullptr);
    scheduler_->ChangeCurrentEventEngine(engine_.get());
    if (g_event_poller != nullptr) {
      LOG(INFO) << "Using poller: " << g_event_poller->Name();
    }
  }

  // Shuts down the poller created in SetUp(), if there is one.
  void TearDown() override {
    if (g_event_poller != nullptr) {
      g_event_poller->Shutdown();
    }
  }

 public:
  // Returns the scheduler owned by this fixture (non-owning pointer).
  TestScheduler* Scheduler() { return scheduler_.get(); }

 private:
  std::shared_ptr<grpc_event_engine::experimental::PosixEventEngine> engine_;
  std::unique_ptr<grpc_event_engine::experimental::TestScheduler> scheduler_;
};
405 
406 // Test grpc_fd. Start an upload server and client, upload a stream of bytes
407 // from the client to the server, and verify that the total number of sent
408 // bytes is equal to the total number of received bytes.
TEST_F(EventPollerTest,TestEventPollerHandle)409 TEST_F(EventPollerTest, TestEventPollerHandle) {
410   server sv;
411   client cl;
412   int port;
413   if (g_event_poller == nullptr) {
414     return;
415   }
416   ServerInit(&sv);
417   port = ServerStart(&sv);
418   ClientInit(&cl);
419   ClientStart(&cl, port);
420 
421   WaitAndShutdown(&sv, &cl);
422   EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total);
423 }
424 
425 typedef struct FdChangeData {
426   void (*cb_that_ran)(struct FdChangeData*, absl::Status);
427 } FdChangeData;
428 
InitChangeData(FdChangeData * fdc)429 void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; }
430 
DestroyChangeData(FdChangeData *)431 void DestroyChangeData(FdChangeData* /*fdc*/) {}
432 
FirstReadCallback(FdChangeData * fdc,absl::Status)433 void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
434   gpr_mu_lock(&g_mu);
435   fdc->cb_that_ran = FirstReadCallback;
436   g_event_poller->Kick();
437   gpr_mu_unlock(&g_mu);
438 }
439 
SecondReadCallback(FdChangeData * fdc,absl::Status)440 void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
441   gpr_mu_lock(&g_mu);
442   fdc->cb_that_ran = SecondReadCallback;
443   g_event_poller->Kick();
444   gpr_mu_unlock(&g_mu);
445 }
446 
// Test that changing the callback we use for notify_on_read actually works.
// Note that we have two different but almost identical callbacks above -- the
// point is to have two different function pointers and two different data
// pointers and make sure that changing both really works.
TEST_F(EventPollerTest, TestEventPollerHandleChange) {
  EventHandle* em_fd;
  FdChangeData a, b;
  int flags;
  int sv[2];
  char data;
  ssize_t result;
  if (g_event_poller == nullptr) {
    return;
  }
  // Each closure is bound to its own FdChangeData instance.
  PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure(
      [a = &a](absl::Status status) { FirstReadCallback(a, status); });
  PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure(
      [b = &b](absl::Status status) { SecondReadCallback(b, status); });
  InitChangeData(&a);
  InitChangeData(&b);

  // Create a connected socket pair and make both ends non-blocking. sv[0] is
  // handed to the poller; sv[1] is used to make sv[0] readable.
  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
  flags = fcntl(sv[0], F_GETFL, 0);
  EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
  flags = fcntl(sv[1], F_GETFL, 0);
  EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);

  em_fd =
      g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false);
  EXPECT_NE(em_fd, nullptr);
  // Register the first callback, then make its FD readable
  em_fd->NotifyOnRead(first_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  EXPECT_EQ(result, 1);

  // And now wait for it to run.
  // Runs the poller until fdc->cb_that_ran is set. On normal exit the lambda
  // returns with g_mu still LOCKED; the caller is responsible for the matching
  // gpr_mu_unlock() after inspecting the result.
  // NOTE(review): if the ASSERT_FALSE below fires, the lambda returns with
  // g_mu unlocked and the caller's unlock would then be unbalanced.
  auto poller_work = [](FdChangeData* fdc) {
    Poller::WorkResult result;
    gpr_mu_lock(&g_mu);
    while (fdc->cb_that_ran == nullptr) {
      // Never hold g_mu across Work(): the callbacks take it themselves.
      gpr_mu_unlock(&g_mu);
      result = g_event_poller->Work(24h, []() {});
      ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
      gpr_mu_lock(&g_mu);
    }
  };
  poller_work(&a);
  EXPECT_EQ(a.cb_that_ran, FirstReadCallback);
  // Releases the lock that poller_work left held.
  gpr_mu_unlock(&g_mu);

  // And drain the socket so we can generate a new read edge
  result = read(sv[0], &data, 1);
  EXPECT_EQ(result, 1);

  // Now register a second callback with distinct change data, and do the same
  // thing again.
  em_fd->NotifyOnRead(second_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  EXPECT_EQ(result, 1);

  // And now wait for it to run.
  poller_work(&b);
  // Except now we verify that SecondReadCallback ran instead.
  EXPECT_EQ(b.cb_that_ran, SecondReadCallback);
  // Releases the lock that poller_work left held.
  gpr_mu_unlock(&g_mu);

  em_fd->OrphanHandle(nullptr, nullptr, "d");
  DestroyChangeData(&a);
  DestroyChangeData(&b);
  close(sv[1]);
}
520 
// Number of WakeupFdHandle objects that have been constructed but whose
// OrphanHandle completion callback has not run yet. Incremented in the
// WakeupFdHandle constructor and decremented in that callback; the callback
// that brings it to zero kicks the poller. Despite the k-prefix this is a
// mutable counter, not a constant.
std::atomic<int> kTotalActiveWakeupFdHandles{0};
522 
// A helper class representing one file descriptor. It is implemented using
// a WakeupFd. It registers itself with the poller and waits to be notified
// of read events. Upon receiving a read event, (1) it processes it,
// (2) registers to be notified of the next read event and (3) schedules
// generation of the next read event. The Fd orphans itself after processing
// a specified number of read events.
class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
 public:
  // num_wakeups: number of non-spurious read events to process before the
  //   handle shuts itself down; expected to be > 0.
  // scheduler: used to run the closure that triggers the next wakeup.
  // poller: poller with which the read end of the wakeup pipe is registered.
  // Neither pointer is owned by this object.
  WakeupFdHandle(int num_wakeups, Scheduler* scheduler,
                 PosixEventPoller* poller)
      : num_wakeups_(num_wakeups),
        scheduler_(scheduler),
        poller_(poller),
        // The read callback is a permanent closure that is re-registered after
        // every event and deleted in the destructor.
        on_read_(
            PosixEngineClosure::ToPermanentClosure([this](absl::Status status) {
              EXPECT_TRUE(status.ok());
              status = ReadPipe();
              if (!status.ok()) {
                // Rarely epoll1 poller may generate an EPOLLHUP - which is a
                // spurious wakeup. Poll based poller may also likely generate a
                // lot of spurious wakeups because of the level triggered nature
                // of poll. In such cases do not bother changing the number of
                // wakeups received.
                EXPECT_EQ(status, absl::InternalError("Spurious Wakeup"));
                handle_->NotifyOnRead(on_read_);
                return;
              }
              if (--num_wakeups_ == 0) {
                // This should invoke the registered NotifyOnRead callbacks with
                // the shutdown error. When those callbacks call Unref(), the
                // WakeupFdHandle should call OrphanHandle in the Unref() method
                // implementation.
                handle_->ShutdownHandle(absl::InternalError("Shutting down"));
                Unref();
              } else {
                handle_->NotifyOnRead(on_read_);
                // Extra strong ref held on behalf of the scheduled closure
                // below; that closure drops it with Unref().
                Ref().release();
                // Schedule next wakeup to trigger the registered NotifyOnRead
                // callback.
                scheduler_->Run(SelfDeletingClosure::Create([this]() {
                  // Send next wakeup.
                  EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
                  Unref();
                }));
              }
            })) {
    // Weak ref that is dropped by the OrphanHandle completion callback in
    // Orphaned().
    WeakRef().release();
    ++kTotalActiveWakeupFdHandles;
    EXPECT_GT(num_wakeups_, 0);
    EXPECT_NE(scheduler_, nullptr);
    EXPECT_NE(poller_, nullptr);
    // NOTE(review): the result of CreatePipeWakeupFd() is dereferenced without
    // checking whether creation succeeded.
    wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd();
    handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false);
    EXPECT_NE(handle_, nullptr);
    handle_->NotifyOnRead(on_read_);
    //  Send a wakeup initially.
    EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
  }

  // Releases the permanent read closure created in the constructor.
  ~WakeupFdHandle() override { delete on_read_; }

  // DualRefCounted hook; presumably invoked once the last strong ref is
  // dropped (see grpc_core::DualRefCounted to confirm).
  void Orphaned() override {
    // Once the handle has orphaned itself, decrement
    // kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
    // send a Kick to the poller.
    handle_->OrphanHandle(
        PosixEngineClosure::TestOnlyToClosure(
            [poller = poller_, wakeupfd_handle = this](absl::Status status) {
              EXPECT_TRUE(status.ok());
              if (--kTotalActiveWakeupFdHandles == 0) {
                poller->Kick();
              }
              // Drops the weak ref taken in the constructor.
              wakeupfd_handle->WeakUnref();
            }),
        nullptr, "");
  }

 private:
  // Drains the read end of the wakeup pipe.
  // Returns OkStatus() if at least one byte was read before EAGAIN, or if
  // read() returned 0; InternalError("Spurious Wakeup") if EAGAIN was hit
  // without reading anything; and an Internal status describing errno for any
  // other read error. EINTR is retried.
  absl::Status ReadPipe() {
    char buf[128];
    ssize_t r;
    int total_bytes_read = 0;
    for (;;) {
      r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf));
      if (r > 0) {
        total_bytes_read += r;
        continue;
      }
      if (r == 0) return absl::OkStatus();
      switch (errno) {
        case EAGAIN:
          return total_bytes_read > 0 ? absl::OkStatus()
                                      : absl::InternalError("Spurious Wakeup");
        case EINTR:
          continue;
        default:
          return absl::Status(
              absl::StatusCode::kInternal,
              absl::StrCat("read: ", grpc_core::StrError(errno)));
      }
    }
  }
  // Remaining number of non-spurious wakeups to process.
  int num_wakeups_;
  Scheduler* scheduler_;
  PosixEventPoller* poller_;
  // Permanent read closure; owned, deleted in the destructor.
  PosixEngineClosure* on_read_;
  std::unique_ptr<WakeupFd> wakeup_fd_;
  // Poller handle wrapping the read end of wakeup_fd_.
  EventHandle* handle_;
};
632 
// A helper class to create Fds and drive the polling for these Fds. It
// repeatedly calls the Work(..) method on the poller to get pending events,
// then schedules another parallel Work(..) instantiation and processes these
// pending events. This continues until all Fds have orphaned themselves.
class Worker : public grpc_core::DualRefCounted<Worker> {
 public:
  // Creates `num_handles` WakeupFdHandle objects, each configured to process
  // `num_wakeups_per_handle` wakeups. `scheduler` and `poller` are not owned.
  Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles,
         int num_wakeups_per_handle)
      : scheduler_(scheduler), poller_(poller) {
    handles_.reserve(num_handles);
    for (int i = 0; i < num_handles; i++) {
      // The handles are never deleted through this vector; they manage their
      // own lifetime through their ref counts.
      handles_.push_back(
          new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_));
    }
    // Weak ref that is dropped in Wait().
    WeakRef().release();
  }
  // DualRefCounted hook; wakes up the thread blocked in Wait().
  void Orphaned() override { signal.Notify(); }
  void Start() {
    // Start executing Work(..).
    scheduler_->Run([this]() { Work(); });
  }

  // Blocks until Orphaned() has signalled, then drops the weak ref taken in
  // the constructor.
  void Wait() {
    signal.WaitForNotification();
    WeakUnref();
  }

 private:
  // One polling round. Note that it polls through the global g_event_poller
  // rather than through poller_.
  void Work() {
    auto result = g_event_poller->Work(24h, [this]() {
      // Schedule next work instantiation immediately and take a Ref for
      // the next instantiation.
      Ref().release();
      scheduler_->Run([this]() { Work(); });
    });
    ASSERT_TRUE(result == Poller::WorkResult::kOk ||
                result == Poller::WorkResult::kKicked);
    // Corresponds to the Ref taken for the current instantiation. If the
    // result was Poller::WorkResult::kKicked, then the next work instantiation
    // would not have been scheduled and the poll_again callback should have
    // been deleted.
    Unref();
  }
  Scheduler* scheduler_;
  PosixEventPoller* poller_;
  // Notified from Orphaned(); awaited in Wait().
  grpc_core::Notification signal;
  std::vector<WakeupFdHandle*> handles_;
};
681 
682 // This test creates kNumHandles file descriptors and kNumWakeupsPerHandle
683 // separate read events to the created Fds. The Fds use the NotifyOnRead API to
684 // wait for a read event, upon receiving a read event they process it
685 // immediately and schedule the wait for the next read event. A new read event
686 // is also generated for each fd in parallel after the previous one is
687 // processed.
TEST_F(EventPollerTest,TestMultipleHandles)688 TEST_F(EventPollerTest, TestMultipleHandles) {
689   static constexpr int kNumHandles = 100;
690   static constexpr int kNumWakeupsPerHandle = 100;
691   if (g_event_poller == nullptr) {
692     return;
693   }
694   Worker* worker = new Worker(Scheduler(), g_event_poller.get(), kNumHandles,
695                               kNumWakeupsPerHandle);
696   worker->Start();
697   worker->Wait();
698 }
699 
700 }  // namespace
701 }  // namespace experimental
702 }  // namespace grpc_event_engine
703 
main(int argc,char ** argv)704 int main(int argc, char** argv) {
705   ::testing::InitGoogleTest(&argc, argv);
706   gpr_mu_init(&g_mu);
707   auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
708   auto strings = absl::StrSplit(poll_strategy, ',');
709   if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
710     // Skip the test entirely if poll strategy is none.
711     return 0;
712   }
713   // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
714   // until we clear out the iomgr shutdown code.
715   grpc_init();
716   int r = RUN_ALL_TESTS();
717   grpc_shutdown();
718   return r;
719 }
720 
721 #else  // GRPC_POSIX_SOCKET_EV
722 
// Fallback entry point when GRPC_POSIX_SOCKET_EV is not defined: there is
// nothing to run, and the process reports exit status 1.
int main(int /*argc*/, char** /*argv*/) {
  return 1;
}
724 
725 #endif  // GRPC_POSIX_SOCKET_EV
726