• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_epoll.h"
6 
7 #include <sys/epoll.h>
8 #include <sys/eventfd.h>
9 
10 #include <cstddef>
11 #include <cstdint>
12 #include <utility>
13 
14 #include "base/auto_reset.h"
15 #include "base/check_op.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/ranges/algorithm.h"
19 #include "base/threading/thread_checker.h"
20 #include "base/trace_event/base_tracing.h"
21 #include "third_party/abseil-cpp/absl/types/optional.h"
22 
23 namespace base {
24 
MessagePumpEpoll()25 MessagePumpEpoll::MessagePumpEpoll() {
26   epoll_.reset(epoll_create(/*ignored_but_must_be_positive=*/1));
27   PCHECK(epoll_.is_valid());
28 
29   wake_event_.reset(eventfd(0, EFD_NONBLOCK));
30   PCHECK(wake_event_.is_valid());
31 
32   epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
33   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
34   PCHECK(rv == 0);
35 }
36 
37 MessagePumpEpoll::~MessagePumpEpoll() = default;
38 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)39 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
40                                            bool persistent,
41                                            int mode,
42                                            FdWatchController* controller,
43                                            FdWatcher* watcher) {
44   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
45   TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
46               "persistent", persistent, "watch_read", mode & WATCH_READ,
47               "watch_write", mode & WATCH_WRITE);
48 
49   const InterestParams params{
50       .fd = fd,
51       .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
52       .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
53       .one_shot = !persistent,
54   };
55 
56   auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
57   EpollEventEntry& entry = it->second;
58   scoped_refptr<Interest> existing_interest = controller->epoll_interest();
59   if (existing_interest && existing_interest->params().IsEqual(params)) {
60     // WatchFileDescriptor() has already been called for this controller at
61     // least once before, and as in the most common cases, it is now being
62     // called again with the same parameters.
63     //
64     // We don't need to allocate and register a new Interest in this case, but
65     // we can instead reactivate the existing (presumably deactivated,
66     // non-persistent) Interest.
67     existing_interest->set_active(true);
68   } else {
69     entry.interests->push_back(controller->AssignEpollInterest(params));
70     if (existing_interest) {
71       UnregisterInterest(existing_interest);
72     }
73   }
74 
75   if (is_new_fd_entry) {
76     AddEpollEvent(entry);
77   } else {
78     UpdateEpollEvent(entry);
79   }
80 
81   controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
82   controller->set_watcher(watcher);
83   return true;
84 }
85 
Run(Delegate * delegate)86 void MessagePumpEpoll::Run(Delegate* delegate) {
87   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
88   RunState run_state(delegate);
89   AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
90   for (;;) {
91     // Do some work and see if the next task is ready right away.
92     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
93     const bool immediate_work_available = next_work_info.is_immediate();
94     if (run_state.should_quit) {
95       break;
96     }
97 
98     // Process any immediately ready IO event, but don't wait for more yet.
99     const bool processed_events = WaitForEpollEvents(TimeDelta());
100     if (run_state.should_quit) {
101       break;
102     }
103 
104     if (immediate_work_available || processed_events) {
105       continue;
106     }
107 
108     const bool did_idle_work = delegate->DoIdleWork();
109     if (run_state.should_quit) {
110       break;
111     }
112     if (did_idle_work) {
113       continue;
114     }
115 
116     TimeDelta timeout = TimeDelta::Max();
117     DCHECK(!next_work_info.delayed_run_time.is_null());
118     if (!next_work_info.delayed_run_time.is_max()) {
119       timeout = next_work_info.remaining_delay();
120     }
121     delegate->BeforeWait();
122     WaitForEpollEvents(timeout);
123     if (run_state.should_quit) {
124       break;
125     }
126   }
127 }
128 
Quit()129 void MessagePumpEpoll::Quit() {
130   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
131   DCHECK(run_state_) << "Quit() called outside of Run()";
132   run_state_->should_quit = true;
133 }
134 
ScheduleWork()135 void MessagePumpEpoll::ScheduleWork() {
136   const uint64_t value = 1;
137   ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
138 
139   // EAGAIN here implies that the write() would overflow of the event counter,
140   // which is a condition we can safely ignore. It implies that the event
141   // counter is non-zero and therefore readable, which is enough to ensure that
142   // any pending wait eventually wakes up.
143   DPCHECK(n == sizeof(value) || errno == EAGAIN);
144 }
145 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)146 void MessagePumpEpoll::ScheduleDelayedWork(
147     const Delegate::NextWorkInfo& next_work_info) {
148   // Nothing to do. This can only be called from the same thread as Run(), so
149   // the pump must be in between waits. The scheduled work therefore will be
150   // seen in time for the next wait.
151 }
152 
AddEpollEvent(EpollEventEntry & entry)153 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
154   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
155   const uint32_t events = entry.ComputeActiveEvents();
156   epoll_event event{.events = events, .data = {.ptr = &entry}};
157   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
158   DPCHECK(rv == 0);
159   entry.registered_events = events;
160 }
161 
UpdateEpollEvent(EpollEventEntry & entry)162 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
163   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
164   const uint32_t events = entry.ComputeActiveEvents();
165   if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
166     // Persistent events don't need to be modified if no bits are changing.
167     return;
168   }
169   epoll_event event{.events = events, .data = {.ptr = &entry}};
170   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
171   DPCHECK(rv == 0);
172   entry.registered_events = events;
173 }
174 
UnregisterInterest(const scoped_refptr<Interest> & interest)175 void MessagePumpEpoll::UnregisterInterest(
176     const scoped_refptr<Interest>& interest) {
177   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
178 
179   const int fd = interest->params().fd;
180   auto entry_it = entries_.find(fd);
181   DCHECK(entry_it != entries_.end());
182 
183   EpollEventEntry& entry = entry_it->second;
184   auto& interests = entry.interests.container();
185   auto it = ranges::find(interests, interest);
186   DCHECK(it != interests.end());
187   interests.erase(it);
188 
189   if (interests.empty()) {
190     entries_.erase(entry_it);
191     int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, fd, nullptr);
192     DPCHECK(rv == 0);
193   } else {
194     UpdateEpollEvent(entry);
195   }
196 }
197 
WaitForEpollEvents(TimeDelta timeout)198 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
199   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
200 
201   // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
202   // are integral milliseconds. Round up to the next millisecond.
203   // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
204   const int epoll_timeout =
205       timeout.is_max() ? -1
206                        : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
207   epoll_event events[16];
208   const int epoll_result =
209       epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
210   if (epoll_result < 0) {
211     DPCHECK(errno == EINTR);
212     return false;
213   }
214 
215   if (epoll_result == 0) {
216     return false;
217   }
218 
219   const base::span<epoll_event> ready_events(events,
220                                              static_cast<size_t>(epoll_result));
221   for (auto& e : ready_events) {
222     if (e.data.ptr == &wake_event_) {
223       // Wake-up events are always safe to handle immediately. Unlike other
224       // events used by MessagePumpEpoll they also don't point to an
225       // EpollEventEntry, so we handle them separately here.
226       HandleWakeUp();
227       e.data.ptr = nullptr;
228       continue;
229     }
230 
231     // To guard against one of the ready events unregistering and thus
232     // invalidating one of the others here, first link each entry to the
233     // corresponding epoll_event returned by epoll_wait(). We do this before
234     // dispatching any events, and the second pass below will only dispatch an
235     // event if its epoll_event data is still valid.
236     auto& entry = EpollEventEntry::FromEpollEvent(e);
237     DCHECK(!entry.active_event);
238     EpollEventEntry::FromEpollEvent(e).active_event = &e;
239   }
240 
241   for (auto& e : ready_events) {
242     if (e.data.ptr) {
243       auto& entry = EpollEventEntry::FromEpollEvent(e);
244       entry.active_event = nullptr;
245       OnEpollEvent(entry, e.events);
246     }
247   }
248 
249   return true;
250 }
251 
OnEpollEvent(EpollEventEntry & entry,uint32_t events)252 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
253   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
254 
255   const bool readable = (events & EPOLLIN) != 0;
256   const bool writable = (events & EPOLLOUT) != 0;
257 
258   // Under different circumstances, peer closure may raise both/either EPOLLHUP
259   // and/or EPOLLERR. Treat them as equivalent.
260   const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
261 
262   // Copy the set of Interests, since interests may be added to or removed from
263   // `entry` during the loop below. This copy is inexpensive in practice
264   // because the size of this vector is expected to be very small (<= 2).
265   auto interests = entry.interests;
266 
267   // Any of these interests' event handlers may destroy any of the others'
268   // controllers. Start all of them watching for destruction before we actually
269   // dispatch any events.
270   for (const auto& interest : interests.container()) {
271     interest->WatchForControllerDestruction();
272   }
273 
274   for (const auto& interest : interests.container()) {
275     if (!interest->active()) {
276       continue;
277     }
278 
279     const bool can_read = (readable || disconnected) && interest->params().read;
280     const bool can_write = writable && interest->params().write;
281     if (!can_read && !can_write) {
282       // If this Interest is active but not watching for whichever event was
283       // raised here, there's nothing to do. This can occur if a descriptor has
284       // multiple active interests, since only one interest needs to be
285       // satisfied in order for us to process an epoll event.
286       continue;
287     }
288 
289     if (interest->params().one_shot) {
290       // This is a one-shot event watch which is about to be triggered. We
291       // deactivate the interest and update epoll immediately. The event handler
292       // may reactivate it.
293       interest->set_active(false);
294       UpdateEpollEvent(entry);
295     }
296 
297     if (!interest->was_controller_destroyed()) {
298       HandleEvent(entry.fd, can_read, can_write, interest->controller());
299     }
300   }
301 
302   for (const auto& interest : interests.container()) {
303     interest->StopWatchingForControllerDestruction();
304   }
305 }
306 
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)307 void MessagePumpEpoll::HandleEvent(int fd,
308                                    bool can_read,
309                                    bool can_write,
310                                    FdWatchController* controller) {
311   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
312   // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
313   // HandleNotification() is called outside of Run() (e.g. in unit tests).
314   Delegate::ScopedDoWorkItem scoped_do_work_item;
315   if (run_state_) {
316     scoped_do_work_item = run_state_->delegate->BeginWorkItem();
317   }
318 
319   // Trace events must begin after the above BeginWorkItem() so that the
320   // ensuing "ThreadController active" outscopes all the events under it.
321   TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
322               controller->created_from_location(), "fd", fd, "can_read",
323               can_read, "can_write", can_write, "context",
324               static_cast<void*>(controller));
325   TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
326       controller->created_from_location().file_name());
327   if (can_read && can_write) {
328     bool controller_was_destroyed = false;
329     bool* previous_was_destroyed_flag =
330         std::exchange(controller->was_destroyed_, &controller_was_destroyed);
331 
332     controller->OnFdWritable();
333     if (!controller_was_destroyed) {
334       controller->OnFdReadable();
335     }
336     if (!controller_was_destroyed) {
337       controller->was_destroyed_ = previous_was_destroyed_flag;
338     } else if (previous_was_destroyed_flag) {
339       *previous_was_destroyed_flag = true;
340     }
341   } else if (can_write) {
342     controller->OnFdWritable();
343   } else if (can_read) {
344     controller->OnFdReadable();
345   }
346 }
347 
HandleWakeUp()348 void MessagePumpEpoll::HandleWakeUp() {
349   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
350   uint64_t value;
351   ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
352   DPCHECK(n == sizeof(value));
353 }
354 
EpollEventEntry(int fd)355 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
356 
~EpollEventEntry()357 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
358   if (active_event) {
359     DCHECK_EQ(this, active_event->data.ptr);
360     active_event->data.ptr = nullptr;
361   }
362 }
363 
ComputeActiveEvents()364 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
365   uint32_t events = 0;
366   bool one_shot = true;
367   for (const auto& interest : interests.container()) {
368     if (!interest->active()) {
369       continue;
370     }
371     const InterestParams& params = interest->params();
372     events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
373     one_shot &= params.one_shot;
374   }
375   if (events != 0 && one_shot) {
376     return events | EPOLLONESHOT;
377   }
378   return events;
379 }
380 
381 }  // namespace base
382