1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_epoll.h"
6
7 #include <sys/eventfd.h>
8
9 #include <cstddef>
10 #include <cstdint>
11 #include <optional>
12 #include <utility>
13
14 #include "base/auto_reset.h"
15 #include "base/check_op.h"
16 #include "base/feature_list.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/metrics/histogram_macros.h"
20 #include "base/numerics/safe_conversions.h"
21 #include "base/posix/eintr_wrapper.h"
22 #include "base/ranges/algorithm.h"
23 #include "base/threading/thread_checker.h"
24 #include "base/time/time.h"
25 #include "base/trace_event/base_tracing.h"
26
27 #if DCHECK_IS_ON()
28 #include <iomanip>
29 #endif
30
31 namespace base {
32
33 namespace {
34
35 // Under this feature native work is batched.
36 BASE_FEATURE(kBatchNativeEventsInMessagePumpEpoll,
37 "BatchNativeEventsInMessagePumpEpoll",
38 base::FEATURE_DISABLED_BY_DEFAULT);
39
40 // Caches the state of the "BatchNativeEventsInMessagePumpEpoll".
41 std::atomic_bool g_use_batched_version = false;
42 std::atomic_bool g_use_poll = false;
43
44 constexpr std::pair<uint32_t, short int> kEpollToPollEvents[] = {
45 {EPOLLIN, POLLIN}, {EPOLLOUT, POLLOUT}, {EPOLLRDHUP, POLLRDHUP},
46 {EPOLLPRI, POLLPRI}, {EPOLLERR, POLLERR}, {EPOLLHUP, POLLHUP}};
47
SetEventsForPoll(const uint32_t epoll_events,struct pollfd * poll_entry)48 void SetEventsForPoll(const uint32_t epoll_events, struct pollfd* poll_entry) {
49 poll_entry->events = 0;
50 for (const auto& epoll_poll : kEpollToPollEvents) {
51 if (epoll_events & epoll_poll.first) {
52 poll_entry->events |= epoll_poll.second;
53 }
54 }
55 }
56 } // namespace
57
58 // Parameters used to construct and describe an interest.
59 struct MessagePumpEpoll::InterestParams {
60 // The file descriptor of interest.
61 int fd;
62
63 // Indicates an interest in being able to read() from `fd`.
64 bool read;
65
66 // Indicates an interest in being able to write() to `fd`.
67 bool write;
68
69 // Indicates whether this interest is a one-shot interest, meaning that it
70 // must be automatically deactivated every time it triggers an epoll event.
71 bool one_shot;
72
IsEqualbase::MessagePumpEpoll::InterestParams73 bool IsEqual(const InterestParams& rhs) const {
74 return std::tie(fd, read, write, one_shot) ==
75 std::tie(rhs.fd, rhs.read, rhs.write, rhs.one_shot);
76 }
77 };
78
79 // Represents a single controller's interest in a file descriptor via epoll,
80 // and tracks whether that interest is currently active. Though an interest
81 // persists as long as its controller is alive and hasn't changed interests,
82 // it only participates in epoll waits while active.
83 class MessagePumpEpoll::Interest : public RefCounted<Interest> {
84 public:
Interest(FdWatchController * controller,const InterestParams & params)85 Interest(FdWatchController* controller, const InterestParams& params)
86 : controller_(controller), params_(params) {}
87
88 Interest(const Interest&) = delete;
89 Interest& operator=(const Interest&) = delete;
90
controller()91 FdWatchController* controller() { return controller_; }
params() const92 const InterestParams& params() const { return params_; }
93
active() const94 bool active() const { return active_; }
set_active(bool active)95 void set_active(bool active) { active_ = active; }
96
97 // Only meaningful between WatchForControllerDestruction() and
98 // StopWatchingForControllerDestruction().
was_controller_destroyed() const99 bool was_controller_destroyed() const { return was_controller_destroyed_; }
100
WatchForControllerDestruction()101 void WatchForControllerDestruction() {
102 DCHECK_GE(nested_controller_destruction_watchers_, 0);
103 if (nested_controller_destruction_watchers_ == 0) {
104 DCHECK(!controller_->was_destroyed_);
105 controller_->was_destroyed_ = &was_controller_destroyed_;
106 } else {
107 // If this is a nested event we should already be watching `controller_`
108 // for destruction from an outer event handler.
109 DCHECK_EQ(controller_->was_destroyed_, &was_controller_destroyed_);
110 }
111 ++nested_controller_destruction_watchers_;
112 }
113
StopWatchingForControllerDestruction()114 void StopWatchingForControllerDestruction() {
115 --nested_controller_destruction_watchers_;
116 DCHECK_GE(nested_controller_destruction_watchers_, 0);
117 if (nested_controller_destruction_watchers_ == 0 &&
118 !was_controller_destroyed_) {
119 DCHECK_EQ(controller_->was_destroyed_, &was_controller_destroyed_);
120 controller_->was_destroyed_ = nullptr;
121 }
122 }
123
124 private:
125 friend class RefCounted<Interest>;
126 ~Interest() = default;
127
128 const raw_ptr<FdWatchController, DanglingUntriaged> controller_;
129 const InterestParams params_;
130 bool active_ = true;
131 bool was_controller_destroyed_ = false;
132
133 // Avoid resetting `controller_->was_destroyed` when nested destruction
134 // watchers are active.
135 int nested_controller_destruction_watchers_ = 0;
136 };
137
MessagePumpEpoll()138 MessagePumpEpoll::MessagePumpEpoll() {
139 epoll_.reset(epoll_create1(/*flags=*/0));
140 PCHECK(epoll_.is_valid());
141
142 wake_event_.reset(eventfd(0, EFD_NONBLOCK));
143 PCHECK(wake_event_.is_valid());
144
145 epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
146 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
147 PCHECK(rv == 0);
148
149 struct pollfd poll_entry;
150 poll_entry.fd = wake_event_.get();
151 poll_entry.events = POLLIN;
152 poll_entry.revents = 0;
153 pollfds_.push_back(poll_entry);
154
155 next_metrics_time_ = base::TimeTicks::Now() + base::Minutes(1);
156 }
157
158 MessagePumpEpoll::~MessagePumpEpoll() = default;
159
InitializeFeatures()160 void MessagePumpEpoll::InitializeFeatures() {
161 // Relaxed memory order since no memory access depends on value.
162 g_use_batched_version.store(
163 base::FeatureList::IsEnabled(kBatchNativeEventsInMessagePumpEpoll),
164 std::memory_order_relaxed);
165 g_use_poll.store(base::FeatureList::IsEnabled(kUsePollForMessagePumpEpoll),
166 std::memory_order_relaxed);
167 }
168
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)169 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
170 bool persistent,
171 int mode,
172 FdWatchController* controller,
173 FdWatcher* watcher) {
174 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
175 TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
176 "persistent", persistent, "watch_read", mode & WATCH_READ,
177 "watch_write", mode & WATCH_WRITE);
178
179 const InterestParams params{
180 .fd = fd,
181 .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
182 .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
183 .one_shot = !persistent,
184 };
185
186 auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
187 EpollEventEntry& entry = it->second;
188 scoped_refptr<Interest> existing_interest = controller->interest();
189 if (existing_interest && existing_interest->params().IsEqual(params)) {
190 // WatchFileDescriptor() has already been called for this controller at
191 // least once before, and as in the most common cases, it is now being
192 // called again with the same parameters.
193 //
194 // We don't need to allocate and register a new Interest in this case, but
195 // we can instead reactivate the existing (presumably deactivated,
196 // non-persistent) Interest.
197 existing_interest->set_active(true);
198 } else {
199 entry.interests.push_back(controller->AssignInterest(params));
200 if (existing_interest) {
201 UnregisterInterest(existing_interest);
202 }
203 }
204
205 if (is_new_fd_entry) {
206 AddEpollEvent(entry);
207 } else {
208 UpdateEpollEvent(entry);
209 }
210
211 controller->set_pump(weak_ptr_factory_.GetWeakPtr());
212 controller->set_watcher(watcher);
213 return true;
214 }
215
Run(Delegate * delegate)216 void MessagePumpEpoll::Run(Delegate* delegate) {
217 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
218 RunState run_state(delegate);
219 AutoReset<raw_ptr<RunState>> auto_reset_run_state(&run_state_, &run_state);
220 for (;;) {
221 // Do some work and see if the next task is ready right away.
222 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
223 const bool immediate_work_available = next_work_info.is_immediate();
224 if (run_state.should_quit) {
225 break;
226 }
227
228 if (next_work_info.recent_now > next_metrics_time_) {
229 RecordPeriodicMetrics();
230 }
231
232 // Reset the native work flag before processing IO events.
233 native_work_started_ = false;
234
235 // Process any immediately ready IO event, but don't sleep yet.
236 // Process epoll events until none is available without blocking or
237 // the maximum number of iterations is reached. The maximum number of
238 // iterations when `g_use_batched_version` is true was chosen so that
239 // all available events are dispatched 95% of the time in local tests.
240 // The maximum is not infinite because we want to yield to application
241 // tasks at some point.
242 bool did_native_work = false;
243 const int max_iterations =
244 g_use_batched_version.load(std::memory_order_relaxed) ? 16 : 1;
245 for (int i = 0; i < max_iterations; ++i) {
246 if (!WaitForEpollEvents(TimeDelta())) {
247 break;
248 }
249 did_native_work = true;
250 }
251
252 bool attempt_more_work = immediate_work_available || did_native_work;
253
254 if (run_state.should_quit) {
255 break;
256 }
257 if (attempt_more_work) {
258 continue;
259 }
260
261 delegate->DoIdleWork();
262 if (run_state.should_quit) {
263 break;
264 }
265
266 TimeDelta next_metrics_delay =
267 next_metrics_time_ - next_work_info.recent_now;
268 TimeDelta timeout = TimeDelta::Max();
269 DCHECK(!next_work_info.delayed_run_time.is_null());
270 if (!next_work_info.delayed_run_time.is_max()) {
271 timeout = next_work_info.remaining_delay();
272 }
273 if (timeout > next_metrics_delay) {
274 timeout = next_metrics_delay;
275 // Ensure we never get a negative timeout from the next_metrics_delay as
276 // this will cause epoll to block indefinitely if no fds are signaled,
277 // preventing existing non-fd tasks from running.
278 if (timeout < base::Milliseconds(0)) {
279 timeout = base::Milliseconds(0);
280 }
281 }
282 delegate->BeforeWait();
283 WaitForEpollEvents(timeout);
284 if (run_state.should_quit) {
285 break;
286 }
287 }
288 }
289
RecordPeriodicMetrics()290 void MessagePumpEpoll::RecordPeriodicMetrics() {
291 UMA_HISTOGRAM_COUNTS_1000("MessagePumpEpoll.WatchedFileDescriptors",
292 (int)entries_.size());
293 next_metrics_time_ += base::Minutes(1);
294 }
295
Quit()296 void MessagePumpEpoll::Quit() {
297 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
298 DCHECK(run_state_) << "Quit() called outside of Run()";
299 run_state_->should_quit = true;
300 }
301
ScheduleWork()302 void MessagePumpEpoll::ScheduleWork() {
303 const uint64_t value = 1;
304 ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
305
306 // EAGAIN here implies that the write() would overflow of the event counter,
307 // which is a condition we can safely ignore. It implies that the event
308 // counter is non-zero and therefore readable, which is enough to ensure that
309 // any pending wait eventually wakes up.
310 DPCHECK(n == sizeof(value) || errno == EAGAIN);
311 }
312
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)313 void MessagePumpEpoll::ScheduleDelayedWork(
314 const Delegate::NextWorkInfo& next_work_info) {
315 // Nothing to do. This can only be called from the same thread as Run(), so
316 // the pump must be in between waits. The scheduled work therefore will be
317 // seen in time for the next wait.
318 }
319
AddEpollEvent(EpollEventEntry & entry)320 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
321 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
322 DCHECK(!entry.stopped);
323 const uint32_t events = entry.ComputeActiveEvents();
324 epoll_event event{.events = events, .data = {.ptr = &entry}};
325 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
326 #if DCHECK_IS_ON()
327 // TODO(361611793): Remove these debug logs after resolving the issue.
328 if (rv != 0) {
329 for (auto& history : entry.epoll_history_) {
330 if (history.event) {
331 auto& e = history.event.value();
332 LOG(ERROR) << "events=0x" << std::hex << std::setfill('0')
333 << std::setw(8) << e.events;
334 LOG(ERROR) << "data=0x" << std::hex << std::setfill('0')
335 << std::setw(16) << e.data.u64;
336 }
337 LOG(ERROR) << history.stack_trace;
338 }
339 } else {
340 entry.PushEpollHistory(std::make_optional(event));
341 }
342 #endif
343 DPCHECK(rv == 0);
344 entry.registered_events = events;
345
346 DCHECK(FindPollEntry(entry.fd) == pollfds_.end());
347 struct pollfd poll_entry;
348 poll_entry.fd = entry.fd;
349 poll_entry.revents = 0;
350 SetEventsForPoll(events, &poll_entry);
351
352 pollfds_.push_back(poll_entry);
353 }
354
UpdateEpollEvent(EpollEventEntry & entry)355 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
356 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
357 const uint32_t events = entry.ComputeActiveEvents();
358 if (!entry.stopped) {
359 if (events == 0) {
360 // There is no active interest now.
361 // We don't have to call epoll_ctl() if the last event was registered as
362 // one-shot since the fd has already been disabled.
363 if (!(entry.registered_events & EPOLLONESHOT)) {
364 // The fd is still enabled. We need to disable it but don't remove the
365 // entry from `entries_` to keep the reference alive because handling
366 // the entry isn't finished yet.
367 StopEpollEvent(entry);
368 } else {
369 // No work needs to be done for epoll, but for poll we have to implement
370 // the equivalent of oneshot ourselves by unregistering for all events.
371 auto poll_entry = FindPollEntry(entry.fd);
372 CHECK(poll_entry != pollfds_.end());
373 poll_entry->events = 0;
374 }
375 return;
376 }
377 if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
378 // Persistent events don't need to be modified if no bits are changing.
379 return;
380 }
381 epoll_event event{.events = events, .data = {.ptr = &entry}};
382 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
383 DPCHECK(rv == 0);
384 #if DCHECK_IS_ON()
385 entry.PushEpollHistory(std::make_optional(event));
386 #endif
387 entry.registered_events = events;
388
389 auto poll_entry = FindPollEntry(entry.fd);
390 CHECK(poll_entry != pollfds_.end());
391 SetEventsForPoll(events, &(*poll_entry));
392 } else if (events != 0) {
393 // An interest for the fd has been reactivated. Re-enable the fd.
394 entry.stopped = false;
395 AddEpollEvent(entry);
396 }
397 }
398
StopEpollEvent(EpollEventEntry & entry)399 void MessagePumpEpoll::StopEpollEvent(EpollEventEntry& entry) {
400 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
401 if (!entry.stopped) {
402 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, entry.fd, nullptr);
403 DPCHECK(rv == 0);
404 #if DCHECK_IS_ON()
405 entry.PushEpollHistory(std::nullopt);
406 #endif
407 entry.stopped = true;
408 entry.registered_events = 0;
409 RemovePollEntry(entry.fd);
410 }
411 }
412
UnregisterInterest(const scoped_refptr<Interest> & interest)413 void MessagePumpEpoll::UnregisterInterest(
414 const scoped_refptr<Interest>& interest) {
415 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
416
417 const int fd = interest->params().fd;
418 auto entry_it = entries_.find(fd);
419 CHECK(entry_it != entries_.end(), base::NotFatalUntil::M125);
420
421 EpollEventEntry& entry = entry_it->second;
422 auto& interests = entry.interests;
423 auto* it = ranges::find(interests, interest);
424 CHECK(it != interests.end(), base::NotFatalUntil::M125);
425 interests.erase(it);
426
427 if (interests.empty()) {
428 StopEpollEvent(entry);
429 entries_.erase(entry_it);
430 } else {
431 UpdateEpollEvent(entry);
432 }
433 }
434
WaitForEpollEvents(TimeDelta timeout)435 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
436 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
437
438 // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
439 // are integral milliseconds. Round up to the next millisecond.
440 // TODO(crbug.com/40245876): Consider higher-resolution timeouts.
441 const int epoll_timeout =
442 timeout.is_max() ? -1
443 : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
444
445 // Used in the "epoll" code path.
446 epoll_event epoll_events[16];
447 // Used in the "poll" code path.
448 std::vector<epoll_event> poll_events;
449 // Will refer to `events` or `events_vector` depending on which
450 // code path is taken.
451 span<epoll_event> ready_events;
452
453 // When there are many FDs, epoll() can be significantly faster as poll needs
454 // to iterate through the list of watched fds. This value is pretty arbitrary,
455 // the internet suggests that under 1000 fds that epoll isn't noticeably
456 // faster than poll but this isn't easy to empirically measure.
457 bool use_poll =
458 g_use_poll.load(std::memory_order_relaxed) && entries_.size() < 500;
459
460 if (use_poll) {
461 if (!GetEventsPoll(epoll_timeout, &poll_events)) {
462 return false;
463 }
464 ready_events = span(poll_events).first(poll_events.size());
465 } else {
466 const int epoll_result = epoll_wait(epoll_.get(), epoll_events,
467 std::size(epoll_events), epoll_timeout);
468 if (epoll_result < 0) {
469 DPCHECK(errno == EINTR);
470 return false;
471 }
472 if (epoll_result == 0) {
473 return false;
474 }
475
476 ready_events =
477 span(epoll_events).first(base::checked_cast<size_t>(epoll_result));
478 }
479
480 for (epoll_event& e : ready_events) {
481 if (e.data.ptr == &wake_event_) {
482 // Wake-up events are always safe to handle immediately. Unlike other
483 // events used by MessagePumpEpoll they also don't point to an
484 // EpollEventEntry, so we handle them separately here.
485 HandleWakeUp();
486 e.data.ptr = nullptr;
487 continue;
488 }
489
490 // To guard against one of the ready events unregistering and thus
491 // invalidating one of the others here, first link each entry to the
492 // corresponding epoll_event returned by epoll_wait(). We do this before
493 // dispatching any events, and the second pass below will only dispatch an
494 // event if its epoll_event data is still valid.
495 auto& entry = EpollEventEntry::FromEpollEvent(e);
496 DCHECK(!entry.active_event);
497 EpollEventEntry::FromEpollEvent(e).active_event = &e;
498 }
499
500 for (auto& e : ready_events) {
501 if (e.data.ptr) {
502 auto& entry = EpollEventEntry::FromEpollEvent(e);
503 entry.active_event = nullptr;
504 OnEpollEvent(entry, e.events);
505 }
506 }
507
508 return true;
509 }
510
FindPollEntry(int fd)511 std::vector<struct pollfd>::iterator MessagePumpEpoll::FindPollEntry(int fd) {
512 return std::find_if(
513 pollfds_.begin(), pollfds_.end(),
514 [fd](const struct pollfd poll_entry) { return poll_entry.fd == fd; });
515 }
516
RemovePollEntry(int fd)517 void MessagePumpEpoll::RemovePollEntry(int fd) {
518 pollfds_.erase(FindPollEntry(fd));
519 }
520
GetEventsPoll(int epoll_timeout,std::vector<epoll_event> * epoll_events)521 bool MessagePumpEpoll::GetEventsPoll(int epoll_timeout,
522 std::vector<epoll_event>* epoll_events) {
523 int retval = poll(&pollfds_[0], base::checked_cast<nfds_t>(pollfds_.size()),
524 epoll_timeout);
525 if (retval < 0) {
526 DPCHECK(errno == EINTR);
527 return false;
528 }
529 // Nothing to do, timeout.
530 if (retval == 0) {
531 return false;
532 }
533
534 for (struct pollfd& pollfd_entry : pollfds_) {
535 if (pollfd_entry.revents == 0) {
536 continue;
537 }
538
539 epoll_event event;
540 memset(&event, 0, sizeof(event));
541
542 if (pollfd_entry.fd == wake_event_.get()) {
543 event.data.ptr = &wake_event_;
544 } else {
545 auto entry = entries_.find(pollfd_entry.fd);
546 CHECK(entry != entries_.end());
547 event.data.ptr = &(entry->second);
548 }
549
550 for (const auto& epoll_poll : kEpollToPollEvents) {
551 if (pollfd_entry.revents & epoll_poll.second) {
552 event.events |= epoll_poll.first;
553 }
554 }
555 epoll_events->push_back(event);
556 pollfd_entry.revents = 0;
557 }
558 return true;
559 }
560
OnEpollEvent(EpollEventEntry & entry,uint32_t events)561 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
562 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
563 DCHECK(!entry.stopped);
564
565 const bool readable = (events & EPOLLIN) != 0;
566 const bool writable = (events & EPOLLOUT) != 0;
567
568 // Under different circumstances, peer closure may raise both/either EPOLLHUP
569 // and/or EPOLLERR. Treat them as equivalent. Notify the watchers to
570 // gracefully stop watching if disconnected.
571 const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
572 DCHECK(readable || writable || disconnected);
573
574 // Copy the set of Interests, since interests may be added to or removed from
575 // `entry` during the loop below. This copy is inexpensive in practice
576 // because the size of this vector is expected to be very small (<= 2).
577 auto interests = entry.interests;
578
579 // Any of these interests' event handlers may destroy any of the others'
580 // controllers. Start all of them watching for destruction before we actually
581 // dispatch any events.
582 for (const auto& interest : interests) {
583 interest->WatchForControllerDestruction();
584 }
585
586 bool event_handled = false;
587 for (const auto& interest : interests) {
588 if (!interest->active()) {
589 continue;
590 }
591
592 const bool one_shot = interest->params().one_shot;
593 const bool can_read = (readable || disconnected) && interest->params().read;
594 const bool can_write = (writable || disconnected) &&
595 interest->params().write && (!one_shot || !can_read);
596 if (!can_read && !can_write) {
597 // If this Interest is active but not watching for whichever event was
598 // raised here, there's nothing to do. This can occur if a descriptor has
599 // multiple active interests, since only one interest needs to be
600 // satisfied in order for us to process an epoll event.
601 continue;
602 }
603
604 if (interest->params().one_shot) {
605 // This is a one-shot event watch which is about to be triggered. We
606 // deactivate the interest and update epoll immediately. The event handler
607 // may reactivate it.
608 interest->set_active(false);
609 UpdateEpollEvent(entry);
610 }
611
612 if (!interest->was_controller_destroyed()) {
613 HandleEvent(entry.fd, can_read, can_write, interest->controller());
614 event_handled = true;
615 }
616 }
617
618 // Stop `EpollEventEntry` for disconnected file descriptor without active
619 // interests.
620 if (disconnected && !event_handled) {
621 StopEpollEvent(entry);
622 }
623
624 for (const auto& interest : interests) {
625 interest->StopWatchingForControllerDestruction();
626 }
627 }
628
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)629 void MessagePumpEpoll::HandleEvent(int fd,
630 bool can_read,
631 bool can_write,
632 FdWatchController* controller) {
633 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
634 BeginNativeWorkBatch();
635 // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
636 // HandleNotification() is called outside of Run() (e.g. in unit tests).
637 Delegate::ScopedDoWorkItem scoped_do_work_item;
638 if (run_state_) {
639 scoped_do_work_item = run_state_->delegate->BeginWorkItem();
640 }
641
642 // Trace events must begin after the above BeginWorkItem() so that the
643 // ensuing "ThreadController active" outscopes all the events under it.
644 TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
645 controller->created_from_location(), "fd", fd, "can_read",
646 can_read, "can_write", can_write, "context",
647 static_cast<void*>(controller));
648 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
649 controller->created_from_location().file_name());
650 if (can_read && can_write) {
651 bool controller_was_destroyed = false;
652 bool* previous_was_destroyed_flag =
653 std::exchange(controller->was_destroyed_, &controller_was_destroyed);
654
655 controller->OnFdWritable();
656 if (!controller_was_destroyed) {
657 controller->OnFdReadable();
658 }
659 if (!controller_was_destroyed) {
660 controller->was_destroyed_ = previous_was_destroyed_flag;
661 } else if (previous_was_destroyed_flag) {
662 *previous_was_destroyed_flag = true;
663 }
664 } else if (can_write) {
665 controller->OnFdWritable();
666 } else if (can_read) {
667 controller->OnFdReadable();
668 }
669 }
670
HandleWakeUp()671 void MessagePumpEpoll::HandleWakeUp() {
672 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
673 BeginNativeWorkBatch();
674 uint64_t value;
675 ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
676 DPCHECK(n == sizeof(value));
677 }
678
BeginNativeWorkBatch()679 void MessagePumpEpoll::BeginNativeWorkBatch() {
680 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
681 // Call `BeginNativeWorkBeforeDoWork()` if native work hasn't started.
682 if (!native_work_started_) {
683 if (run_state_) {
684 run_state_->delegate->BeginNativeWorkBeforeDoWork();
685 }
686 native_work_started_ = true;
687 }
688 }
689
EpollEventEntry(int fd)690 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
691
~EpollEventEntry()692 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
693 if (active_event) {
694 DCHECK_EQ(this, active_event->data.ptr);
695 active_event->data.ptr = nullptr;
696 }
697 }
698
ComputeActiveEvents() const699 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() const {
700 uint32_t events = 0;
701 bool one_shot = true;
702 for (const auto& interest : interests) {
703 if (!interest->active()) {
704 continue;
705 }
706 const InterestParams& params = interest->params();
707 events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
708 one_shot &= params.one_shot;
709 }
710 if (events != 0 && one_shot) {
711 return events | EPOLLONESHOT;
712 }
713 return events;
714 }
715
FdWatchController(const Location & from_here)716 MessagePumpEpoll::FdWatchController::FdWatchController(
717 const Location& from_here)
718 : FdWatchControllerInterface(from_here) {}
719
~FdWatchController()720 MessagePumpEpoll::FdWatchController::~FdWatchController() {
721 CHECK(StopWatchingFileDescriptor());
722 if (was_destroyed_) {
723 DCHECK(!*was_destroyed_);
724 *was_destroyed_ = true;
725 }
726 }
727
StopWatchingFileDescriptor()728 bool MessagePumpEpoll::FdWatchController::StopWatchingFileDescriptor() {
729 watcher_ = nullptr;
730 if (pump_ && interest_) {
731 pump_->UnregisterInterest(interest_);
732 interest_.reset();
733 pump_.reset();
734 }
735 return true;
736 }
737
738 const scoped_refptr<MessagePumpEpoll::Interest>&
AssignInterest(const InterestParams & params)739 MessagePumpEpoll::FdWatchController::AssignInterest(
740 const InterestParams& params) {
741 interest_ = MakeRefCounted<Interest>(this, params);
742 return interest_;
743 }
744
ClearInterest()745 void MessagePumpEpoll::FdWatchController::ClearInterest() {
746 interest_.reset();
747 }
748
OnFdReadable()749 void MessagePumpEpoll::FdWatchController::OnFdReadable() {
750 if (!watcher_) {
751 // When a watcher is watching both read and write and both are possible, the
752 // pump will call OnFdWritable() first, followed by OnFdReadable(). But
753 // OnFdWritable() may stop or destroy the watch. If the watch is destroyed,
754 // the pump will not call OnFdReadable() at all, but if it's merely stopped,
755 // OnFdReadable() will be called while `watcher_` is null. In this case we
756 // don't actually want to call the client.
757 return;
758 }
759 watcher_->OnFileCanReadWithoutBlocking(interest_->params().fd);
760 }
761
OnFdWritable()762 void MessagePumpEpoll::FdWatchController::OnFdWritable() {
763 DCHECK(watcher_);
764 watcher_->OnFileCanWriteWithoutBlocking(interest_->params().fd);
765 }
766
767 } // namespace base
768