1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
7 #pragma allow_unsafe_buffers
8 #endif
9
10 #include "base/message_loop/message_pump_kqueue.h"
11
12 #include <sys/errno.h>
13
14 #include <atomic>
15
16 #include "base/apple/mach_logging.h"
17 #include "base/apple/scoped_nsautorelease_pool.h"
18 #include "base/auto_reset.h"
19 #include "base/feature_list.h"
20 #include "base/logging.h"
21 #include "base/mac/mac_util.h"
22 #include "base/notreached.h"
23 #include "base/posix/eintr_wrapper.h"
24 #include "base/task/task_features.h"
25 #include "base/time/time_override.h"
26
27 namespace base {
28
29 namespace {
30
31 // Under this feature native work is batched. Remove it once crbug.com/1200141
32 // is resolved.
33 BASE_FEATURE(kBatchNativeEventsInMessagePumpKqueue,
34 "BatchNativeEventsInMessagePumpKqueue",
35 base::FEATURE_DISABLED_BY_DEFAULT);
36
37 // Caches the state of the "BatchNativeEventsInMessagePumpKqueue".
38 std::atomic_bool g_use_batched_version = false;
39
40 // Caches the state of the "TimerSlackMac" feature for efficiency.
41 std::atomic_bool g_timer_slack = false;
42
43 #if DCHECK_IS_ON()
44 // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier
45 // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a
46 // timer from the thread that reads the kqueue does not cause spurious wakeups.
47 // Note that updating a kqueue timer from one thread while another thread is
48 // waiting in a kevent64 invocation is still (inherently) racy.
KqueueTimersSpuriouslyWakeUp()49 bool KqueueTimersSpuriouslyWakeUp() {
50 #if BUILDFLAG(IS_MAC)
51 return false;
52 #else
53 // This still happens on iOS15.
54 return true;
55 #endif
56 }
57 #endif
58
ChangeOneEvent(const ScopedFD & kqueue,kevent64_s * event)59 int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) {
60 return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr));
61 }
62
63 } // namespace
64
FdWatchController(const Location & from_here)65 MessagePumpKqueue::FdWatchController::FdWatchController(
66 const Location& from_here)
67 : FdWatchControllerInterface(from_here) {}
68
~FdWatchController()69 MessagePumpKqueue::FdWatchController::~FdWatchController() {
70 StopWatchingFileDescriptor();
71 }
72
StopWatchingFileDescriptor()73 bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() {
74 if (!pump_)
75 return true;
76 return pump_->StopWatchingFileDescriptor(this);
77 }
78
Init(WeakPtr<MessagePumpKqueue> pump,int fd,int mode,FdWatcher * watcher)79 void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump,
80 int fd,
81 int mode,
82 FdWatcher* watcher) {
83 DCHECK_NE(fd, -1);
84 DCHECK(!watcher_);
85 DCHECK(watcher);
86 DCHECK(pump);
87 fd_ = fd;
88 mode_ = mode;
89 watcher_ = watcher;
90 pump_ = pump;
91 }
92
Reset()93 void MessagePumpKqueue::FdWatchController::Reset() {
94 fd_ = -1;
95 mode_ = 0;
96 watcher_ = nullptr;
97 pump_ = nullptr;
98 }
99
MachPortWatchController(const Location & from_here)100 MessagePumpKqueue::MachPortWatchController::MachPortWatchController(
101 const Location& from_here)
102 : from_here_(from_here) {}
103
~MachPortWatchController()104 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
105 StopWatchingMachPort();
106 }
107
StopWatchingMachPort()108 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
109 if (!pump_)
110 return true;
111 return pump_->StopWatchingMachPort(this);
112 }
113
Init(WeakPtr<MessagePumpKqueue> pump,mach_port_t port,MachPortWatcher * watcher)114 void MessagePumpKqueue::MachPortWatchController::Init(
115 WeakPtr<MessagePumpKqueue> pump,
116 mach_port_t port,
117 MachPortWatcher* watcher) {
118 DCHECK(!watcher_);
119 DCHECK(watcher);
120 DCHECK(pump);
121 port_ = port;
122 watcher_ = watcher;
123 pump_ = pump;
124 }
125
Reset()126 void MessagePumpKqueue::MachPortWatchController::Reset() {
127 port_ = MACH_PORT_NULL;
128 watcher_ = nullptr;
129 pump_ = nullptr;
130 }
131
MessagePumpKqueue()132 MessagePumpKqueue::MessagePumpKqueue()
133 : kqueue_(kqueue()), weak_factory_(this) {
134 PCHECK(kqueue_.is_valid()) << "kqueue";
135
136 // Create a Mach port that will be used to wake up the pump by sending
137 // a message in response to ScheduleWork(). This is significantly faster than
138 // using an EVFILT_USER event, especially when triggered across threads.
139 kern_return_t kr = mach_port_allocate(
140 mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
141 base::apple::ScopedMachReceiveRight::Receiver(wakeup_).get());
142 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
143
144 // Configure the event to directly receive the Mach message as part of the
145 // kevent64() call.
146 kevent64_s event{};
147 event.ident = wakeup_.get();
148 event.filter = EVFILT_MACHPORT;
149 event.flags = EV_ADD;
150 event.fflags = MACH_RCV_MSG;
151 event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
152 event.ext[1] = sizeof(wakeup_buffer_);
153
154 int rv = ChangeOneEvent(kqueue_, &event);
155 PCHECK(rv == 0) << "kevent64";
156 }
157
158 MessagePumpKqueue::~MessagePumpKqueue() = default;
159
InitializeFeatures()160 void MessagePumpKqueue::InitializeFeatures() {
161 g_use_batched_version.store(
162 base::FeatureList::IsEnabled(kBatchNativeEventsInMessagePumpKqueue),
163 std::memory_order_relaxed);
164 g_timer_slack.store(FeatureList::IsEnabled(kTimerSlackMac),
165 std::memory_order_relaxed);
166 }
167
Run(Delegate * delegate)168 void MessagePumpKqueue::Run(Delegate* delegate) {
169 AutoReset<bool> reset_keep_running(&keep_running_, true);
170
171 if (g_use_batched_version.load(std::memory_order_relaxed)) {
172 RunBatched(delegate);
173 } else {
174 while (keep_running_) {
175 apple::ScopedNSAutoreleasePool pool;
176
177 bool do_more_work = DoInternalWork(delegate, nullptr);
178 if (!keep_running_)
179 break;
180
181 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
182 do_more_work |= next_work_info.is_immediate();
183 if (!keep_running_)
184 break;
185
186 if (do_more_work)
187 continue;
188
189 delegate->DoIdleWork();
190 if (!keep_running_)
191 break;
192
193 DoInternalWork(delegate, &next_work_info);
194 }
195 }
196 }
197
RunBatched(Delegate * delegate)198 void MessagePumpKqueue::RunBatched(Delegate* delegate) {
199 // Look for native work once before the loop starts. Without this call the
200 // loop would break without checking native work even once in cases where
201 // QuitWhenIdle was used. This is sometimes the case in tests.
202 DoInternalWork(delegate, nullptr);
203
204 while (keep_running_) {
205 apple::ScopedNSAutoreleasePool pool;
206
207 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
208 if (!keep_running_)
209 break;
210
211 if (!next_work_info.is_immediate()) {
212 delegate->DoIdleWork();
213 }
214 if (!keep_running_)
215 break;
216
217 int batch_size = 0;
218 if (DoInternalWork(delegate, &next_work_info)) {
219 // More than one call can be necessary to fully dispatch all available
220 // internal work. Making an effort to dispatch more than the minimum
221 // before moving on to application tasks reduces the overhead of going
222 // through the whole loop. It also more closely mirrors the behavior of
223 // application task execution where tasks are batched. A value of 16 was
224 // chosen via local experimentation showing that is was sufficient to
225 // dispatch all work in roughly 95% of cases.
226 constexpr int kMaxAttempts = 16;
227 while (DoInternalWork(delegate, nullptr) && batch_size < kMaxAttempts) {
228 ++batch_size;
229 }
230 }
231 }
232 }
233
Quit()234 void MessagePumpKqueue::Quit() {
235 keep_running_ = false;
236 ScheduleWork();
237 }
238
ScheduleWork()239 void MessagePumpKqueue::ScheduleWork() {
240 mach_msg_empty_send_t message{};
241 message.header.msgh_size = sizeof(message);
242 message.header.msgh_bits =
243 MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
244 message.header.msgh_remote_port = wakeup_.get();
245 kern_return_t kr = mach_msg_send(&message.header);
246 if (kr != KERN_SUCCESS) {
247 // If ScheduleWork() is being called by other threads faster than the pump
248 // can dispatch work, the kernel message queue for the wakeup port can fill
249 // up (this happens under base_perftests, for example). The kernel does
250 // return a SEND_ONCE right in the case of failure, which must be destroyed
251 // to avoid leaking.
252 MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr)
253 << "mach_msg_send";
254 mach_msg_destroy(&message.header);
255 }
256 }
257
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)258 void MessagePumpKqueue::ScheduleDelayedWork(
259 const Delegate::NextWorkInfo& next_work_info) {
260 // Nothing to do. This MessagePump uses DoWork().
261 }
262
WatchMachReceivePort(mach_port_t port,MachPortWatchController * controller,MachPortWatcher * delegate)263 bool MessagePumpKqueue::WatchMachReceivePort(
264 mach_port_t port,
265 MachPortWatchController* controller,
266 MachPortWatcher* delegate) {
267 DCHECK(port != MACH_PORT_NULL);
268 DCHECK(controller);
269 DCHECK(delegate);
270
271 if (controller->port() != MACH_PORT_NULL) {
272 DLOG(ERROR)
273 << "Cannot use the same MachPortWatchController while it is active";
274 return false;
275 }
276
277 kevent64_s event{};
278 event.ident = port;
279 event.filter = EVFILT_MACHPORT;
280 event.flags = EV_ADD;
281 int rv = ChangeOneEvent(kqueue_, &event);
282 if (rv < 0) {
283 DPLOG(ERROR) << "kevent64";
284 return false;
285 }
286 ++event_count_;
287
288 controller->Init(weak_factory_.GetWeakPtr(), port, delegate);
289 port_controllers_.AddWithID(controller, port);
290
291 return true;
292 }
293
AdjustDelayedRunTime(TimeTicks earliest_time,TimeTicks run_time,TimeTicks latest_time)294 TimeTicks MessagePumpKqueue::AdjustDelayedRunTime(TimeTicks earliest_time,
295 TimeTicks run_time,
296 TimeTicks latest_time) {
297 if (GetAlignWakeUpsEnabled() &&
298 g_timer_slack.load(std::memory_order_relaxed)) {
299 return earliest_time;
300 }
301 return MessagePump::AdjustDelayedRunTime(earliest_time, run_time,
302 latest_time);
303 }
304
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)305 bool MessagePumpKqueue::WatchFileDescriptor(int fd,
306 bool persistent,
307 int mode,
308 FdWatchController* controller,
309 FdWatcher* delegate) {
310 DCHECK_GE(fd, 0);
311 DCHECK(controller);
312 DCHECK(delegate);
313 DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
314
315 if (controller->fd() != -1 && controller->fd() != fd) {
316 DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs";
317 return false;
318 }
319 StopWatchingFileDescriptor(controller);
320
321 std::vector<kevent64_s> events;
322
323 kevent64_s base_event{};
324 base_event.ident = static_cast<uint64_t>(fd);
325 base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
326
327 if (mode & Mode::WATCH_READ) {
328 base_event.filter = EVFILT_READ;
329 base_event.udata = fd_controllers_.Add(controller);
330 events.push_back(base_event);
331 }
332 if (mode & Mode::WATCH_WRITE) {
333 base_event.filter = EVFILT_WRITE;
334 base_event.udata = fd_controllers_.Add(controller);
335 events.push_back(base_event);
336 }
337
338 int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
339 checked_cast<int>(events.size()), nullptr, 0,
340 0, nullptr));
341 if (rv < 0) {
342 DPLOG(ERROR) << "WatchFileDescriptor kevent64";
343 return false;
344 }
345
346 event_count_ += events.size();
347 controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate);
348
349 return true;
350 }
351
SetWakeupTimerEvent(const base::TimeTicks & wakeup_time,base::TimeDelta leeway,kevent64_s * timer_event)352 void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time,
353 base::TimeDelta leeway,
354 kevent64_s* timer_event) {
355 // The ident of the wakeup timer. There's only the one timer as the pair
356 // (ident, filter) is the identity of the event.
357 constexpr uint64_t kWakeupTimerIdent = 0x0;
358 timer_event->ident = kWakeupTimerIdent;
359 timer_event->filter = EVFILT_TIMER;
360 if (wakeup_time == base::TimeTicks::Max()) {
361 timer_event->flags = EV_DELETE;
362 } else {
363 timer_event->filter = EVFILT_TIMER;
364 // This updates the timer if it already exists in |kqueue_|.
365 timer_event->flags = EV_ADD | EV_ONESHOT;
366
367 // Specify the sleep in microseconds to avoid undersleeping due to
368 // numeric problems. The sleep is computed from TimeTicks::Now rather than
369 // NextWorkInfo::recent_now because recent_now is strictly earlier than
370 // current wall-clock. Using an earlier wall clock time to compute the
371 // delta to the next wakeup wall-clock time would guarantee oversleep.
372 // If wakeup_time is in the past, the delta below will be negative and the
373 // timer is set immediately.
374 timer_event->fflags = NOTE_USECONDS;
375 timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
376
377 if (!leeway.is_zero() && g_timer_slack.load(std::memory_order_relaxed)) {
378 // Specify slack based on |leeway|.
379 // See "man kqueue" in recent macOSen for documentation.
380 timer_event->fflags |= NOTE_LEEWAY;
381 timer_event->ext[1] = static_cast<uint64_t>(leeway.InMicroseconds());
382 }
383 }
384 }
385
StopWatchingMachPort(MachPortWatchController * controller)386 bool MessagePumpKqueue::StopWatchingMachPort(
387 MachPortWatchController* controller) {
388 mach_port_t port = controller->port();
389 controller->Reset();
390 port_controllers_.Remove(port);
391
392 kevent64_s event{};
393 event.ident = port;
394 event.filter = EVFILT_MACHPORT;
395 event.flags = EV_DELETE;
396 --event_count_;
397 int rv = ChangeOneEvent(kqueue_, &event);
398 if (rv < 0) {
399 DPLOG(ERROR) << "kevent64";
400 return false;
401 }
402
403 return true;
404 }
405
StopWatchingFileDescriptor(FdWatchController * controller)406 bool MessagePumpKqueue::StopWatchingFileDescriptor(
407 FdWatchController* controller) {
408 int fd = controller->fd();
409 int mode = controller->mode();
410 controller->Reset();
411
412 if (fd < 0)
413 return true;
414
415 std::vector<kevent64_s> events;
416
417 kevent64_s base_event{};
418 base_event.ident = static_cast<uint64_t>(fd);
419 base_event.flags = EV_DELETE;
420
421 if (mode & Mode::WATCH_READ) {
422 base_event.filter = EVFILT_READ;
423 events.push_back(base_event);
424 }
425 if (mode & Mode::WATCH_WRITE) {
426 base_event.filter = EVFILT_WRITE;
427 events.push_back(base_event);
428 }
429
430 int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
431 checked_cast<int>(events.size()), nullptr, 0,
432 0, nullptr));
433 DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64";
434
435 // The keys for the IDMap aren't recorded anywhere (they're attached to the
436 // kevent object in the kernel), so locate the entries by controller pointer.
437 for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_);
438 !it.IsAtEnd(); it.Advance()) {
439 if (it.GetCurrentValue() == controller) {
440 fd_controllers_.Remove(it.GetCurrentKey());
441 }
442 }
443
444 event_count_ -= events.size();
445
446 return rv >= 0;
447 }
448
DoInternalWork(Delegate * delegate,Delegate::NextWorkInfo * next_work_info)449 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
450 Delegate::NextWorkInfo* next_work_info) {
451 if (events_.size() < event_count_) {
452 events_.resize(event_count_);
453 }
454
455 bool immediate = next_work_info == nullptr;
456 unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0;
457
458 if (!immediate) {
459 MaybeUpdateWakeupTimer(next_work_info->delayed_run_time,
460 next_work_info->leeway);
461 DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time);
462 delegate->BeforeWait();
463 }
464
465 int rv =
466 HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(),
467 checked_cast<int>(events_.size()), flags, nullptr));
468 if (rv == 0) {
469 // No events to dispatch so no need to call ProcessEvents().
470 return false;
471 }
472
473 PCHECK(rv > 0) << "kevent64";
474 return ProcessEvents(delegate, static_cast<size_t>(rv));
475 }
476
ProcessEvents(Delegate * delegate,size_t count)477 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) {
478 bool did_work = false;
479
480 delegate->BeginNativeWorkBeforeDoWork();
481 for (size_t i = 0; i < count; ++i) {
482 auto* event = &events_[i];
483 if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
484 did_work = true;
485
486 FdWatchController* controller = fd_controllers_.Lookup(event->udata);
487 if (!controller) {
488 // The controller was removed by some other work callout before
489 // this event could be processed.
490 continue;
491 }
492 FdWatcher* fd_watcher = controller->watcher();
493
494 if (event->flags & EV_ONESHOT) {
495 // If this was a one-shot event, the Controller needs to stop tracking
496 // the descriptor, so it is not double-removed when it is told to stop
497 // watching.
498 controller->Reset();
499 fd_controllers_.Remove(event->udata);
500 --event_count_;
501 }
502
503 if (fd_watcher) {
504 auto scoped_do_work_item = delegate->BeginWorkItem();
505 // WatchFileDescriptor() originally upcasts event->ident from an int.
506 if (event->filter == EVFILT_READ) {
507 fd_watcher->OnFileCanReadWithoutBlocking(
508 static_cast<int>(event->ident));
509 } else if (event->filter == EVFILT_WRITE) {
510 fd_watcher->OnFileCanWriteWithoutBlocking(
511 static_cast<int>(event->ident));
512 }
513 }
514 } else if (event->filter == EVFILT_MACHPORT) {
515 // WatchMachReceivePort() originally sets event->ident from a mach_port_t.
516 mach_port_t port = static_cast<mach_port_t>(event->ident);
517 if (port == wakeup_.get()) {
518 // The wakeup event has been received, do not treat this as "doing
519 // work", this just wakes up the pump.
520 continue;
521 }
522
523 did_work = true;
524
525 MachPortWatchController* controller = port_controllers_.Lookup(port);
526 // The controller could have been removed by some other work callout
527 // before this event could be processed.
528 if (controller) {
529 auto scoped_do_work_item = delegate->BeginWorkItem();
530 controller->watcher()->OnMachMessageReceived(port);
531 }
532 } else if (event->filter == EVFILT_TIMER) {
533 // The wakeup timer fired.
534 #if DCHECK_IS_ON()
535 // On macOS 10.13 and earlier, kqueue timers may spuriously wake up.
536 // When this happens, the timer will be re-scheduled the next time
537 // DoInternalWork is entered, which means this doesn't lead to a
538 // spinning wait.
539 // When clock overrides are active, TimeTicks::Now may be decoupled from
540 // wall-clock time, and can therefore not be used to validate whether the
541 // expected wall-clock time has passed.
542 if (!KqueueTimersSpuriouslyWakeUp() &&
543 !subtle::ScopedTimeClockOverrides::overrides_active()) {
544 // Given the caveats above, assert that the timer didn't fire early.
545 DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now());
546 }
547 #endif
548 DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max());
549 scheduled_wakeup_time_ = base::TimeTicks::Max();
550 --event_count_;
551 } else {
552 NOTREACHED() << "Unexpected event for filter " << event->filter;
553 }
554 }
555
556 return did_work;
557 }
558
MaybeUpdateWakeupTimer(const base::TimeTicks & wakeup_time,base::TimeDelta leeway)559 void MessagePumpKqueue::MaybeUpdateWakeupTimer(
560 const base::TimeTicks& wakeup_time,
561 base::TimeDelta leeway) {
562 if (wakeup_time == scheduled_wakeup_time_) {
563 // No change in the timer setting necessary.
564 return;
565 }
566
567 if (wakeup_time == base::TimeTicks::Max()) {
568 // If the timer was already reset, don't re-reset it on a suspend toggle.
569 if (scheduled_wakeup_time_ != base::TimeTicks::Max()) {
570 // Clear the timer.
571 kevent64_s timer{};
572 SetWakeupTimerEvent(wakeup_time, leeway, &timer);
573 int rv = ChangeOneEvent(kqueue_, &timer);
574 PCHECK(rv == 0) << "kevent64, delete timer";
575 --event_count_;
576 }
577 } else {
578 // Set/reset the timer.
579 kevent64_s timer{};
580 SetWakeupTimerEvent(wakeup_time, leeway, &timer);
581 int rv = ChangeOneEvent(kqueue_, &timer);
582 PCHECK(rv == 0) << "kevent64, set timer";
583
584 // Bump the event count if we just added the timer.
585 if (scheduled_wakeup_time_ == base::TimeTicks::Max())
586 ++event_count_;
587 }
588
589 scheduled_wakeup_time_ = wakeup_time;
590 }
591
592 } // namespace base
593