• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_kqueue.h"
6 
7 #include <sys/errno.h>
8 
9 #include <atomic>
10 
11 #include "base/auto_reset.h"
12 #include "base/feature_list.h"
13 #include "base/logging.h"
14 #include "base/mac/mac_util.h"
15 #include "base/mac/mach_logging.h"
16 #include "base/mac/scoped_nsautorelease_pool.h"
17 #include "base/notreached.h"
18 #include "base/posix/eintr_wrapper.h"
19 #include "base/time/time_override.h"
20 
21 namespace base {
22 
23 namespace {
24 
25 // Under this feature a simplified version of the Run() function is used. It
26 // improves legibility and avoids some calls to kevent64(). Remove once
27 // crbug.com/1200141 is resolved.
28 BASE_FEATURE(kUseSimplifiedMessagePumpKqueueLoop,
29              "UseSimplifiedMessagePumpKqueueLoop",
30              base::FEATURE_DISABLED_BY_DEFAULT);
31 
32 // Caches the state of the "UseSimplifiedMessagePumpKqueueLoop".
33 std::atomic_bool g_use_simplified_version = false;
34 
35 #if DCHECK_IS_ON()
36 // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier
37 // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a
38 // timer from the thread that reads the kqueue does not cause spurious wakeups.
39 // Note that updating a kqueue timer from one thread while another thread is
40 // waiting in a kevent64 invocation is still (inherently) racy.
KqueueTimersSpuriouslyWakeUp()41 bool KqueueTimersSpuriouslyWakeUp() {
42 #if BUILDFLAG(IS_MAC)
43   static const bool kqueue_timers_spuriously_wakeup = mac::IsAtMostOS10_13();
44   return kqueue_timers_spuriously_wakeup;
45 #else
46   // This still happens on iOS15.
47   return true;
48 #endif
49 }
50 #endif
51 
ChangeOneEvent(const ScopedFD & kqueue,kevent64_s * event)52 int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) {
53   return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr));
54 }
55 
56 }  // namespace
57 
FdWatchController(const Location & from_here)58 MessagePumpKqueue::FdWatchController::FdWatchController(
59     const Location& from_here)
60     : FdWatchControllerInterface(from_here) {}
61 
~FdWatchController()62 MessagePumpKqueue::FdWatchController::~FdWatchController() {
63   StopWatchingFileDescriptor();
64 }
65 
StopWatchingFileDescriptor()66 bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() {
67   if (!pump_)
68     return true;
69   return pump_->StopWatchingFileDescriptor(this);
70 }
71 
Init(WeakPtr<MessagePumpKqueue> pump,int fd,int mode,FdWatcher * watcher)72 void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump,
73                                                 int fd,
74                                                 int mode,
75                                                 FdWatcher* watcher) {
76   DCHECK_NE(fd, -1);
77   DCHECK(!watcher_);
78   DCHECK(watcher);
79   DCHECK(pump);
80   fd_ = fd;
81   mode_ = mode;
82   watcher_ = watcher;
83   pump_ = pump;
84 }
85 
Reset()86 void MessagePumpKqueue::FdWatchController::Reset() {
87   fd_ = -1;
88   mode_ = 0;
89   watcher_ = nullptr;
90   pump_ = nullptr;
91 }
92 
MachPortWatchController(const Location & from_here)93 MessagePumpKqueue::MachPortWatchController::MachPortWatchController(
94     const Location& from_here)
95     : from_here_(from_here) {}
96 
~MachPortWatchController()97 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
98   StopWatchingMachPort();
99 }
100 
StopWatchingMachPort()101 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
102   if (!pump_)
103     return true;
104   return pump_->StopWatchingMachPort(this);
105 }
106 
Init(WeakPtr<MessagePumpKqueue> pump,mach_port_t port,MachPortWatcher * watcher)107 void MessagePumpKqueue::MachPortWatchController::Init(
108     WeakPtr<MessagePumpKqueue> pump,
109     mach_port_t port,
110     MachPortWatcher* watcher) {
111   DCHECK(!watcher_);
112   DCHECK(watcher);
113   DCHECK(pump);
114   port_ = port;
115   watcher_ = watcher;
116   pump_ = pump;
117 }
118 
Reset()119 void MessagePumpKqueue::MachPortWatchController::Reset() {
120   port_ = MACH_PORT_NULL;
121   watcher_ = nullptr;
122   pump_ = nullptr;
123 }
124 
MessagePumpKqueue()125 MessagePumpKqueue::MessagePumpKqueue()
126     : kqueue_(kqueue()), weak_factory_(this) {
127   PCHECK(kqueue_.is_valid()) << "kqueue";
128 
129   // Create a Mach port that will be used to wake up the pump by sending
130   // a message in response to ScheduleWork(). This is significantly faster than
131   // using an EVFILT_USER event, especially when triggered across threads.
132   kern_return_t kr = mach_port_allocate(
133       mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
134       base::mac::ScopedMachReceiveRight::Receiver(wakeup_).get());
135   MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
136 
137   // Configure the event to directly receive the Mach message as part of the
138   // kevent64() call.
139   kevent64_s event{};
140   event.ident = wakeup_.get();
141   event.filter = EVFILT_MACHPORT;
142   event.flags = EV_ADD;
143   event.fflags = MACH_RCV_MSG;
144   event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
145   event.ext[1] = sizeof(wakeup_buffer_);
146 
147   int rv = ChangeOneEvent(kqueue_, &event);
148   PCHECK(rv == 0) << "kevent64";
149 }
150 
~MessagePumpKqueue()151 MessagePumpKqueue::~MessagePumpKqueue() {}
152 
InitializeFeatures()153 void MessagePumpKqueue::InitializeFeatures() {
154   g_use_simplified_version.store(
155       base::FeatureList::IsEnabled(kUseSimplifiedMessagePumpKqueueLoop),
156       std::memory_order_relaxed);
157 }
158 
Run(Delegate * delegate)159 void MessagePumpKqueue::Run(Delegate* delegate) {
160   AutoReset<bool> reset_keep_running(&keep_running_, true);
161 
162   if (g_use_simplified_version.load(std::memory_order_relaxed)) {
163     RunSimplified(delegate);
164   } else {
165     while (keep_running_) {
166       mac::ScopedNSAutoreleasePool pool;
167 
168       bool do_more_work = DoInternalWork(delegate, nullptr);
169       if (!keep_running_)
170         break;
171 
172       Delegate::NextWorkInfo next_work_info = delegate->DoWork();
173       do_more_work |= next_work_info.is_immediate();
174       if (!keep_running_)
175         break;
176 
177       if (do_more_work)
178         continue;
179 
180       do_more_work |= delegate->DoIdleWork();
181       if (!keep_running_)
182         break;
183 
184       if (do_more_work)
185         continue;
186 
187       DoInternalWork(delegate, &next_work_info);
188     }
189   }
190 }
191 
RunSimplified(Delegate * delegate)192 void MessagePumpKqueue::RunSimplified(Delegate* delegate) {
193   // Look for native work once before the loop starts. Without this call the
194   // loop would break without checking native work even once in cases where
195   // QuitWhenIdle was used. This is sometimes the case in tests.
196   DoInternalWork(delegate, nullptr);
197 
198   while (keep_running_) {
199     mac::ScopedNSAutoreleasePool pool;
200 
201     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
202     if (!keep_running_)
203       break;
204 
205     if (!next_work_info.is_immediate()) {
206       delegate->DoIdleWork();
207     }
208     if (!keep_running_)
209       break;
210 
211     DoInternalWork(delegate, &next_work_info);
212   }
213 }
214 
Quit()215 void MessagePumpKqueue::Quit() {
216   keep_running_ = false;
217   ScheduleWork();
218 }
219 
ScheduleWork()220 void MessagePumpKqueue::ScheduleWork() {
221   mach_msg_empty_send_t message{};
222   message.header.msgh_size = sizeof(message);
223   message.header.msgh_bits =
224       MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
225   message.header.msgh_remote_port = wakeup_.get();
226   kern_return_t kr = mach_msg_send(&message.header);
227   if (kr != KERN_SUCCESS) {
228     // If ScheduleWork() is being called by other threads faster than the pump
229     // can dispatch work, the kernel message queue for the wakeup port can fill
230     // up (this happens under base_perftests, for example). The kernel does
231     // return a SEND_ONCE right in the case of failure, which must be destroyed
232     // to avoid leaking.
233     MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr)
234         << "mach_msg_send";
235     mach_msg_destroy(&message.header);
236   }
237 }
238 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)239 void MessagePumpKqueue::ScheduleDelayedWork(
240     const Delegate::NextWorkInfo& next_work_info) {
241   // Nothing to do. This MessagePump uses DoWork().
242 }
243 
WatchMachReceivePort(mach_port_t port,MachPortWatchController * controller,MachPortWatcher * delegate)244 bool MessagePumpKqueue::WatchMachReceivePort(
245     mach_port_t port,
246     MachPortWatchController* controller,
247     MachPortWatcher* delegate) {
248   DCHECK(port != MACH_PORT_NULL);
249   DCHECK(controller);
250   DCHECK(delegate);
251 
252   if (controller->port() != MACH_PORT_NULL) {
253     DLOG(ERROR)
254         << "Cannot use the same MachPortWatchController while it is active";
255     return false;
256   }
257 
258   kevent64_s event{};
259   event.ident = port;
260   event.filter = EVFILT_MACHPORT;
261   event.flags = EV_ADD;
262   int rv = ChangeOneEvent(kqueue_, &event);
263   if (rv < 0) {
264     DPLOG(ERROR) << "kevent64";
265     return false;
266   }
267   ++event_count_;
268 
269   controller->Init(weak_factory_.GetWeakPtr(), port, delegate);
270   port_controllers_.AddWithID(controller, port);
271 
272   return true;
273 }
274 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)275 bool MessagePumpKqueue::WatchFileDescriptor(int fd,
276                                             bool persistent,
277                                             int mode,
278                                             FdWatchController* controller,
279                                             FdWatcher* delegate) {
280   DCHECK_GE(fd, 0);
281   DCHECK(controller);
282   DCHECK(delegate);
283   DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
284 
285   if (controller->fd() != -1 && controller->fd() != fd) {
286     DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs";
287     return false;
288   }
289   StopWatchingFileDescriptor(controller);
290 
291   std::vector<kevent64_s> events;
292 
293   kevent64_s base_event{};
294   base_event.ident = static_cast<uint64_t>(fd);
295   base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
296 
297   if (mode & Mode::WATCH_READ) {
298     base_event.filter = EVFILT_READ;
299     base_event.udata = fd_controllers_.Add(controller);
300     events.push_back(base_event);
301   }
302   if (mode & Mode::WATCH_WRITE) {
303     base_event.filter = EVFILT_WRITE;
304     base_event.udata = fd_controllers_.Add(controller);
305     events.push_back(base_event);
306   }
307 
308   int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
309                                  checked_cast<int>(events.size()), nullptr, 0,
310                                  0, nullptr));
311   if (rv < 0) {
312     DPLOG(ERROR) << "WatchFileDescriptor kevent64";
313     return false;
314   }
315 
316   event_count_ += events.size();
317   controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate);
318 
319   return true;
320 }
321 
SetWakeupTimerEvent(const base::TimeTicks & wakeup_time,kevent64_s * timer_event)322 void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time,
323                                             kevent64_s* timer_event) {
324   // The ident of the wakeup timer. There's only the one timer as the pair
325   // (ident, filter) is the identity of the event.
326   constexpr uint64_t kWakeupTimerIdent = 0x0;
327   timer_event->ident = kWakeupTimerIdent;
328   timer_event->filter = EVFILT_TIMER;
329   if (wakeup_time == base::TimeTicks::Max()) {
330     timer_event->flags = EV_DELETE;
331   } else {
332     timer_event->filter = EVFILT_TIMER;
333     // This updates the timer if it already exists in |kqueue_|.
334     timer_event->flags = EV_ADD | EV_ONESHOT;
335 
336     // Specify the sleep in microseconds to avoid undersleeping due to
337     // numeric problems. The sleep is computed from TimeTicks::Now rather than
338     // NextWorkInfo::recent_now because recent_now is strictly earlier than
339     // current wall-clock. Using an earlier wall clock time  to compute the
340     // delta to the next wakeup wall-clock time would guarantee oversleep.
341     // If wakeup_time is in the past, the delta below will be negative and the
342     // timer is set immediately.
343     timer_event->fflags = NOTE_USECONDS;
344     timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
345   }
346 }
347 
StopWatchingMachPort(MachPortWatchController * controller)348 bool MessagePumpKqueue::StopWatchingMachPort(
349     MachPortWatchController* controller) {
350   mach_port_t port = controller->port();
351   controller->Reset();
352   port_controllers_.Remove(port);
353 
354   kevent64_s event{};
355   event.ident = port;
356   event.filter = EVFILT_MACHPORT;
357   event.flags = EV_DELETE;
358   --event_count_;
359   int rv = ChangeOneEvent(kqueue_, &event);
360   if (rv < 0) {
361     DPLOG(ERROR) << "kevent64";
362     return false;
363   }
364 
365   return true;
366 }
367 
StopWatchingFileDescriptor(FdWatchController * controller)368 bool MessagePumpKqueue::StopWatchingFileDescriptor(
369     FdWatchController* controller) {
370   int fd = controller->fd();
371   int mode = controller->mode();
372   controller->Reset();
373 
374   if (fd < 0)
375     return true;
376 
377   std::vector<kevent64_s> events;
378 
379   kevent64_s base_event{};
380   base_event.ident = static_cast<uint64_t>(fd);
381   base_event.flags = EV_DELETE;
382 
383   if (mode & Mode::WATCH_READ) {
384     base_event.filter = EVFILT_READ;
385     events.push_back(base_event);
386   }
387   if (mode & Mode::WATCH_WRITE) {
388     base_event.filter = EVFILT_WRITE;
389     events.push_back(base_event);
390   }
391 
392   int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
393                                  checked_cast<int>(events.size()), nullptr, 0,
394                                  0, nullptr));
395   DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64";
396 
397   // The keys for the IDMap aren't recorded anywhere (they're attached to the
398   // kevent object in the kernel), so locate the entries by controller pointer.
399   for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_);
400        !it.IsAtEnd(); it.Advance()) {
401     if (it.GetCurrentValue() == controller) {
402       fd_controllers_.Remove(it.GetCurrentKey());
403     }
404   }
405 
406   event_count_ -= events.size();
407 
408   return rv >= 0;
409 }
410 
DoInternalWork(Delegate * delegate,Delegate::NextWorkInfo * next_work_info)411 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
412                                        Delegate::NextWorkInfo* next_work_info) {
413   if (events_.size() < event_count_) {
414     events_.resize(event_count_);
415   }
416 
417   bool immediate = next_work_info == nullptr;
418   unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0;
419 
420   if (!immediate) {
421     MaybeUpdateWakeupTimer(next_work_info->delayed_run_time);
422     DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time);
423     delegate->BeforeWait();
424   }
425 
426   int rv =
427       HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(),
428                             checked_cast<int>(events_.size()), flags, nullptr));
429   if (rv == 0) {
430     // No events to dispatch so no need to call ProcessEvents().
431     return false;
432   }
433 
434   PCHECK(rv > 0) << "kevent64";
435   return ProcessEvents(delegate, static_cast<size_t>(rv));
436 }
437 
ProcessEvents(Delegate * delegate,size_t count)438 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) {
439   bool did_work = false;
440 
441   for (size_t i = 0; i < count; ++i) {
442     auto* event = &events_[i];
443     if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
444       did_work = true;
445 
446       FdWatchController* controller = fd_controllers_.Lookup(event->udata);
447       if (!controller) {
448         // The controller was removed by some other work callout before
449         // this event could be processed.
450         continue;
451       }
452       FdWatcher* fd_watcher = controller->watcher();
453 
454       if (event->flags & EV_ONESHOT) {
455         // If this was a one-shot event, the Controller needs to stop tracking
456         // the descriptor, so it is not double-removed when it is told to stop
457         // watching.
458         controller->Reset();
459         fd_controllers_.Remove(event->udata);
460         --event_count_;
461       }
462 
463       auto scoped_do_work_item = delegate->BeginWorkItem();
464       // WatchFileDescriptor() originally upcasts event->ident from an int.
465       if (event->filter == EVFILT_READ) {
466         fd_watcher->OnFileCanReadWithoutBlocking(
467             static_cast<int>(event->ident));
468       } else if (event->filter == EVFILT_WRITE) {
469         fd_watcher->OnFileCanWriteWithoutBlocking(
470             static_cast<int>(event->ident));
471       }
472     } else if (event->filter == EVFILT_MACHPORT) {
473       // WatchMachReceivePort() originally sets event->ident from a mach_port_t.
474       mach_port_t port = static_cast<mach_port_t>(event->ident);
475       if (port == wakeup_.get()) {
476         // The wakeup event has been received, do not treat this as "doing
477         // work", this just wakes up the pump.
478         continue;
479       }
480 
481       did_work = true;
482 
483       MachPortWatchController* controller = port_controllers_.Lookup(port);
484       // The controller could have been removed by some other work callout
485       // before this event could be processed.
486       if (controller) {
487         auto scoped_do_work_item = delegate->BeginWorkItem();
488         controller->watcher()->OnMachMessageReceived(port);
489       }
490     } else if (event->filter == EVFILT_TIMER) {
491       // The wakeup timer fired.
492 #if DCHECK_IS_ON()
493       // On macOS 10.13 and earlier, kqueue timers may spuriously wake up.
494       // When this happens, the timer will be re-scheduled the next time
495       // DoInternalWork is entered, which means this doesn't lead to a
496       // spinning wait.
497       // When clock overrides are active, TimeTicks::Now may be decoupled from
498       // wall-clock time, and can therefore not be used to validate whether the
499       // expected wall-clock time has passed.
500       if (!KqueueTimersSpuriouslyWakeUp() &&
501           !subtle::ScopedTimeClockOverrides::overrides_active()) {
502         // Given the caveats above, assert that the timer didn't fire early.
503         DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now());
504       }
505 #endif
506       DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max());
507       scheduled_wakeup_time_ = base::TimeTicks::Max();
508       --event_count_;
509     } else {
510       NOTREACHED() << "Unexpected event for filter " << event->filter;
511     }
512   }
513 
514   return did_work;
515 }
516 
MaybeUpdateWakeupTimer(const base::TimeTicks & wakeup_time)517 void MessagePumpKqueue::MaybeUpdateWakeupTimer(
518     const base::TimeTicks& wakeup_time) {
519   if (wakeup_time == scheduled_wakeup_time_) {
520     // No change in the timer setting necessary.
521     return;
522   }
523 
524   if (wakeup_time == base::TimeTicks::Max()) {
525     // If the timer was already reset, don't re-reset it on a suspend toggle.
526     if (scheduled_wakeup_time_ != base::TimeTicks::Max()) {
527       // Clear the timer.
528       kevent64_s timer{};
529       SetWakeupTimerEvent(wakeup_time, &timer);
530       int rv = ChangeOneEvent(kqueue_, &timer);
531       PCHECK(rv == 0) << "kevent64, delete timer";
532       --event_count_;
533     }
534   } else {
535     // Set/reset the timer.
536     kevent64_s timer{};
537     SetWakeupTimerEvent(wakeup_time, &timer);
538     int rv = ChangeOneEvent(kqueue_, &timer);
539     PCHECK(rv == 0) << "kevent64, set timer";
540 
541     // Bump the event count if we just added the timer.
542     if (scheduled_wakeup_time_ == base::TimeTicks::Max())
543       ++event_count_;
544   }
545 
546   scheduled_wakeup_time_ = wakeup_time;
547 }
548 
549 }  // namespace base
550