1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_epoll.h"
6
7 #include <sys/epoll.h>
8 #include <sys/eventfd.h>
9
10 #include <cstddef>
11 #include <cstdint>
12 #include <utility>
13
14 #include "base/auto_reset.h"
15 #include "base/check_op.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/ranges/algorithm.h"
19 #include "base/threading/thread_checker.h"
20 #include "base/trace_event/base_tracing.h"
21 #include "third_party/abseil-cpp/absl/types/optional.h"
22
23 namespace base {
24
MessagePumpEpoll()25 MessagePumpEpoll::MessagePumpEpoll() {
26 epoll_.reset(epoll_create(/*ignored_but_must_be_positive=*/1));
27 PCHECK(epoll_.is_valid());
28
29 wake_event_.reset(eventfd(0, EFD_NONBLOCK));
30 PCHECK(wake_event_.is_valid());
31
32 epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
33 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
34 PCHECK(rv == 0);
35 }
36
37 MessagePumpEpoll::~MessagePumpEpoll() = default;
38
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)39 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
40 bool persistent,
41 int mode,
42 FdWatchController* controller,
43 FdWatcher* watcher) {
44 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
45 TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
46 "persistent", persistent, "watch_read", mode & WATCH_READ,
47 "watch_write", mode & WATCH_WRITE);
48
49 const InterestParams params{
50 .fd = fd,
51 .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
52 .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
53 .one_shot = !persistent,
54 };
55
56 auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
57 EpollEventEntry& entry = it->second;
58 scoped_refptr<Interest> existing_interest = controller->epoll_interest();
59 if (existing_interest && existing_interest->params().IsEqual(params)) {
60 // WatchFileDescriptor() has already been called for this controller at
61 // least once before, and as in the most common cases, it is now being
62 // called again with the same parameters.
63 //
64 // We don't need to allocate and register a new Interest in this case, but
65 // we can instead reactivate the existing (presumably deactivated,
66 // non-persistent) Interest.
67 existing_interest->set_active(true);
68 } else {
69 entry.interests->push_back(controller->AssignEpollInterest(params));
70 if (existing_interest) {
71 UnregisterInterest(existing_interest);
72 }
73 }
74
75 if (is_new_fd_entry) {
76 AddEpollEvent(entry);
77 } else {
78 UpdateEpollEvent(entry);
79 }
80
81 controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
82 controller->set_watcher(watcher);
83 return true;
84 }
85
Run(Delegate * delegate)86 void MessagePumpEpoll::Run(Delegate* delegate) {
87 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
88 RunState run_state(delegate);
89 AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
90 for (;;) {
91 // Do some work and see if the next task is ready right away.
92 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
93 const bool immediate_work_available = next_work_info.is_immediate();
94 if (run_state.should_quit) {
95 break;
96 }
97
98 // Process any immediately ready IO event, but don't wait for more yet.
99 const bool processed_events = WaitForEpollEvents(TimeDelta());
100 if (run_state.should_quit) {
101 break;
102 }
103
104 if (immediate_work_available || processed_events) {
105 continue;
106 }
107
108 const bool did_idle_work = delegate->DoIdleWork();
109 if (run_state.should_quit) {
110 break;
111 }
112 if (did_idle_work) {
113 continue;
114 }
115
116 TimeDelta timeout = TimeDelta::Max();
117 DCHECK(!next_work_info.delayed_run_time.is_null());
118 if (!next_work_info.delayed_run_time.is_max()) {
119 timeout = next_work_info.remaining_delay();
120 }
121 delegate->BeforeWait();
122 WaitForEpollEvents(timeout);
123 if (run_state.should_quit) {
124 break;
125 }
126 }
127 }
128
Quit()129 void MessagePumpEpoll::Quit() {
130 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
131 DCHECK(run_state_) << "Quit() called outside of Run()";
132 run_state_->should_quit = true;
133 }
134
ScheduleWork()135 void MessagePumpEpoll::ScheduleWork() {
136 const uint64_t value = 1;
137 ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
138
139 // EAGAIN here implies that the write() would overflow of the event counter,
140 // which is a condition we can safely ignore. It implies that the event
141 // counter is non-zero and therefore readable, which is enough to ensure that
142 // any pending wait eventually wakes up.
143 DPCHECK(n == sizeof(value) || errno == EAGAIN);
144 }
145
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)146 void MessagePumpEpoll::ScheduleDelayedWork(
147 const Delegate::NextWorkInfo& next_work_info) {
148 // Nothing to do. This can only be called from the same thread as Run(), so
149 // the pump must be in between waits. The scheduled work therefore will be
150 // seen in time for the next wait.
151 }
152
AddEpollEvent(EpollEventEntry & entry)153 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
154 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
155 const uint32_t events = entry.ComputeActiveEvents();
156 epoll_event event{.events = events, .data = {.ptr = &entry}};
157 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
158 DPCHECK(rv == 0);
159 entry.registered_events = events;
160 }
161
UpdateEpollEvent(EpollEventEntry & entry)162 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
163 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
164 const uint32_t events = entry.ComputeActiveEvents();
165 if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
166 // Persistent events don't need to be modified if no bits are changing.
167 return;
168 }
169 epoll_event event{.events = events, .data = {.ptr = &entry}};
170 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
171 DPCHECK(rv == 0);
172 entry.registered_events = events;
173 }
174
UnregisterInterest(const scoped_refptr<Interest> & interest)175 void MessagePumpEpoll::UnregisterInterest(
176 const scoped_refptr<Interest>& interest) {
177 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
178
179 const int fd = interest->params().fd;
180 auto entry_it = entries_.find(fd);
181 DCHECK(entry_it != entries_.end());
182
183 EpollEventEntry& entry = entry_it->second;
184 auto& interests = entry.interests.container();
185 auto it = ranges::find(interests, interest);
186 DCHECK(it != interests.end());
187 interests.erase(it);
188
189 if (interests.empty()) {
190 entries_.erase(entry_it);
191 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, fd, nullptr);
192 DPCHECK(rv == 0);
193 } else {
194 UpdateEpollEvent(entry);
195 }
196 }
197
WaitForEpollEvents(TimeDelta timeout)198 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
199 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
200
201 // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
202 // are integral milliseconds. Round up to the next millisecond.
203 // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
204 const int epoll_timeout =
205 timeout.is_max() ? -1
206 : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
207 epoll_event events[16];
208 const int epoll_result =
209 epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
210 if (epoll_result < 0) {
211 DPCHECK(errno == EINTR);
212 return false;
213 }
214
215 if (epoll_result == 0) {
216 return false;
217 }
218
219 const base::span<epoll_event> ready_events(events,
220 static_cast<size_t>(epoll_result));
221 for (auto& e : ready_events) {
222 if (e.data.ptr == &wake_event_) {
223 // Wake-up events are always safe to handle immediately. Unlike other
224 // events used by MessagePumpEpoll they also don't point to an
225 // EpollEventEntry, so we handle them separately here.
226 HandleWakeUp();
227 e.data.ptr = nullptr;
228 continue;
229 }
230
231 // To guard against one of the ready events unregistering and thus
232 // invalidating one of the others here, first link each entry to the
233 // corresponding epoll_event returned by epoll_wait(). We do this before
234 // dispatching any events, and the second pass below will only dispatch an
235 // event if its epoll_event data is still valid.
236 auto& entry = EpollEventEntry::FromEpollEvent(e);
237 DCHECK(!entry.active_event);
238 EpollEventEntry::FromEpollEvent(e).active_event = &e;
239 }
240
241 for (auto& e : ready_events) {
242 if (e.data.ptr) {
243 auto& entry = EpollEventEntry::FromEpollEvent(e);
244 entry.active_event = nullptr;
245 OnEpollEvent(entry, e.events);
246 }
247 }
248
249 return true;
250 }
251
OnEpollEvent(EpollEventEntry & entry,uint32_t events)252 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
253 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
254
255 const bool readable = (events & EPOLLIN) != 0;
256 const bool writable = (events & EPOLLOUT) != 0;
257
258 // Under different circumstances, peer closure may raise both/either EPOLLHUP
259 // and/or EPOLLERR. Treat them as equivalent.
260 const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
261
262 // Copy the set of Interests, since interests may be added to or removed from
263 // `entry` during the loop below. This copy is inexpensive in practice
264 // because the size of this vector is expected to be very small (<= 2).
265 auto interests = entry.interests;
266
267 // Any of these interests' event handlers may destroy any of the others'
268 // controllers. Start all of them watching for destruction before we actually
269 // dispatch any events.
270 for (const auto& interest : interests.container()) {
271 interest->WatchForControllerDestruction();
272 }
273
274 for (const auto& interest : interests.container()) {
275 if (!interest->active()) {
276 continue;
277 }
278
279 const bool can_read = (readable || disconnected) && interest->params().read;
280 const bool can_write = writable && interest->params().write;
281 if (!can_read && !can_write) {
282 // If this Interest is active but not watching for whichever event was
283 // raised here, there's nothing to do. This can occur if a descriptor has
284 // multiple active interests, since only one interest needs to be
285 // satisfied in order for us to process an epoll event.
286 continue;
287 }
288
289 if (interest->params().one_shot) {
290 // This is a one-shot event watch which is about to be triggered. We
291 // deactivate the interest and update epoll immediately. The event handler
292 // may reactivate it.
293 interest->set_active(false);
294 UpdateEpollEvent(entry);
295 }
296
297 if (!interest->was_controller_destroyed()) {
298 HandleEvent(entry.fd, can_read, can_write, interest->controller());
299 }
300 }
301
302 for (const auto& interest : interests.container()) {
303 interest->StopWatchingForControllerDestruction();
304 }
305 }
306
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)307 void MessagePumpEpoll::HandleEvent(int fd,
308 bool can_read,
309 bool can_write,
310 FdWatchController* controller) {
311 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
312 // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
313 // HandleNotification() is called outside of Run() (e.g. in unit tests).
314 Delegate::ScopedDoWorkItem scoped_do_work_item;
315 if (run_state_) {
316 scoped_do_work_item = run_state_->delegate->BeginWorkItem();
317 }
318
319 // Trace events must begin after the above BeginWorkItem() so that the
320 // ensuing "ThreadController active" outscopes all the events under it.
321 TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
322 controller->created_from_location(), "fd", fd, "can_read",
323 can_read, "can_write", can_write, "context",
324 static_cast<void*>(controller));
325 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
326 controller->created_from_location().file_name());
327 if (can_read && can_write) {
328 bool controller_was_destroyed = false;
329 bool* previous_was_destroyed_flag =
330 std::exchange(controller->was_destroyed_, &controller_was_destroyed);
331
332 controller->OnFdWritable();
333 if (!controller_was_destroyed) {
334 controller->OnFdReadable();
335 }
336 if (!controller_was_destroyed) {
337 controller->was_destroyed_ = previous_was_destroyed_flag;
338 } else if (previous_was_destroyed_flag) {
339 *previous_was_destroyed_flag = true;
340 }
341 } else if (can_write) {
342 controller->OnFdWritable();
343 } else if (can_read) {
344 controller->OnFdReadable();
345 }
346 }
347
HandleWakeUp()348 void MessagePumpEpoll::HandleWakeUp() {
349 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
350 uint64_t value;
351 ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
352 DPCHECK(n == sizeof(value));
353 }
354
EpollEventEntry(int fd)355 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
356
~EpollEventEntry()357 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
358 if (active_event) {
359 DCHECK_EQ(this, active_event->data.ptr);
360 active_event->data.ptr = nullptr;
361 }
362 }
363
ComputeActiveEvents()364 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
365 uint32_t events = 0;
366 bool one_shot = true;
367 for (const auto& interest : interests.container()) {
368 if (!interest->active()) {
369 continue;
370 }
371 const InterestParams& params = interest->params();
372 events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
373 one_shot &= params.one_shot;
374 }
375 if (events != 0 && one_shot) {
376 return events | EPOLLONESHOT;
377 }
378 return events;
379 }
380
381 } // namespace base
382