1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_epoll.h"
6
7 #include <sys/epoll.h>
8 #include <sys/eventfd.h>
9
10 #include <cstddef>
11 #include <cstdint>
12 #include <utility>
13
14 #include "base/auto_reset.h"
15 #include "base/check_op.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/ranges/algorithm.h"
19 #include "base/threading/thread_checker.h"
20 #include "base/trace_event/base_tracing.h"
21 #include "third_party/abseil-cpp/absl/types/optional.h"
22
23 namespace base {
24
MessagePumpEpoll()25 MessagePumpEpoll::MessagePumpEpoll() {
26 epoll_.reset(epoll_create1(/*flags=*/0));
27 PCHECK(epoll_.is_valid());
28
29 wake_event_.reset(eventfd(0, EFD_NONBLOCK));
30 PCHECK(wake_event_.is_valid());
31
32 epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
33 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
34 PCHECK(rv == 0);
35 }
36
37 MessagePumpEpoll::~MessagePumpEpoll() = default;
38
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)39 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
40 bool persistent,
41 int mode,
42 FdWatchController* controller,
43 FdWatcher* watcher) {
44 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
45 TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
46 "persistent", persistent, "watch_read", mode & WATCH_READ,
47 "watch_write", mode & WATCH_WRITE);
48
49 const InterestParams params{
50 .fd = fd,
51 .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
52 .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
53 .one_shot = !persistent,
54 };
55
56 auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
57 EpollEventEntry& entry = it->second;
58 scoped_refptr<Interest> existing_interest = controller->epoll_interest();
59 if (existing_interest && existing_interest->params().IsEqual(params)) {
60 // WatchFileDescriptor() has already been called for this controller at
61 // least once before, and as in the most common cases, it is now being
62 // called again with the same parameters.
63 //
64 // We don't need to allocate and register a new Interest in this case, but
65 // we can instead reactivate the existing (presumably deactivated,
66 // non-persistent) Interest.
67 existing_interest->set_active(true);
68 } else {
69 entry.interests.push_back(controller->AssignEpollInterest(params));
70 if (existing_interest) {
71 UnregisterInterest(existing_interest);
72 }
73 }
74
75 if (is_new_fd_entry) {
76 AddEpollEvent(entry);
77 } else {
78 UpdateEpollEvent(entry);
79 }
80
81 controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
82 controller->set_watcher(watcher);
83 return true;
84 }
85
Run(Delegate * delegate)86 void MessagePumpEpoll::Run(Delegate* delegate) {
87 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
88 RunState run_state(delegate);
89 AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
90 for (;;) {
91 // Do some work and see if the next task is ready right away.
92 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
93 const bool immediate_work_available = next_work_info.is_immediate();
94 if (run_state.should_quit) {
95 break;
96 }
97
98 // Process any immediately ready IO event, but don't wait for more yet.
99 const bool processed_events = WaitForEpollEvents(TimeDelta(), delegate);
100 if (run_state.should_quit) {
101 break;
102 }
103
104 if (immediate_work_available || processed_events) {
105 continue;
106 }
107
108 const bool did_idle_work = delegate->DoIdleWork();
109 if (run_state.should_quit) {
110 break;
111 }
112 if (did_idle_work) {
113 continue;
114 }
115
116 TimeDelta timeout = TimeDelta::Max();
117 DCHECK(!next_work_info.delayed_run_time.is_null());
118 if (!next_work_info.delayed_run_time.is_max()) {
119 timeout = next_work_info.remaining_delay();
120 }
121 delegate->BeforeWait();
122 WaitForEpollEvents(timeout, delegate);
123 if (run_state.should_quit) {
124 break;
125 }
126 }
127 }
128
Quit()129 void MessagePumpEpoll::Quit() {
130 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
131 DCHECK(run_state_) << "Quit() called outside of Run()";
132 run_state_->should_quit = true;
133 }
134
ScheduleWork()135 void MessagePumpEpoll::ScheduleWork() {
136 const uint64_t value = 1;
137 ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
138
139 // EAGAIN here implies that the write() would overflow of the event counter,
140 // which is a condition we can safely ignore. It implies that the event
141 // counter is non-zero and therefore readable, which is enough to ensure that
142 // any pending wait eventually wakes up.
143 DPCHECK(n == sizeof(value) || errno == EAGAIN);
144 }
145
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)146 void MessagePumpEpoll::ScheduleDelayedWork(
147 const Delegate::NextWorkInfo& next_work_info) {
148 // Nothing to do. This can only be called from the same thread as Run(), so
149 // the pump must be in between waits. The scheduled work therefore will be
150 // seen in time for the next wait.
151 }
152
AddEpollEvent(EpollEventEntry & entry)153 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
154 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
155 const uint32_t events = entry.ComputeActiveEvents();
156 epoll_event event{.events = events, .data = {.ptr = &entry}};
157 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
158 DPCHECK(rv == 0);
159 entry.registered_events = events;
160 }
161
UpdateEpollEvent(EpollEventEntry & entry)162 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
163 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
164 const uint32_t events = entry.ComputeActiveEvents();
165 if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
166 // Persistent events don't need to be modified if no bits are changing.
167 return;
168 }
169 epoll_event event{.events = events, .data = {.ptr = &entry}};
170 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
171 DPCHECK(rv == 0);
172 entry.registered_events = events;
173 }
174
UnregisterInterest(const scoped_refptr<Interest> & interest)175 void MessagePumpEpoll::UnregisterInterest(
176 const scoped_refptr<Interest>& interest) {
177 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
178
179 const int fd = interest->params().fd;
180 auto entry_it = entries_.find(fd);
181 DCHECK(entry_it != entries_.end());
182
183 EpollEventEntry& entry = entry_it->second;
184 auto& interests = entry.interests;
185 auto* it = ranges::find(interests, interest);
186 DCHECK(it != interests.end());
187 interests.erase(it);
188
189 if (interests.empty()) {
190 entries_.erase(entry_it);
191 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, fd, nullptr);
192 DPCHECK(rv == 0);
193 } else {
194 UpdateEpollEvent(entry);
195 }
196 }
197
WaitForEpollEvents(TimeDelta timeout,Delegate * delegate)198 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout,
199 Delegate* delegate) {
200 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
201
202 // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
203 // are integral milliseconds. Round up to the next millisecond.
204 // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
205 const int epoll_timeout =
206 timeout.is_max() ? -1
207 : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
208 epoll_event events[16];
209 const int epoll_result =
210 epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
211 if (epoll_result < 0) {
212 DPCHECK(errno == EINTR);
213 return false;
214 }
215
216 if (epoll_result == 0) {
217 return false;
218 }
219
220 delegate->BeginNativeWorkBeforeDoWork();
221 const base::span<epoll_event> ready_events(events,
222 static_cast<size_t>(epoll_result));
223 for (auto& e : ready_events) {
224 if (e.data.ptr == &wake_event_) {
225 // Wake-up events are always safe to handle immediately. Unlike other
226 // events used by MessagePumpEpoll they also don't point to an
227 // EpollEventEntry, so we handle them separately here.
228 HandleWakeUp();
229 e.data.ptr = nullptr;
230 continue;
231 }
232
233 // To guard against one of the ready events unregistering and thus
234 // invalidating one of the others here, first link each entry to the
235 // corresponding epoll_event returned by epoll_wait(). We do this before
236 // dispatching any events, and the second pass below will only dispatch an
237 // event if its epoll_event data is still valid.
238 auto& entry = EpollEventEntry::FromEpollEvent(e);
239 DCHECK(!entry.active_event);
240 EpollEventEntry::FromEpollEvent(e).active_event = &e;
241 }
242
243 for (auto& e : ready_events) {
244 if (e.data.ptr) {
245 auto& entry = EpollEventEntry::FromEpollEvent(e);
246 entry.active_event = nullptr;
247 OnEpollEvent(entry, e.events);
248 }
249 }
250
251 return true;
252 }
253
OnEpollEvent(EpollEventEntry & entry,uint32_t events)254 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
255 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
256
257 const bool readable = (events & EPOLLIN) != 0;
258 const bool writable = (events & EPOLLOUT) != 0;
259
260 // Under different circumstances, peer closure may raise both/either EPOLLHUP
261 // and/or EPOLLERR. Treat them as equivalent.
262 const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
263
264 // Copy the set of Interests, since interests may be added to or removed from
265 // `entry` during the loop below. This copy is inexpensive in practice
266 // because the size of this vector is expected to be very small (<= 2).
267 auto interests = entry.interests;
268
269 // Any of these interests' event handlers may destroy any of the others'
270 // controllers. Start all of them watching for destruction before we actually
271 // dispatch any events.
272 for (const auto& interest : interests) {
273 interest->WatchForControllerDestruction();
274 }
275
276 for (const auto& interest : interests) {
277 if (!interest->active()) {
278 continue;
279 }
280
281 const bool can_read = (readable || disconnected) && interest->params().read;
282 const bool can_write = writable && interest->params().write;
283 if (!can_read && !can_write) {
284 // If this Interest is active but not watching for whichever event was
285 // raised here, there's nothing to do. This can occur if a descriptor has
286 // multiple active interests, since only one interest needs to be
287 // satisfied in order for us to process an epoll event.
288 continue;
289 }
290
291 if (interest->params().one_shot) {
292 // This is a one-shot event watch which is about to be triggered. We
293 // deactivate the interest and update epoll immediately. The event handler
294 // may reactivate it.
295 interest->set_active(false);
296 UpdateEpollEvent(entry);
297 }
298
299 if (!interest->was_controller_destroyed()) {
300 HandleEvent(entry.fd, can_read, can_write, interest->controller());
301 }
302 }
303
304 for (const auto& interest : interests) {
305 interest->StopWatchingForControllerDestruction();
306 }
307 }
308
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)309 void MessagePumpEpoll::HandleEvent(int fd,
310 bool can_read,
311 bool can_write,
312 FdWatchController* controller) {
313 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
314 // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
315 // HandleNotification() is called outside of Run() (e.g. in unit tests).
316 Delegate::ScopedDoWorkItem scoped_do_work_item;
317 if (run_state_) {
318 scoped_do_work_item = run_state_->delegate->BeginWorkItem();
319 }
320
321 // Trace events must begin after the above BeginWorkItem() so that the
322 // ensuing "ThreadController active" outscopes all the events under it.
323 TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
324 controller->created_from_location(), "fd", fd, "can_read",
325 can_read, "can_write", can_write, "context",
326 static_cast<void*>(controller));
327 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
328 controller->created_from_location().file_name());
329 if (can_read && can_write) {
330 bool controller_was_destroyed = false;
331 bool* previous_was_destroyed_flag =
332 std::exchange(controller->was_destroyed_, &controller_was_destroyed);
333
334 controller->OnFdWritable();
335 if (!controller_was_destroyed) {
336 controller->OnFdReadable();
337 }
338 if (!controller_was_destroyed) {
339 controller->was_destroyed_ = previous_was_destroyed_flag;
340 } else if (previous_was_destroyed_flag) {
341 *previous_was_destroyed_flag = true;
342 }
343 } else if (can_write) {
344 controller->OnFdWritable();
345 } else if (can_read) {
346 controller->OnFdReadable();
347 }
348 }
349
HandleWakeUp()350 void MessagePumpEpoll::HandleWakeUp() {
351 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
352 uint64_t value;
353 ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
354 DPCHECK(n == sizeof(value));
355 }
356
EpollEventEntry(int fd)357 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
358
~EpollEventEntry()359 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
360 if (active_event) {
361 DCHECK_EQ(this, active_event->data.ptr);
362 active_event->data.ptr = nullptr;
363 }
364 }
365
ComputeActiveEvents()366 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
367 uint32_t events = 0;
368 bool one_shot = true;
369 for (const auto& interest : interests) {
370 if (!interest->active()) {
371 continue;
372 }
373 const InterestParams& params = interest->params();
374 events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
375 one_shot &= params.one_shot;
376 }
377 if (events != 0 && one_shot) {
378 return events | EPOLLONESHOT;
379 }
380 return events;
381 }
382
383 } // namespace base
384