• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_epoll.h"
6 
7 #include <sys/eventfd.h>
8 
9 #include <cstddef>
10 #include <cstdint>
11 #include <optional>
12 #include <utility>
13 
14 #include "base/auto_reset.h"
15 #include "base/check_op.h"
16 #include "base/feature_list.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/metrics/histogram_macros.h"
20 #include "base/numerics/safe_conversions.h"
21 #include "base/posix/eintr_wrapper.h"
22 #include "base/ranges/algorithm.h"
23 #include "base/threading/thread_checker.h"
24 #include "base/time/time.h"
25 #include "base/trace_event/base_tracing.h"
26 
27 #if DCHECK_IS_ON()
28 #include <iomanip>
29 #endif
30 
31 namespace base {
32 
33 namespace {
34 
35 // Under this feature native work is batched.
36 BASE_FEATURE(kBatchNativeEventsInMessagePumpEpoll,
37              "BatchNativeEventsInMessagePumpEpoll",
38              base::FEATURE_DISABLED_BY_DEFAULT);
39 
40 // Caches the state of the "BatchNativeEventsInMessagePumpEpoll".
41 std::atomic_bool g_use_batched_version = false;
42 std::atomic_bool g_use_poll = false;
43 
44 constexpr std::pair<uint32_t, short int> kEpollToPollEvents[] = {
45     {EPOLLIN, POLLIN},   {EPOLLOUT, POLLOUT}, {EPOLLRDHUP, POLLRDHUP},
46     {EPOLLPRI, POLLPRI}, {EPOLLERR, POLLERR}, {EPOLLHUP, POLLHUP}};
47 
SetEventsForPoll(const uint32_t epoll_events,struct pollfd * poll_entry)48 void SetEventsForPoll(const uint32_t epoll_events, struct pollfd* poll_entry) {
49   poll_entry->events = 0;
50   for (const auto& epoll_poll : kEpollToPollEvents) {
51     if (epoll_events & epoll_poll.first) {
52       poll_entry->events |= epoll_poll.second;
53     }
54   }
55 }
56 }  // namespace
57 
58 // Parameters used to construct and describe an interest.
59 struct MessagePumpEpoll::InterestParams {
60   // The file descriptor of interest.
61   int fd;
62 
63   // Indicates an interest in being able to read() from `fd`.
64   bool read;
65 
66   // Indicates an interest in being able to write() to `fd`.
67   bool write;
68 
69   // Indicates whether this interest is a one-shot interest, meaning that it
70   // must be automatically deactivated every time it triggers an epoll event.
71   bool one_shot;
72 
IsEqualbase::MessagePumpEpoll::InterestParams73   bool IsEqual(const InterestParams& rhs) const {
74     return std::tie(fd, read, write, one_shot) ==
75            std::tie(rhs.fd, rhs.read, rhs.write, rhs.one_shot);
76   }
77 };
78 
79 // Represents a single controller's interest in a file descriptor via epoll,
80 // and tracks whether that interest is currently active. Though an interest
81 // persists as long as its controller is alive and hasn't changed interests,
82 // it only participates in epoll waits while active.
83 class MessagePumpEpoll::Interest : public RefCounted<Interest> {
84  public:
Interest(FdWatchController * controller,const InterestParams & params)85   Interest(FdWatchController* controller, const InterestParams& params)
86       : controller_(controller), params_(params) {}
87 
88   Interest(const Interest&) = delete;
89   Interest& operator=(const Interest&) = delete;
90 
controller()91   FdWatchController* controller() { return controller_; }
params() const92   const InterestParams& params() const { return params_; }
93 
active() const94   bool active() const { return active_; }
set_active(bool active)95   void set_active(bool active) { active_ = active; }
96 
97   // Only meaningful between WatchForControllerDestruction() and
98   // StopWatchingForControllerDestruction().
was_controller_destroyed() const99   bool was_controller_destroyed() const { return was_controller_destroyed_; }
100 
WatchForControllerDestruction()101   void WatchForControllerDestruction() {
102     DCHECK_GE(nested_controller_destruction_watchers_, 0);
103     if (nested_controller_destruction_watchers_ == 0) {
104       DCHECK(!controller_->was_destroyed_);
105       controller_->was_destroyed_ = &was_controller_destroyed_;
106     } else {
107       // If this is a nested event we should already be watching `controller_`
108       // for destruction from an outer event handler.
109       DCHECK_EQ(controller_->was_destroyed_, &was_controller_destroyed_);
110     }
111     ++nested_controller_destruction_watchers_;
112   }
113 
StopWatchingForControllerDestruction()114   void StopWatchingForControllerDestruction() {
115     --nested_controller_destruction_watchers_;
116     DCHECK_GE(nested_controller_destruction_watchers_, 0);
117     if (nested_controller_destruction_watchers_ == 0 &&
118         !was_controller_destroyed_) {
119       DCHECK_EQ(controller_->was_destroyed_, &was_controller_destroyed_);
120       controller_->was_destroyed_ = nullptr;
121     }
122   }
123 
124  private:
125   friend class RefCounted<Interest>;
126   ~Interest() = default;
127 
128   const raw_ptr<FdWatchController, DanglingUntriaged> controller_;
129   const InterestParams params_;
130   bool active_ = true;
131   bool was_controller_destroyed_ = false;
132 
133   // Avoid resetting `controller_->was_destroyed` when nested destruction
134   // watchers are active.
135   int nested_controller_destruction_watchers_ = 0;
136 };
137 
MessagePumpEpoll()138 MessagePumpEpoll::MessagePumpEpoll() {
139   epoll_.reset(epoll_create1(/*flags=*/0));
140   PCHECK(epoll_.is_valid());
141 
142   wake_event_.reset(eventfd(0, EFD_NONBLOCK));
143   PCHECK(wake_event_.is_valid());
144 
145   epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
146   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
147   PCHECK(rv == 0);
148 
149   struct pollfd poll_entry;
150   poll_entry.fd = wake_event_.get();
151   poll_entry.events = POLLIN;
152   poll_entry.revents = 0;
153   pollfds_.push_back(poll_entry);
154 
155   next_metrics_time_ = base::TimeTicks::Now() + base::Minutes(1);
156 }
157 
158 MessagePumpEpoll::~MessagePumpEpoll() = default;
159 
InitializeFeatures()160 void MessagePumpEpoll::InitializeFeatures() {
161   // Relaxed memory order since no memory access depends on value.
162   g_use_batched_version.store(
163       base::FeatureList::IsEnabled(kBatchNativeEventsInMessagePumpEpoll),
164       std::memory_order_relaxed);
165   g_use_poll.store(base::FeatureList::IsEnabled(kUsePollForMessagePumpEpoll),
166                    std::memory_order_relaxed);
167 }
168 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)169 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
170                                            bool persistent,
171                                            int mode,
172                                            FdWatchController* controller,
173                                            FdWatcher* watcher) {
174   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
175   TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
176               "persistent", persistent, "watch_read", mode & WATCH_READ,
177               "watch_write", mode & WATCH_WRITE);
178 
179   const InterestParams params{
180       .fd = fd,
181       .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
182       .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
183       .one_shot = !persistent,
184   };
185 
186   auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
187   EpollEventEntry& entry = it->second;
188   scoped_refptr<Interest> existing_interest = controller->interest();
189   if (existing_interest && existing_interest->params().IsEqual(params)) {
190     // WatchFileDescriptor() has already been called for this controller at
191     // least once before, and as in the most common cases, it is now being
192     // called again with the same parameters.
193     //
194     // We don't need to allocate and register a new Interest in this case, but
195     // we can instead reactivate the existing (presumably deactivated,
196     // non-persistent) Interest.
197     existing_interest->set_active(true);
198   } else {
199     entry.interests.push_back(controller->AssignInterest(params));
200     if (existing_interest) {
201       UnregisterInterest(existing_interest);
202     }
203   }
204 
205   if (is_new_fd_entry) {
206     AddEpollEvent(entry);
207   } else {
208     UpdateEpollEvent(entry);
209   }
210 
211   controller->set_pump(weak_ptr_factory_.GetWeakPtr());
212   controller->set_watcher(watcher);
213   return true;
214 }
215 
Run(Delegate * delegate)216 void MessagePumpEpoll::Run(Delegate* delegate) {
217   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
218   RunState run_state(delegate);
219   AutoReset<raw_ptr<RunState>> auto_reset_run_state(&run_state_, &run_state);
220   for (;;) {
221     // Do some work and see if the next task is ready right away.
222     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
223     const bool immediate_work_available = next_work_info.is_immediate();
224     if (run_state.should_quit) {
225       break;
226     }
227 
228     if (next_work_info.recent_now > next_metrics_time_) {
229       RecordPeriodicMetrics();
230     }
231 
232     // Reset the native work flag before processing IO events.
233     native_work_started_ = false;
234 
235     // Process any immediately ready IO event, but don't sleep yet.
236     // Process epoll events until none is available without blocking or
237     // the maximum number of iterations is reached. The maximum number of
238     // iterations when `g_use_batched_version` is true was chosen so that
239     // all available events are dispatched 95% of the time in local tests.
240     // The maximum is not infinite because we want to yield to application
241     // tasks at some point.
242     bool did_native_work = false;
243     const int max_iterations =
244         g_use_batched_version.load(std::memory_order_relaxed) ? 16 : 1;
245     for (int i = 0; i < max_iterations; ++i) {
246       if (!WaitForEpollEvents(TimeDelta())) {
247         break;
248       }
249       did_native_work = true;
250     }
251 
252     bool attempt_more_work = immediate_work_available || did_native_work;
253 
254     if (run_state.should_quit) {
255       break;
256     }
257     if (attempt_more_work) {
258       continue;
259     }
260 
261     delegate->DoIdleWork();
262     if (run_state.should_quit) {
263       break;
264     }
265 
266     TimeDelta next_metrics_delay =
267         next_metrics_time_ - next_work_info.recent_now;
268     TimeDelta timeout = TimeDelta::Max();
269     DCHECK(!next_work_info.delayed_run_time.is_null());
270     if (!next_work_info.delayed_run_time.is_max()) {
271       timeout = next_work_info.remaining_delay();
272     }
273     if (timeout > next_metrics_delay) {
274       timeout = next_metrics_delay;
275       // Ensure we never get a negative timeout from the next_metrics_delay as
276       // this will cause epoll to block indefinitely if no fds are signaled,
277       // preventing existing non-fd tasks from running.
278       if (timeout < base::Milliseconds(0)) {
279         timeout = base::Milliseconds(0);
280       }
281     }
282     delegate->BeforeWait();
283     WaitForEpollEvents(timeout);
284     if (run_state.should_quit) {
285       break;
286     }
287   }
288 }
289 
RecordPeriodicMetrics()290 void MessagePumpEpoll::RecordPeriodicMetrics() {
291   UMA_HISTOGRAM_COUNTS_1000("MessagePumpEpoll.WatchedFileDescriptors",
292                             (int)entries_.size());
293   next_metrics_time_ += base::Minutes(1);
294 }
295 
Quit()296 void MessagePumpEpoll::Quit() {
297   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
298   DCHECK(run_state_) << "Quit() called outside of Run()";
299   run_state_->should_quit = true;
300 }
301 
ScheduleWork()302 void MessagePumpEpoll::ScheduleWork() {
303   const uint64_t value = 1;
304   ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
305 
306   // EAGAIN here implies that the write() would overflow of the event counter,
307   // which is a condition we can safely ignore. It implies that the event
308   // counter is non-zero and therefore readable, which is enough to ensure that
309   // any pending wait eventually wakes up.
310   DPCHECK(n == sizeof(value) || errno == EAGAIN);
311 }
312 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)313 void MessagePumpEpoll::ScheduleDelayedWork(
314     const Delegate::NextWorkInfo& next_work_info) {
315   // Nothing to do. This can only be called from the same thread as Run(), so
316   // the pump must be in between waits. The scheduled work therefore will be
317   // seen in time for the next wait.
318 }
319 
AddEpollEvent(EpollEventEntry & entry)320 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
321   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
322   DCHECK(!entry.stopped);
323   const uint32_t events = entry.ComputeActiveEvents();
324   epoll_event event{.events = events, .data = {.ptr = &entry}};
325   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
326 #if DCHECK_IS_ON()
327   // TODO(361611793): Remove these debug logs after resolving the issue.
328   if (rv != 0) {
329     for (auto& history : entry.epoll_history_) {
330       if (history.event) {
331         auto& e = history.event.value();
332         LOG(ERROR) << "events=0x" << std::hex << std::setfill('0')
333                    << std::setw(8) << e.events;
334         LOG(ERROR) << "data=0x" << std::hex << std::setfill('0')
335                    << std::setw(16) << e.data.u64;
336       }
337       LOG(ERROR) << history.stack_trace;
338     }
339   } else {
340     entry.PushEpollHistory(std::make_optional(event));
341   }
342 #endif
343   DPCHECK(rv == 0);
344   entry.registered_events = events;
345 
346   DCHECK(FindPollEntry(entry.fd) == pollfds_.end());
347   struct pollfd poll_entry;
348   poll_entry.fd = entry.fd;
349   poll_entry.revents = 0;
350   SetEventsForPoll(events, &poll_entry);
351 
352   pollfds_.push_back(poll_entry);
353 }
354 
UpdateEpollEvent(EpollEventEntry & entry)355 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
356   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
357   const uint32_t events = entry.ComputeActiveEvents();
358   if (!entry.stopped) {
359     if (events == 0) {
360       // There is no active interest now.
361       // We don't have to call epoll_ctl() if the last event was registered as
362       // one-shot since the fd has already been disabled.
363       if (!(entry.registered_events & EPOLLONESHOT)) {
364         // The fd is still enabled. We need to disable it but don't remove the
365         // entry from `entries_` to keep the reference alive because handling
366         // the entry isn't finished yet.
367         StopEpollEvent(entry);
368       } else {
369         // No work needs to be done for epoll, but for poll we have to implement
370         // the equivalent of oneshot ourselves by unregistering for all events.
371         auto poll_entry = FindPollEntry(entry.fd);
372         CHECK(poll_entry != pollfds_.end());
373         poll_entry->events = 0;
374       }
375       return;
376     }
377     if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
378       // Persistent events don't need to be modified if no bits are changing.
379       return;
380     }
381     epoll_event event{.events = events, .data = {.ptr = &entry}};
382     int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
383     DPCHECK(rv == 0);
384 #if DCHECK_IS_ON()
385     entry.PushEpollHistory(std::make_optional(event));
386 #endif
387     entry.registered_events = events;
388 
389     auto poll_entry = FindPollEntry(entry.fd);
390     CHECK(poll_entry != pollfds_.end());
391     SetEventsForPoll(events, &(*poll_entry));
392   } else if (events != 0) {
393     // An interest for the fd has been reactivated. Re-enable the fd.
394     entry.stopped = false;
395     AddEpollEvent(entry);
396   }
397 }
398 
StopEpollEvent(EpollEventEntry & entry)399 void MessagePumpEpoll::StopEpollEvent(EpollEventEntry& entry) {
400   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
401   if (!entry.stopped) {
402     int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, entry.fd, nullptr);
403     DPCHECK(rv == 0);
404 #if DCHECK_IS_ON()
405     entry.PushEpollHistory(std::nullopt);
406 #endif
407     entry.stopped = true;
408     entry.registered_events = 0;
409     RemovePollEntry(entry.fd);
410   }
411 }
412 
UnregisterInterest(const scoped_refptr<Interest> & interest)413 void MessagePumpEpoll::UnregisterInterest(
414     const scoped_refptr<Interest>& interest) {
415   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
416 
417   const int fd = interest->params().fd;
418   auto entry_it = entries_.find(fd);
419   CHECK(entry_it != entries_.end(), base::NotFatalUntil::M125);
420 
421   EpollEventEntry& entry = entry_it->second;
422   auto& interests = entry.interests;
423   auto* it = ranges::find(interests, interest);
424   CHECK(it != interests.end(), base::NotFatalUntil::M125);
425   interests.erase(it);
426 
427   if (interests.empty()) {
428     StopEpollEvent(entry);
429     entries_.erase(entry_it);
430   } else {
431     UpdateEpollEvent(entry);
432   }
433 }
434 
WaitForEpollEvents(TimeDelta timeout)435 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
436   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
437 
438   // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
439   // are integral milliseconds. Round up to the next millisecond.
440   // TODO(crbug.com/40245876): Consider higher-resolution timeouts.
441   const int epoll_timeout =
442       timeout.is_max() ? -1
443                        : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
444 
445   // Used in the "epoll" code path.
446   epoll_event epoll_events[16];
447   // Used in the "poll" code path.
448   std::vector<epoll_event> poll_events;
449   // Will refer to `events` or `events_vector` depending on which
450   // code path is taken.
451   span<epoll_event> ready_events;
452 
453   // When there are many FDs, epoll() can be significantly faster as poll needs
454   // to iterate through the list of watched fds. This value is pretty arbitrary,
455   // the internet suggests that under 1000 fds that epoll isn't noticeably
456   // faster than poll but this isn't easy to empirically measure.
457   bool use_poll =
458       g_use_poll.load(std::memory_order_relaxed) && entries_.size() < 500;
459 
460   if (use_poll) {
461     if (!GetEventsPoll(epoll_timeout, &poll_events)) {
462       return false;
463     }
464     ready_events = span(poll_events).first(poll_events.size());
465   } else {
466     const int epoll_result = epoll_wait(epoll_.get(), epoll_events,
467                                         std::size(epoll_events), epoll_timeout);
468     if (epoll_result < 0) {
469       DPCHECK(errno == EINTR);
470       return false;
471     }
472     if (epoll_result == 0) {
473       return false;
474     }
475 
476     ready_events =
477         span(epoll_events).first(base::checked_cast<size_t>(epoll_result));
478   }
479 
480   for (epoll_event& e : ready_events) {
481     if (e.data.ptr == &wake_event_) {
482       // Wake-up events are always safe to handle immediately. Unlike other
483       // events used by MessagePumpEpoll they also don't point to an
484       // EpollEventEntry, so we handle them separately here.
485       HandleWakeUp();
486       e.data.ptr = nullptr;
487       continue;
488     }
489 
490     // To guard against one of the ready events unregistering and thus
491     // invalidating one of the others here, first link each entry to the
492     // corresponding epoll_event returned by epoll_wait(). We do this before
493     // dispatching any events, and the second pass below will only dispatch an
494     // event if its epoll_event data is still valid.
495     auto& entry = EpollEventEntry::FromEpollEvent(e);
496     DCHECK(!entry.active_event);
497     EpollEventEntry::FromEpollEvent(e).active_event = &e;
498   }
499 
500   for (auto& e : ready_events) {
501     if (e.data.ptr) {
502       auto& entry = EpollEventEntry::FromEpollEvent(e);
503       entry.active_event = nullptr;
504       OnEpollEvent(entry, e.events);
505     }
506   }
507 
508   return true;
509 }
510 
FindPollEntry(int fd)511 std::vector<struct pollfd>::iterator MessagePumpEpoll::FindPollEntry(int fd) {
512   return std::find_if(
513       pollfds_.begin(), pollfds_.end(),
514       [fd](const struct pollfd poll_entry) { return poll_entry.fd == fd; });
515 }
516 
RemovePollEntry(int fd)517 void MessagePumpEpoll::RemovePollEntry(int fd) {
518   pollfds_.erase(FindPollEntry(fd));
519 }
520 
GetEventsPoll(int epoll_timeout,std::vector<epoll_event> * epoll_events)521 bool MessagePumpEpoll::GetEventsPoll(int epoll_timeout,
522                                      std::vector<epoll_event>* epoll_events) {
523   int retval = poll(&pollfds_[0], base::checked_cast<nfds_t>(pollfds_.size()),
524                     epoll_timeout);
525   if (retval < 0) {
526     DPCHECK(errno == EINTR);
527     return false;
528   }
529   // Nothing to do, timeout.
530   if (retval == 0) {
531     return false;
532   }
533 
534   for (struct pollfd& pollfd_entry : pollfds_) {
535     if (pollfd_entry.revents == 0) {
536       continue;
537     }
538 
539     epoll_event event;
540     memset(&event, 0, sizeof(event));
541 
542     if (pollfd_entry.fd == wake_event_.get()) {
543       event.data.ptr = &wake_event_;
544     } else {
545       auto entry = entries_.find(pollfd_entry.fd);
546       CHECK(entry != entries_.end());
547       event.data.ptr = &(entry->second);
548     }
549 
550     for (const auto& epoll_poll : kEpollToPollEvents) {
551       if (pollfd_entry.revents & epoll_poll.second) {
552         event.events |= epoll_poll.first;
553       }
554     }
555     epoll_events->push_back(event);
556     pollfd_entry.revents = 0;
557   }
558   return true;
559 }
560 
OnEpollEvent(EpollEventEntry & entry,uint32_t events)561 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
562   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
563   DCHECK(!entry.stopped);
564 
565   const bool readable = (events & EPOLLIN) != 0;
566   const bool writable = (events & EPOLLOUT) != 0;
567 
568   // Under different circumstances, peer closure may raise both/either EPOLLHUP
569   // and/or EPOLLERR. Treat them as equivalent. Notify the watchers to
570   // gracefully stop watching if disconnected.
571   const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
572   DCHECK(readable || writable || disconnected);
573 
574   // Copy the set of Interests, since interests may be added to or removed from
575   // `entry` during the loop below. This copy is inexpensive in practice
576   // because the size of this vector is expected to be very small (<= 2).
577   auto interests = entry.interests;
578 
579   // Any of these interests' event handlers may destroy any of the others'
580   // controllers. Start all of them watching for destruction before we actually
581   // dispatch any events.
582   for (const auto& interest : interests) {
583     interest->WatchForControllerDestruction();
584   }
585 
586   bool event_handled = false;
587   for (const auto& interest : interests) {
588     if (!interest->active()) {
589       continue;
590     }
591 
592     const bool one_shot = interest->params().one_shot;
593     const bool can_read = (readable || disconnected) && interest->params().read;
594     const bool can_write = (writable || disconnected) &&
595                            interest->params().write && (!one_shot || !can_read);
596     if (!can_read && !can_write) {
597       // If this Interest is active but not watching for whichever event was
598       // raised here, there's nothing to do. This can occur if a descriptor has
599       // multiple active interests, since only one interest needs to be
600       // satisfied in order for us to process an epoll event.
601       continue;
602     }
603 
604     if (interest->params().one_shot) {
605       // This is a one-shot event watch which is about to be triggered. We
606       // deactivate the interest and update epoll immediately. The event handler
607       // may reactivate it.
608       interest->set_active(false);
609       UpdateEpollEvent(entry);
610     }
611 
612     if (!interest->was_controller_destroyed()) {
613       HandleEvent(entry.fd, can_read, can_write, interest->controller());
614       event_handled = true;
615     }
616   }
617 
618   // Stop `EpollEventEntry` for disconnected file descriptor without active
619   // interests.
620   if (disconnected && !event_handled) {
621     StopEpollEvent(entry);
622   }
623 
624   for (const auto& interest : interests) {
625     interest->StopWatchingForControllerDestruction();
626   }
627 }
628 
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)629 void MessagePumpEpoll::HandleEvent(int fd,
630                                    bool can_read,
631                                    bool can_write,
632                                    FdWatchController* controller) {
633   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
634   BeginNativeWorkBatch();
635   // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
636   // HandleNotification() is called outside of Run() (e.g. in unit tests).
637   Delegate::ScopedDoWorkItem scoped_do_work_item;
638   if (run_state_) {
639     scoped_do_work_item = run_state_->delegate->BeginWorkItem();
640   }
641 
642   // Trace events must begin after the above BeginWorkItem() so that the
643   // ensuing "ThreadController active" outscopes all the events under it.
644   TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
645               controller->created_from_location(), "fd", fd, "can_read",
646               can_read, "can_write", can_write, "context",
647               static_cast<void*>(controller));
648   TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
649       controller->created_from_location().file_name());
650   if (can_read && can_write) {
651     bool controller_was_destroyed = false;
652     bool* previous_was_destroyed_flag =
653         std::exchange(controller->was_destroyed_, &controller_was_destroyed);
654 
655     controller->OnFdWritable();
656     if (!controller_was_destroyed) {
657       controller->OnFdReadable();
658     }
659     if (!controller_was_destroyed) {
660       controller->was_destroyed_ = previous_was_destroyed_flag;
661     } else if (previous_was_destroyed_flag) {
662       *previous_was_destroyed_flag = true;
663     }
664   } else if (can_write) {
665     controller->OnFdWritable();
666   } else if (can_read) {
667     controller->OnFdReadable();
668   }
669 }
670 
HandleWakeUp()671 void MessagePumpEpoll::HandleWakeUp() {
672   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
673   BeginNativeWorkBatch();
674   uint64_t value;
675   ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
676   DPCHECK(n == sizeof(value));
677 }
678 
BeginNativeWorkBatch()679 void MessagePumpEpoll::BeginNativeWorkBatch() {
680   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
681   // Call `BeginNativeWorkBeforeDoWork()` if native work hasn't started.
682   if (!native_work_started_) {
683     if (run_state_) {
684       run_state_->delegate->BeginNativeWorkBeforeDoWork();
685     }
686     native_work_started_ = true;
687   }
688 }
689 
EpollEventEntry(int fd)690 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
691 
~EpollEventEntry()692 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
693   if (active_event) {
694     DCHECK_EQ(this, active_event->data.ptr);
695     active_event->data.ptr = nullptr;
696   }
697 }
698 
ComputeActiveEvents() const699 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() const {
700   uint32_t events = 0;
701   bool one_shot = true;
702   for (const auto& interest : interests) {
703     if (!interest->active()) {
704       continue;
705     }
706     const InterestParams& params = interest->params();
707     events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
708     one_shot &= params.one_shot;
709   }
710   if (events != 0 && one_shot) {
711     return events | EPOLLONESHOT;
712   }
713   return events;
714 }
715 
FdWatchController(const Location & from_here)716 MessagePumpEpoll::FdWatchController::FdWatchController(
717     const Location& from_here)
718     : FdWatchControllerInterface(from_here) {}
719 
~FdWatchController()720 MessagePumpEpoll::FdWatchController::~FdWatchController() {
721   CHECK(StopWatchingFileDescriptor());
722   if (was_destroyed_) {
723     DCHECK(!*was_destroyed_);
724     *was_destroyed_ = true;
725   }
726 }
727 
StopWatchingFileDescriptor()728 bool MessagePumpEpoll::FdWatchController::StopWatchingFileDescriptor() {
729   watcher_ = nullptr;
730   if (pump_ && interest_) {
731     pump_->UnregisterInterest(interest_);
732     interest_.reset();
733     pump_.reset();
734   }
735   return true;
736 }
737 
738 const scoped_refptr<MessagePumpEpoll::Interest>&
AssignInterest(const InterestParams & params)739 MessagePumpEpoll::FdWatchController::AssignInterest(
740     const InterestParams& params) {
741   interest_ = MakeRefCounted<Interest>(this, params);
742   return interest_;
743 }
744 
ClearInterest()745 void MessagePumpEpoll::FdWatchController::ClearInterest() {
746   interest_.reset();
747 }
748 
OnFdReadable()749 void MessagePumpEpoll::FdWatchController::OnFdReadable() {
750   if (!watcher_) {
751     // When a watcher is watching both read and write and both are possible, the
752     // pump will call OnFdWritable() first, followed by OnFdReadable(). But
753     // OnFdWritable() may stop or destroy the watch. If the watch is destroyed,
754     // the pump will not call OnFdReadable() at all, but if it's merely stopped,
755     // OnFdReadable() will be called while `watcher_` is  null. In this case we
756     // don't actually want to call the client.
757     return;
758   }
759   watcher_->OnFileCanReadWithoutBlocking(interest_->params().fd);
760 }
761 
OnFdWritable()762 void MessagePumpEpoll::FdWatchController::OnFdWritable() {
763   DCHECK(watcher_);
764   watcher_->OnFileCanWriteWithoutBlocking(interest_->params().fd);
765 }
766 
767 }  // namespace base
768