• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_epoll.h"
6 
7 #include <sys/epoll.h>
8 #include <sys/eventfd.h>
9 
10 #include <cstddef>
11 #include <cstdint>
12 #include <utility>
13 
14 #include "base/auto_reset.h"
15 #include "base/check_op.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/ranges/algorithm.h"
19 #include "base/threading/thread_checker.h"
20 #include "base/trace_event/base_tracing.h"
21 #include "third_party/abseil-cpp/absl/types/optional.h"
22 
23 namespace base {
24 
MessagePumpEpoll()25 MessagePumpEpoll::MessagePumpEpoll() {
26   epoll_.reset(epoll_create1(/*flags=*/0));
27   PCHECK(epoll_.is_valid());
28 
29   wake_event_.reset(eventfd(0, EFD_NONBLOCK));
30   PCHECK(wake_event_.is_valid());
31 
32   epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
33   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
34   PCHECK(rv == 0);
35 }
36 
37 MessagePumpEpoll::~MessagePumpEpoll() = default;
38 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)39 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
40                                            bool persistent,
41                                            int mode,
42                                            FdWatchController* controller,
43                                            FdWatcher* watcher) {
44   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
45   TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
46               "persistent", persistent, "watch_read", mode & WATCH_READ,
47               "watch_write", mode & WATCH_WRITE);
48 
49   const InterestParams params{
50       .fd = fd,
51       .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
52       .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
53       .one_shot = !persistent,
54   };
55 
56   auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
57   EpollEventEntry& entry = it->second;
58   scoped_refptr<Interest> existing_interest = controller->epoll_interest();
59   if (existing_interest && existing_interest->params().IsEqual(params)) {
60     // WatchFileDescriptor() has already been called for this controller at
61     // least once before, and as in the most common cases, it is now being
62     // called again with the same parameters.
63     //
64     // We don't need to allocate and register a new Interest in this case, but
65     // we can instead reactivate the existing (presumably deactivated,
66     // non-persistent) Interest.
67     existing_interest->set_active(true);
68   } else {
69     entry.interests.push_back(controller->AssignEpollInterest(params));
70     if (existing_interest) {
71       UnregisterInterest(existing_interest);
72     }
73   }
74 
75   if (is_new_fd_entry) {
76     AddEpollEvent(entry);
77   } else {
78     UpdateEpollEvent(entry);
79   }
80 
81   controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
82   controller->set_watcher(watcher);
83   return true;
84 }
85 
Run(Delegate * delegate)86 void MessagePumpEpoll::Run(Delegate* delegate) {
87   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
88   RunState run_state(delegate);
89   AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
90   for (;;) {
91     // Do some work and see if the next task is ready right away.
92     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
93     const bool immediate_work_available = next_work_info.is_immediate();
94     if (run_state.should_quit) {
95       break;
96     }
97 
98     // Process any immediately ready IO event, but don't wait for more yet.
99     const bool processed_events = WaitForEpollEvents(TimeDelta(), delegate);
100     if (run_state.should_quit) {
101       break;
102     }
103 
104     if (immediate_work_available || processed_events) {
105       continue;
106     }
107 
108     const bool did_idle_work = delegate->DoIdleWork();
109     if (run_state.should_quit) {
110       break;
111     }
112     if (did_idle_work) {
113       continue;
114     }
115 
116     TimeDelta timeout = TimeDelta::Max();
117     DCHECK(!next_work_info.delayed_run_time.is_null());
118     if (!next_work_info.delayed_run_time.is_max()) {
119       timeout = next_work_info.remaining_delay();
120     }
121     delegate->BeforeWait();
122     WaitForEpollEvents(timeout, delegate);
123     if (run_state.should_quit) {
124       break;
125     }
126   }
127 }
128 
Quit()129 void MessagePumpEpoll::Quit() {
130   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
131   DCHECK(run_state_) << "Quit() called outside of Run()";
132   run_state_->should_quit = true;
133 }
134 
ScheduleWork()135 void MessagePumpEpoll::ScheduleWork() {
136   const uint64_t value = 1;
137   ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
138 
139   // EAGAIN here implies that the write() would overflow of the event counter,
140   // which is a condition we can safely ignore. It implies that the event
141   // counter is non-zero and therefore readable, which is enough to ensure that
142   // any pending wait eventually wakes up.
143   DPCHECK(n == sizeof(value) || errno == EAGAIN);
144 }
145 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)146 void MessagePumpEpoll::ScheduleDelayedWork(
147     const Delegate::NextWorkInfo& next_work_info) {
148   // Nothing to do. This can only be called from the same thread as Run(), so
149   // the pump must be in between waits. The scheduled work therefore will be
150   // seen in time for the next wait.
151 }
152 
AddEpollEvent(EpollEventEntry & entry)153 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
154   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
155   const uint32_t events = entry.ComputeActiveEvents();
156   epoll_event event{.events = events, .data = {.ptr = &entry}};
157   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
158   DPCHECK(rv == 0);
159   entry.registered_events = events;
160 }
161 
UpdateEpollEvent(EpollEventEntry & entry)162 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
163   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
164   const uint32_t events = entry.ComputeActiveEvents();
165   if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
166     // Persistent events don't need to be modified if no bits are changing.
167     return;
168   }
169   epoll_event event{.events = events, .data = {.ptr = &entry}};
170   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
171   DPCHECK(rv == 0);
172   entry.registered_events = events;
173 }
174 
UnregisterInterest(const scoped_refptr<Interest> & interest)175 void MessagePumpEpoll::UnregisterInterest(
176     const scoped_refptr<Interest>& interest) {
177   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
178 
179   const int fd = interest->params().fd;
180   auto entry_it = entries_.find(fd);
181   DCHECK(entry_it != entries_.end());
182 
183   EpollEventEntry& entry = entry_it->second;
184   auto& interests = entry.interests;
185   auto* it = ranges::find(interests, interest);
186   DCHECK(it != interests.end());
187   interests.erase(it);
188 
189   if (interests.empty()) {
190     entries_.erase(entry_it);
191     int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, fd, nullptr);
192     DPCHECK(rv == 0);
193   } else {
194     UpdateEpollEvent(entry);
195   }
196 }
197 
WaitForEpollEvents(TimeDelta timeout,Delegate * delegate)198 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout,
199                                           Delegate* delegate) {
200   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
201 
202   // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
203   // are integral milliseconds. Round up to the next millisecond.
204   // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
205   const int epoll_timeout =
206       timeout.is_max() ? -1
207                        : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
208   epoll_event events[16];
209   const int epoll_result =
210       epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
211   if (epoll_result < 0) {
212     DPCHECK(errno == EINTR);
213     return false;
214   }
215 
216   if (epoll_result == 0) {
217     return false;
218   }
219 
220   delegate->BeginNativeWorkBeforeDoWork();
221   const base::span<epoll_event> ready_events(events,
222                                              static_cast<size_t>(epoll_result));
223   for (auto& e : ready_events) {
224     if (e.data.ptr == &wake_event_) {
225       // Wake-up events are always safe to handle immediately. Unlike other
226       // events used by MessagePumpEpoll they also don't point to an
227       // EpollEventEntry, so we handle them separately here.
228       HandleWakeUp();
229       e.data.ptr = nullptr;
230       continue;
231     }
232 
233     // To guard against one of the ready events unregistering and thus
234     // invalidating one of the others here, first link each entry to the
235     // corresponding epoll_event returned by epoll_wait(). We do this before
236     // dispatching any events, and the second pass below will only dispatch an
237     // event if its epoll_event data is still valid.
238     auto& entry = EpollEventEntry::FromEpollEvent(e);
239     DCHECK(!entry.active_event);
240     EpollEventEntry::FromEpollEvent(e).active_event = &e;
241   }
242 
243   for (auto& e : ready_events) {
244     if (e.data.ptr) {
245       auto& entry = EpollEventEntry::FromEpollEvent(e);
246       entry.active_event = nullptr;
247       OnEpollEvent(entry, e.events);
248     }
249   }
250 
251   return true;
252 }
253 
OnEpollEvent(EpollEventEntry & entry,uint32_t events)254 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
255   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
256 
257   const bool readable = (events & EPOLLIN) != 0;
258   const bool writable = (events & EPOLLOUT) != 0;
259 
260   // Under different circumstances, peer closure may raise both/either EPOLLHUP
261   // and/or EPOLLERR. Treat them as equivalent.
262   const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
263 
264   // Copy the set of Interests, since interests may be added to or removed from
265   // `entry` during the loop below. This copy is inexpensive in practice
266   // because the size of this vector is expected to be very small (<= 2).
267   auto interests = entry.interests;
268 
269   // Any of these interests' event handlers may destroy any of the others'
270   // controllers. Start all of them watching for destruction before we actually
271   // dispatch any events.
272   for (const auto& interest : interests) {
273     interest->WatchForControllerDestruction();
274   }
275 
276   for (const auto& interest : interests) {
277     if (!interest->active()) {
278       continue;
279     }
280 
281     const bool can_read = (readable || disconnected) && interest->params().read;
282     const bool can_write = writable && interest->params().write;
283     if (!can_read && !can_write) {
284       // If this Interest is active but not watching for whichever event was
285       // raised here, there's nothing to do. This can occur if a descriptor has
286       // multiple active interests, since only one interest needs to be
287       // satisfied in order for us to process an epoll event.
288       continue;
289     }
290 
291     if (interest->params().one_shot) {
292       // This is a one-shot event watch which is about to be triggered. We
293       // deactivate the interest and update epoll immediately. The event handler
294       // may reactivate it.
295       interest->set_active(false);
296       UpdateEpollEvent(entry);
297     }
298 
299     if (!interest->was_controller_destroyed()) {
300       HandleEvent(entry.fd, can_read, can_write, interest->controller());
301     }
302   }
303 
304   for (const auto& interest : interests) {
305     interest->StopWatchingForControllerDestruction();
306   }
307 }
308 
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)309 void MessagePumpEpoll::HandleEvent(int fd,
310                                    bool can_read,
311                                    bool can_write,
312                                    FdWatchController* controller) {
313   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
314   // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
315   // HandleNotification() is called outside of Run() (e.g. in unit tests).
316   Delegate::ScopedDoWorkItem scoped_do_work_item;
317   if (run_state_) {
318     scoped_do_work_item = run_state_->delegate->BeginWorkItem();
319   }
320 
321   // Trace events must begin after the above BeginWorkItem() so that the
322   // ensuing "ThreadController active" outscopes all the events under it.
323   TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
324               controller->created_from_location(), "fd", fd, "can_read",
325               can_read, "can_write", can_write, "context",
326               static_cast<void*>(controller));
327   TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
328       controller->created_from_location().file_name());
329   if (can_read && can_write) {
330     bool controller_was_destroyed = false;
331     bool* previous_was_destroyed_flag =
332         std::exchange(controller->was_destroyed_, &controller_was_destroyed);
333 
334     controller->OnFdWritable();
335     if (!controller_was_destroyed) {
336       controller->OnFdReadable();
337     }
338     if (!controller_was_destroyed) {
339       controller->was_destroyed_ = previous_was_destroyed_flag;
340     } else if (previous_was_destroyed_flag) {
341       *previous_was_destroyed_flag = true;
342     }
343   } else if (can_write) {
344     controller->OnFdWritable();
345   } else if (can_read) {
346     controller->OnFdReadable();
347   }
348 }
349 
HandleWakeUp()350 void MessagePumpEpoll::HandleWakeUp() {
351   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
352   uint64_t value;
353   ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
354   DPCHECK(n == sizeof(value));
355 }
356 
EpollEventEntry(int fd)357 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
358 
~EpollEventEntry()359 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
360   if (active_event) {
361     DCHECK_EQ(this, active_event->data.ptr);
362     active_event->data.ptr = nullptr;
363   }
364 }
365 
ComputeActiveEvents()366 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
367   uint32_t events = 0;
368   bool one_shot = true;
369   for (const auto& interest : interests) {
370     if (!interest->active()) {
371       continue;
372     }
373     const InterestParams& params = interest->params();
374     events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
375     one_shot &= params.one_shot;
376   }
377   if (events != 0 && one_shot) {
378     return events | EPOLLONESHOT;
379   }
380   return events;
381 }
382 
383 }  // namespace base
384