1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_kqueue.h"
6
7 #include <sys/errno.h>
8
9 #include <atomic>
10
11 #include "base/auto_reset.h"
12 #include "base/feature_list.h"
13 #include "base/logging.h"
14 #include "base/mac/mac_util.h"
15 #include "base/mac/mach_logging.h"
16 #include "base/mac/scoped_nsautorelease_pool.h"
17 #include "base/notreached.h"
18 #include "base/posix/eintr_wrapper.h"
19 #include "base/time/time_override.h"
20
21 namespace base {
22
23 namespace {
24
25 // Under this feature a simplified version of the Run() function is used. It
26 // improves legibility and avoids some calls to kevent64(). Remove once
27 // crbug.com/1200141 is resolved.
28 BASE_FEATURE(kUseSimplifiedMessagePumpKqueueLoop,
29 "UseSimplifiedMessagePumpKqueueLoop",
30 base::FEATURE_DISABLED_BY_DEFAULT);
31
// Cached value of the "UseSimplifiedMessagePumpKqueueLoop" feature. Starts out
// false and is refreshed by MessagePumpKqueue::InitializeFeatures().
std::atomic_bool g_use_simplified_version{false};
34
35 #if DCHECK_IS_ON()
36 // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier
37 // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a
38 // timer from the thread that reads the kqueue does not cause spurious wakeups.
39 // Note that updating a kqueue timer from one thread while another thread is
40 // waiting in a kevent64 invocation is still (inherently) racy.
KqueueTimersSpuriouslyWakeUp()41 bool KqueueTimersSpuriouslyWakeUp() {
42 #if BUILDFLAG(IS_MAC)
43 static const bool kqueue_timers_spuriously_wakeup = mac::IsAtMostOS10_13();
44 return kqueue_timers_spuriously_wakeup;
45 #else
46 // This still happens on iOS15.
47 return true;
48 #endif
49 }
50 #endif
51
ChangeOneEvent(const ScopedFD & kqueue,kevent64_s * event)52 int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) {
53 return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr));
54 }
55
56 } // namespace
57
FdWatchController(const Location & from_here)58 MessagePumpKqueue::FdWatchController::FdWatchController(
59 const Location& from_here)
60 : FdWatchControllerInterface(from_here) {}
61
~FdWatchController()62 MessagePumpKqueue::FdWatchController::~FdWatchController() {
63 StopWatchingFileDescriptor();
64 }
65
StopWatchingFileDescriptor()66 bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() {
67 if (!pump_)
68 return true;
69 return pump_->StopWatchingFileDescriptor(this);
70 }
71
Init(WeakPtr<MessagePumpKqueue> pump,int fd,int mode,FdWatcher * watcher)72 void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump,
73 int fd,
74 int mode,
75 FdWatcher* watcher) {
76 DCHECK_NE(fd, -1);
77 DCHECK(!watcher_);
78 DCHECK(watcher);
79 DCHECK(pump);
80 fd_ = fd;
81 mode_ = mode;
82 watcher_ = watcher;
83 pump_ = pump;
84 }
85
Reset()86 void MessagePumpKqueue::FdWatchController::Reset() {
87 fd_ = -1;
88 mode_ = 0;
89 watcher_ = nullptr;
90 pump_ = nullptr;
91 }
92
MachPortWatchController(const Location & from_here)93 MessagePumpKqueue::MachPortWatchController::MachPortWatchController(
94 const Location& from_here)
95 : from_here_(from_here) {}
96
~MachPortWatchController()97 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
98 StopWatchingMachPort();
99 }
100
StopWatchingMachPort()101 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
102 if (!pump_)
103 return true;
104 return pump_->StopWatchingMachPort(this);
105 }
106
Init(WeakPtr<MessagePumpKqueue> pump,mach_port_t port,MachPortWatcher * watcher)107 void MessagePumpKqueue::MachPortWatchController::Init(
108 WeakPtr<MessagePumpKqueue> pump,
109 mach_port_t port,
110 MachPortWatcher* watcher) {
111 DCHECK(!watcher_);
112 DCHECK(watcher);
113 DCHECK(pump);
114 port_ = port;
115 watcher_ = watcher;
116 pump_ = pump;
117 }
118
Reset()119 void MessagePumpKqueue::MachPortWatchController::Reset() {
120 port_ = MACH_PORT_NULL;
121 watcher_ = nullptr;
122 pump_ = nullptr;
123 }
124
MessagePumpKqueue()125 MessagePumpKqueue::MessagePumpKqueue()
126 : kqueue_(kqueue()), weak_factory_(this) {
127 PCHECK(kqueue_.is_valid()) << "kqueue";
128
129 // Create a Mach port that will be used to wake up the pump by sending
130 // a message in response to ScheduleWork(). This is significantly faster than
131 // using an EVFILT_USER event, especially when triggered across threads.
132 kern_return_t kr = mach_port_allocate(
133 mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
134 base::mac::ScopedMachReceiveRight::Receiver(wakeup_).get());
135 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
136
137 // Configure the event to directly receive the Mach message as part of the
138 // kevent64() call.
139 kevent64_s event{};
140 event.ident = wakeup_.get();
141 event.filter = EVFILT_MACHPORT;
142 event.flags = EV_ADD;
143 event.fflags = MACH_RCV_MSG;
144 event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
145 event.ext[1] = sizeof(wakeup_buffer_);
146
147 int rv = ChangeOneEvent(kqueue_, &event);
148 PCHECK(rv == 0) << "kevent64";
149 }
150
~MessagePumpKqueue()151 MessagePumpKqueue::~MessagePumpKqueue() {}
152
InitializeFeatures()153 void MessagePumpKqueue::InitializeFeatures() {
154 g_use_simplified_version.store(
155 base::FeatureList::IsEnabled(kUseSimplifiedMessagePumpKqueueLoop),
156 std::memory_order_relaxed);
157 }
158
Run(Delegate * delegate)159 void MessagePumpKqueue::Run(Delegate* delegate) {
160 AutoReset<bool> reset_keep_running(&keep_running_, true);
161
162 if (g_use_simplified_version.load(std::memory_order_relaxed)) {
163 RunSimplified(delegate);
164 } else {
165 while (keep_running_) {
166 mac::ScopedNSAutoreleasePool pool;
167
168 bool do_more_work = DoInternalWork(delegate, nullptr);
169 if (!keep_running_)
170 break;
171
172 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
173 do_more_work |= next_work_info.is_immediate();
174 if (!keep_running_)
175 break;
176
177 if (do_more_work)
178 continue;
179
180 do_more_work |= delegate->DoIdleWork();
181 if (!keep_running_)
182 break;
183
184 if (do_more_work)
185 continue;
186
187 DoInternalWork(delegate, &next_work_info);
188 }
189 }
190 }
191
RunSimplified(Delegate * delegate)192 void MessagePumpKqueue::RunSimplified(Delegate* delegate) {
193 // Look for native work once before the loop starts. Without this call the
194 // loop would break without checking native work even once in cases where
195 // QuitWhenIdle was used. This is sometimes the case in tests.
196 DoInternalWork(delegate, nullptr);
197
198 while (keep_running_) {
199 mac::ScopedNSAutoreleasePool pool;
200
201 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
202 if (!keep_running_)
203 break;
204
205 if (!next_work_info.is_immediate()) {
206 delegate->DoIdleWork();
207 }
208 if (!keep_running_)
209 break;
210
211 DoInternalWork(delegate, &next_work_info);
212 }
213 }
214
Quit()215 void MessagePumpKqueue::Quit() {
216 keep_running_ = false;
217 ScheduleWork();
218 }
219
ScheduleWork()220 void MessagePumpKqueue::ScheduleWork() {
221 mach_msg_empty_send_t message{};
222 message.header.msgh_size = sizeof(message);
223 message.header.msgh_bits =
224 MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
225 message.header.msgh_remote_port = wakeup_.get();
226 kern_return_t kr = mach_msg_send(&message.header);
227 if (kr != KERN_SUCCESS) {
228 // If ScheduleWork() is being called by other threads faster than the pump
229 // can dispatch work, the kernel message queue for the wakeup port can fill
230 // up (this happens under base_perftests, for example). The kernel does
231 // return a SEND_ONCE right in the case of failure, which must be destroyed
232 // to avoid leaking.
233 MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr)
234 << "mach_msg_send";
235 mach_msg_destroy(&message.header);
236 }
237 }
238
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)239 void MessagePumpKqueue::ScheduleDelayedWork(
240 const Delegate::NextWorkInfo& next_work_info) {
241 // Nothing to do. This MessagePump uses DoWork().
242 }
243
WatchMachReceivePort(mach_port_t port,MachPortWatchController * controller,MachPortWatcher * delegate)244 bool MessagePumpKqueue::WatchMachReceivePort(
245 mach_port_t port,
246 MachPortWatchController* controller,
247 MachPortWatcher* delegate) {
248 DCHECK(port != MACH_PORT_NULL);
249 DCHECK(controller);
250 DCHECK(delegate);
251
252 if (controller->port() != MACH_PORT_NULL) {
253 DLOG(ERROR)
254 << "Cannot use the same MachPortWatchController while it is active";
255 return false;
256 }
257
258 kevent64_s event{};
259 event.ident = port;
260 event.filter = EVFILT_MACHPORT;
261 event.flags = EV_ADD;
262 int rv = ChangeOneEvent(kqueue_, &event);
263 if (rv < 0) {
264 DPLOG(ERROR) << "kevent64";
265 return false;
266 }
267 ++event_count_;
268
269 controller->Init(weak_factory_.GetWeakPtr(), port, delegate);
270 port_controllers_.AddWithID(controller, port);
271
272 return true;
273 }
274
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)275 bool MessagePumpKqueue::WatchFileDescriptor(int fd,
276 bool persistent,
277 int mode,
278 FdWatchController* controller,
279 FdWatcher* delegate) {
280 DCHECK_GE(fd, 0);
281 DCHECK(controller);
282 DCHECK(delegate);
283 DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
284
285 if (controller->fd() != -1 && controller->fd() != fd) {
286 DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs";
287 return false;
288 }
289 StopWatchingFileDescriptor(controller);
290
291 std::vector<kevent64_s> events;
292
293 kevent64_s base_event{};
294 base_event.ident = static_cast<uint64_t>(fd);
295 base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
296
297 if (mode & Mode::WATCH_READ) {
298 base_event.filter = EVFILT_READ;
299 base_event.udata = fd_controllers_.Add(controller);
300 events.push_back(base_event);
301 }
302 if (mode & Mode::WATCH_WRITE) {
303 base_event.filter = EVFILT_WRITE;
304 base_event.udata = fd_controllers_.Add(controller);
305 events.push_back(base_event);
306 }
307
308 int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
309 checked_cast<int>(events.size()), nullptr, 0,
310 0, nullptr));
311 if (rv < 0) {
312 DPLOG(ERROR) << "WatchFileDescriptor kevent64";
313 return false;
314 }
315
316 event_count_ += events.size();
317 controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate);
318
319 return true;
320 }
321
SetWakeupTimerEvent(const base::TimeTicks & wakeup_time,kevent64_s * timer_event)322 void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time,
323 kevent64_s* timer_event) {
324 // The ident of the wakeup timer. There's only the one timer as the pair
325 // (ident, filter) is the identity of the event.
326 constexpr uint64_t kWakeupTimerIdent = 0x0;
327 timer_event->ident = kWakeupTimerIdent;
328 timer_event->filter = EVFILT_TIMER;
329 if (wakeup_time == base::TimeTicks::Max()) {
330 timer_event->flags = EV_DELETE;
331 } else {
332 timer_event->filter = EVFILT_TIMER;
333 // This updates the timer if it already exists in |kqueue_|.
334 timer_event->flags = EV_ADD | EV_ONESHOT;
335
336 // Specify the sleep in microseconds to avoid undersleeping due to
337 // numeric problems. The sleep is computed from TimeTicks::Now rather than
338 // NextWorkInfo::recent_now because recent_now is strictly earlier than
339 // current wall-clock. Using an earlier wall clock time to compute the
340 // delta to the next wakeup wall-clock time would guarantee oversleep.
341 // If wakeup_time is in the past, the delta below will be negative and the
342 // timer is set immediately.
343 timer_event->fflags = NOTE_USECONDS;
344 timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
345 }
346 }
347
StopWatchingMachPort(MachPortWatchController * controller)348 bool MessagePumpKqueue::StopWatchingMachPort(
349 MachPortWatchController* controller) {
350 mach_port_t port = controller->port();
351 controller->Reset();
352 port_controllers_.Remove(port);
353
354 kevent64_s event{};
355 event.ident = port;
356 event.filter = EVFILT_MACHPORT;
357 event.flags = EV_DELETE;
358 --event_count_;
359 int rv = ChangeOneEvent(kqueue_, &event);
360 if (rv < 0) {
361 DPLOG(ERROR) << "kevent64";
362 return false;
363 }
364
365 return true;
366 }
367
StopWatchingFileDescriptor(FdWatchController * controller)368 bool MessagePumpKqueue::StopWatchingFileDescriptor(
369 FdWatchController* controller) {
370 int fd = controller->fd();
371 int mode = controller->mode();
372 controller->Reset();
373
374 if (fd < 0)
375 return true;
376
377 std::vector<kevent64_s> events;
378
379 kevent64_s base_event{};
380 base_event.ident = static_cast<uint64_t>(fd);
381 base_event.flags = EV_DELETE;
382
383 if (mode & Mode::WATCH_READ) {
384 base_event.filter = EVFILT_READ;
385 events.push_back(base_event);
386 }
387 if (mode & Mode::WATCH_WRITE) {
388 base_event.filter = EVFILT_WRITE;
389 events.push_back(base_event);
390 }
391
392 int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
393 checked_cast<int>(events.size()), nullptr, 0,
394 0, nullptr));
395 DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64";
396
397 // The keys for the IDMap aren't recorded anywhere (they're attached to the
398 // kevent object in the kernel), so locate the entries by controller pointer.
399 for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_);
400 !it.IsAtEnd(); it.Advance()) {
401 if (it.GetCurrentValue() == controller) {
402 fd_controllers_.Remove(it.GetCurrentKey());
403 }
404 }
405
406 event_count_ -= events.size();
407
408 return rv >= 0;
409 }
410
DoInternalWork(Delegate * delegate,Delegate::NextWorkInfo * next_work_info)411 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
412 Delegate::NextWorkInfo* next_work_info) {
413 if (events_.size() < event_count_) {
414 events_.resize(event_count_);
415 }
416
417 bool immediate = next_work_info == nullptr;
418 unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0;
419
420 if (!immediate) {
421 MaybeUpdateWakeupTimer(next_work_info->delayed_run_time);
422 DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time);
423 delegate->BeforeWait();
424 }
425
426 int rv =
427 HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(),
428 checked_cast<int>(events_.size()), flags, nullptr));
429 if (rv == 0) {
430 // No events to dispatch so no need to call ProcessEvents().
431 return false;
432 }
433
434 PCHECK(rv > 0) << "kevent64";
435 return ProcessEvents(delegate, static_cast<size_t>(rv));
436 }
437
ProcessEvents(Delegate * delegate,size_t count)438 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) {
439 bool did_work = false;
440
441 for (size_t i = 0; i < count; ++i) {
442 auto* event = &events_[i];
443 if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
444 did_work = true;
445
446 FdWatchController* controller = fd_controllers_.Lookup(event->udata);
447 if (!controller) {
448 // The controller was removed by some other work callout before
449 // this event could be processed.
450 continue;
451 }
452 FdWatcher* fd_watcher = controller->watcher();
453
454 if (event->flags & EV_ONESHOT) {
455 // If this was a one-shot event, the Controller needs to stop tracking
456 // the descriptor, so it is not double-removed when it is told to stop
457 // watching.
458 controller->Reset();
459 fd_controllers_.Remove(event->udata);
460 --event_count_;
461 }
462
463 auto scoped_do_work_item = delegate->BeginWorkItem();
464 // WatchFileDescriptor() originally upcasts event->ident from an int.
465 if (event->filter == EVFILT_READ) {
466 fd_watcher->OnFileCanReadWithoutBlocking(
467 static_cast<int>(event->ident));
468 } else if (event->filter == EVFILT_WRITE) {
469 fd_watcher->OnFileCanWriteWithoutBlocking(
470 static_cast<int>(event->ident));
471 }
472 } else if (event->filter == EVFILT_MACHPORT) {
473 // WatchMachReceivePort() originally sets event->ident from a mach_port_t.
474 mach_port_t port = static_cast<mach_port_t>(event->ident);
475 if (port == wakeup_.get()) {
476 // The wakeup event has been received, do not treat this as "doing
477 // work", this just wakes up the pump.
478 continue;
479 }
480
481 did_work = true;
482
483 MachPortWatchController* controller = port_controllers_.Lookup(port);
484 // The controller could have been removed by some other work callout
485 // before this event could be processed.
486 if (controller) {
487 auto scoped_do_work_item = delegate->BeginWorkItem();
488 controller->watcher()->OnMachMessageReceived(port);
489 }
490 } else if (event->filter == EVFILT_TIMER) {
491 // The wakeup timer fired.
492 #if DCHECK_IS_ON()
493 // On macOS 10.13 and earlier, kqueue timers may spuriously wake up.
494 // When this happens, the timer will be re-scheduled the next time
495 // DoInternalWork is entered, which means this doesn't lead to a
496 // spinning wait.
497 // When clock overrides are active, TimeTicks::Now may be decoupled from
498 // wall-clock time, and can therefore not be used to validate whether the
499 // expected wall-clock time has passed.
500 if (!KqueueTimersSpuriouslyWakeUp() &&
501 !subtle::ScopedTimeClockOverrides::overrides_active()) {
502 // Given the caveats above, assert that the timer didn't fire early.
503 DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now());
504 }
505 #endif
506 DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max());
507 scheduled_wakeup_time_ = base::TimeTicks::Max();
508 --event_count_;
509 } else {
510 NOTREACHED() << "Unexpected event for filter " << event->filter;
511 }
512 }
513
514 return did_work;
515 }
516
MaybeUpdateWakeupTimer(const base::TimeTicks & wakeup_time)517 void MessagePumpKqueue::MaybeUpdateWakeupTimer(
518 const base::TimeTicks& wakeup_time) {
519 if (wakeup_time == scheduled_wakeup_time_) {
520 // No change in the timer setting necessary.
521 return;
522 }
523
524 if (wakeup_time == base::TimeTicks::Max()) {
525 // If the timer was already reset, don't re-reset it on a suspend toggle.
526 if (scheduled_wakeup_time_ != base::TimeTicks::Max()) {
527 // Clear the timer.
528 kevent64_s timer{};
529 SetWakeupTimerEvent(wakeup_time, &timer);
530 int rv = ChangeOneEvent(kqueue_, &timer);
531 PCHECK(rv == 0) << "kevent64, delete timer";
532 --event_count_;
533 }
534 } else {
535 // Set/reset the timer.
536 kevent64_s timer{};
537 SetWakeupTimerEvent(wakeup_time, &timer);
538 int rv = ChangeOneEvent(kqueue_, &timer);
539 PCHECK(rv == 0) << "kevent64, set timer";
540
541 // Bump the event count if we just added the timer.
542 if (scheduled_wakeup_time_ == base::TimeTicks::Max())
543 ++event_count_;
544 }
545
546 scheduled_wakeup_time_ = wakeup_time;
547 }
548
549 } // namespace base
550