• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
7 #pragma allow_unsafe_buffers
8 #endif
9 
10 #include "base/message_loop/message_pump_kqueue.h"
11 
12 #include <sys/errno.h>
13 
14 #include <atomic>
15 
16 #include "base/apple/mach_logging.h"
17 #include "base/apple/scoped_nsautorelease_pool.h"
18 #include "base/auto_reset.h"
19 #include "base/feature_list.h"
20 #include "base/logging.h"
21 #include "base/mac/mac_util.h"
22 #include "base/notreached.h"
23 #include "base/posix/eintr_wrapper.h"
24 #include "base/task/task_features.h"
25 #include "base/time/time_override.h"
26 
27 namespace base {
28 
29 namespace {
30 
31 // Under this feature native work is batched. Remove it once crbug.com/1200141
32 // is resolved.
33 BASE_FEATURE(kBatchNativeEventsInMessagePumpKqueue,
34              "BatchNativeEventsInMessagePumpKqueue",
35              base::FEATURE_DISABLED_BY_DEFAULT);
36 
37 // Caches the state of the "BatchNativeEventsInMessagePumpKqueue".
38 std::atomic_bool g_use_batched_version = false;
39 
40 // Caches the state of the "TimerSlackMac" feature for efficiency.
41 std::atomic_bool g_timer_slack = false;
42 
43 #if DCHECK_IS_ON()
44 // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier
45 // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a
46 // timer from the thread that reads the kqueue does not cause spurious wakeups.
47 // Note that updating a kqueue timer from one thread while another thread is
48 // waiting in a kevent64 invocation is still (inherently) racy.
KqueueTimersSpuriouslyWakeUp()49 bool KqueueTimersSpuriouslyWakeUp() {
50 #if BUILDFLAG(IS_MAC)
51   return false;
52 #else
53   // This still happens on iOS15.
54   return true;
55 #endif
56 }
57 #endif
58 
ChangeOneEvent(const ScopedFD & kqueue,kevent64_s * event)59 int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) {
60   return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr));
61 }
62 
63 }  // namespace
64 
FdWatchController(const Location & from_here)65 MessagePumpKqueue::FdWatchController::FdWatchController(
66     const Location& from_here)
67     : FdWatchControllerInterface(from_here) {}
68 
~FdWatchController()69 MessagePumpKqueue::FdWatchController::~FdWatchController() {
70   StopWatchingFileDescriptor();
71 }
72 
StopWatchingFileDescriptor()73 bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() {
74   if (!pump_)
75     return true;
76   return pump_->StopWatchingFileDescriptor(this);
77 }
78 
Init(WeakPtr<MessagePumpKqueue> pump,int fd,int mode,FdWatcher * watcher)79 void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump,
80                                                 int fd,
81                                                 int mode,
82                                                 FdWatcher* watcher) {
83   DCHECK_NE(fd, -1);
84   DCHECK(!watcher_);
85   DCHECK(watcher);
86   DCHECK(pump);
87   fd_ = fd;
88   mode_ = mode;
89   watcher_ = watcher;
90   pump_ = pump;
91 }
92 
Reset()93 void MessagePumpKqueue::FdWatchController::Reset() {
94   fd_ = -1;
95   mode_ = 0;
96   watcher_ = nullptr;
97   pump_ = nullptr;
98 }
99 
MachPortWatchController(const Location & from_here)100 MessagePumpKqueue::MachPortWatchController::MachPortWatchController(
101     const Location& from_here)
102     : from_here_(from_here) {}
103 
~MachPortWatchController()104 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
105   StopWatchingMachPort();
106 }
107 
StopWatchingMachPort()108 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
109   if (!pump_)
110     return true;
111   return pump_->StopWatchingMachPort(this);
112 }
113 
Init(WeakPtr<MessagePumpKqueue> pump,mach_port_t port,MachPortWatcher * watcher)114 void MessagePumpKqueue::MachPortWatchController::Init(
115     WeakPtr<MessagePumpKqueue> pump,
116     mach_port_t port,
117     MachPortWatcher* watcher) {
118   DCHECK(!watcher_);
119   DCHECK(watcher);
120   DCHECK(pump);
121   port_ = port;
122   watcher_ = watcher;
123   pump_ = pump;
124 }
125 
Reset()126 void MessagePumpKqueue::MachPortWatchController::Reset() {
127   port_ = MACH_PORT_NULL;
128   watcher_ = nullptr;
129   pump_ = nullptr;
130 }
131 
MessagePumpKqueue()132 MessagePumpKqueue::MessagePumpKqueue()
133     : kqueue_(kqueue()), weak_factory_(this) {
134   PCHECK(kqueue_.is_valid()) << "kqueue";
135 
136   // Create a Mach port that will be used to wake up the pump by sending
137   // a message in response to ScheduleWork(). This is significantly faster than
138   // using an EVFILT_USER event, especially when triggered across threads.
139   kern_return_t kr = mach_port_allocate(
140       mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
141       base::apple::ScopedMachReceiveRight::Receiver(wakeup_).get());
142   MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
143 
144   // Configure the event to directly receive the Mach message as part of the
145   // kevent64() call.
146   kevent64_s event{};
147   event.ident = wakeup_.get();
148   event.filter = EVFILT_MACHPORT;
149   event.flags = EV_ADD;
150   event.fflags = MACH_RCV_MSG;
151   event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
152   event.ext[1] = sizeof(wakeup_buffer_);
153 
154   int rv = ChangeOneEvent(kqueue_, &event);
155   PCHECK(rv == 0) << "kevent64";
156 }
157 
158 MessagePumpKqueue::~MessagePumpKqueue() = default;
159 
InitializeFeatures()160 void MessagePumpKqueue::InitializeFeatures() {
161   g_use_batched_version.store(
162       base::FeatureList::IsEnabled(kBatchNativeEventsInMessagePumpKqueue),
163       std::memory_order_relaxed);
164   g_timer_slack.store(FeatureList::IsEnabled(kTimerSlackMac),
165                       std::memory_order_relaxed);
166 }
167 
Run(Delegate * delegate)168 void MessagePumpKqueue::Run(Delegate* delegate) {
169   AutoReset<bool> reset_keep_running(&keep_running_, true);
170 
171   if (g_use_batched_version.load(std::memory_order_relaxed)) {
172     RunBatched(delegate);
173   } else {
174     while (keep_running_) {
175       apple::ScopedNSAutoreleasePool pool;
176 
177       bool do_more_work = DoInternalWork(delegate, nullptr);
178       if (!keep_running_)
179         break;
180 
181       Delegate::NextWorkInfo next_work_info = delegate->DoWork();
182       do_more_work |= next_work_info.is_immediate();
183       if (!keep_running_)
184         break;
185 
186       if (do_more_work)
187         continue;
188 
189       delegate->DoIdleWork();
190       if (!keep_running_)
191         break;
192 
193       DoInternalWork(delegate, &next_work_info);
194     }
195   }
196 }
197 
RunBatched(Delegate * delegate)198 void MessagePumpKqueue::RunBatched(Delegate* delegate) {
199   // Look for native work once before the loop starts. Without this call the
200   // loop would break without checking native work even once in cases where
201   // QuitWhenIdle was used. This is sometimes the case in tests.
202   DoInternalWork(delegate, nullptr);
203 
204   while (keep_running_) {
205     apple::ScopedNSAutoreleasePool pool;
206 
207     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
208     if (!keep_running_)
209       break;
210 
211     if (!next_work_info.is_immediate()) {
212       delegate->DoIdleWork();
213     }
214     if (!keep_running_)
215       break;
216 
217     int batch_size = 0;
218     if (DoInternalWork(delegate, &next_work_info)) {
219       // More than one call can be necessary to fully dispatch all available
220       // internal work. Making an effort to dispatch more than the minimum
221       // before moving on to application tasks reduces the overhead of going
222       // through the whole loop. It also more closely mirrors the behavior of
223       // application task execution where tasks are batched. A value of 16 was
224       // chosen via local experimentation showing that is was sufficient to
225       // dispatch all work in roughly 95% of cases.
226       constexpr int kMaxAttempts = 16;
227       while (DoInternalWork(delegate, nullptr) && batch_size < kMaxAttempts) {
228         ++batch_size;
229       }
230     }
231   }
232 }
233 
Quit()234 void MessagePumpKqueue::Quit() {
235   keep_running_ = false;
236   ScheduleWork();
237 }
238 
ScheduleWork()239 void MessagePumpKqueue::ScheduleWork() {
240   mach_msg_empty_send_t message{};
241   message.header.msgh_size = sizeof(message);
242   message.header.msgh_bits =
243       MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
244   message.header.msgh_remote_port = wakeup_.get();
245   kern_return_t kr = mach_msg_send(&message.header);
246   if (kr != KERN_SUCCESS) {
247     // If ScheduleWork() is being called by other threads faster than the pump
248     // can dispatch work, the kernel message queue for the wakeup port can fill
249     // up (this happens under base_perftests, for example). The kernel does
250     // return a SEND_ONCE right in the case of failure, which must be destroyed
251     // to avoid leaking.
252     MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr)
253         << "mach_msg_send";
254     mach_msg_destroy(&message.header);
255   }
256 }
257 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)258 void MessagePumpKqueue::ScheduleDelayedWork(
259     const Delegate::NextWorkInfo& next_work_info) {
260   // Nothing to do. This MessagePump uses DoWork().
261 }
262 
WatchMachReceivePort(mach_port_t port,MachPortWatchController * controller,MachPortWatcher * delegate)263 bool MessagePumpKqueue::WatchMachReceivePort(
264     mach_port_t port,
265     MachPortWatchController* controller,
266     MachPortWatcher* delegate) {
267   DCHECK(port != MACH_PORT_NULL);
268   DCHECK(controller);
269   DCHECK(delegate);
270 
271   if (controller->port() != MACH_PORT_NULL) {
272     DLOG(ERROR)
273         << "Cannot use the same MachPortWatchController while it is active";
274     return false;
275   }
276 
277   kevent64_s event{};
278   event.ident = port;
279   event.filter = EVFILT_MACHPORT;
280   event.flags = EV_ADD;
281   int rv = ChangeOneEvent(kqueue_, &event);
282   if (rv < 0) {
283     DPLOG(ERROR) << "kevent64";
284     return false;
285   }
286   ++event_count_;
287 
288   controller->Init(weak_factory_.GetWeakPtr(), port, delegate);
289   port_controllers_.AddWithID(controller, port);
290 
291   return true;
292 }
293 
AdjustDelayedRunTime(TimeTicks earliest_time,TimeTicks run_time,TimeTicks latest_time)294 TimeTicks MessagePumpKqueue::AdjustDelayedRunTime(TimeTicks earliest_time,
295                                                   TimeTicks run_time,
296                                                   TimeTicks latest_time) {
297   if (GetAlignWakeUpsEnabled() &&
298       g_timer_slack.load(std::memory_order_relaxed)) {
299     return earliest_time;
300   }
301   return MessagePump::AdjustDelayedRunTime(earliest_time, run_time,
302                                            latest_time);
303 }
304 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)305 bool MessagePumpKqueue::WatchFileDescriptor(int fd,
306                                             bool persistent,
307                                             int mode,
308                                             FdWatchController* controller,
309                                             FdWatcher* delegate) {
310   DCHECK_GE(fd, 0);
311   DCHECK(controller);
312   DCHECK(delegate);
313   DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
314 
315   if (controller->fd() != -1 && controller->fd() != fd) {
316     DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs";
317     return false;
318   }
319   StopWatchingFileDescriptor(controller);
320 
321   std::vector<kevent64_s> events;
322 
323   kevent64_s base_event{};
324   base_event.ident = static_cast<uint64_t>(fd);
325   base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
326 
327   if (mode & Mode::WATCH_READ) {
328     base_event.filter = EVFILT_READ;
329     base_event.udata = fd_controllers_.Add(controller);
330     events.push_back(base_event);
331   }
332   if (mode & Mode::WATCH_WRITE) {
333     base_event.filter = EVFILT_WRITE;
334     base_event.udata = fd_controllers_.Add(controller);
335     events.push_back(base_event);
336   }
337 
338   int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
339                                  checked_cast<int>(events.size()), nullptr, 0,
340                                  0, nullptr));
341   if (rv < 0) {
342     DPLOG(ERROR) << "WatchFileDescriptor kevent64";
343     return false;
344   }
345 
346   event_count_ += events.size();
347   controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate);
348 
349   return true;
350 }
351 
SetWakeupTimerEvent(const base::TimeTicks & wakeup_time,base::TimeDelta leeway,kevent64_s * timer_event)352 void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time,
353                                             base::TimeDelta leeway,
354                                             kevent64_s* timer_event) {
355   // The ident of the wakeup timer. There's only the one timer as the pair
356   // (ident, filter) is the identity of the event.
357   constexpr uint64_t kWakeupTimerIdent = 0x0;
358   timer_event->ident = kWakeupTimerIdent;
359   timer_event->filter = EVFILT_TIMER;
360   if (wakeup_time == base::TimeTicks::Max()) {
361     timer_event->flags = EV_DELETE;
362   } else {
363     timer_event->filter = EVFILT_TIMER;
364     // This updates the timer if it already exists in |kqueue_|.
365     timer_event->flags = EV_ADD | EV_ONESHOT;
366 
367     // Specify the sleep in microseconds to avoid undersleeping due to
368     // numeric problems. The sleep is computed from TimeTicks::Now rather than
369     // NextWorkInfo::recent_now because recent_now is strictly earlier than
370     // current wall-clock. Using an earlier wall clock time  to compute the
371     // delta to the next wakeup wall-clock time would guarantee oversleep.
372     // If wakeup_time is in the past, the delta below will be negative and the
373     // timer is set immediately.
374     timer_event->fflags = NOTE_USECONDS;
375     timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
376 
377     if (!leeway.is_zero() && g_timer_slack.load(std::memory_order_relaxed)) {
378       // Specify slack based on |leeway|.
379       // See "man kqueue" in recent macOSen for documentation.
380       timer_event->fflags |= NOTE_LEEWAY;
381       timer_event->ext[1] = static_cast<uint64_t>(leeway.InMicroseconds());
382     }
383   }
384 }
385 
StopWatchingMachPort(MachPortWatchController * controller)386 bool MessagePumpKqueue::StopWatchingMachPort(
387     MachPortWatchController* controller) {
388   mach_port_t port = controller->port();
389   controller->Reset();
390   port_controllers_.Remove(port);
391 
392   kevent64_s event{};
393   event.ident = port;
394   event.filter = EVFILT_MACHPORT;
395   event.flags = EV_DELETE;
396   --event_count_;
397   int rv = ChangeOneEvent(kqueue_, &event);
398   if (rv < 0) {
399     DPLOG(ERROR) << "kevent64";
400     return false;
401   }
402 
403   return true;
404 }
405 
StopWatchingFileDescriptor(FdWatchController * controller)406 bool MessagePumpKqueue::StopWatchingFileDescriptor(
407     FdWatchController* controller) {
408   int fd = controller->fd();
409   int mode = controller->mode();
410   controller->Reset();
411 
412   if (fd < 0)
413     return true;
414 
415   std::vector<kevent64_s> events;
416 
417   kevent64_s base_event{};
418   base_event.ident = static_cast<uint64_t>(fd);
419   base_event.flags = EV_DELETE;
420 
421   if (mode & Mode::WATCH_READ) {
422     base_event.filter = EVFILT_READ;
423     events.push_back(base_event);
424   }
425   if (mode & Mode::WATCH_WRITE) {
426     base_event.filter = EVFILT_WRITE;
427     events.push_back(base_event);
428   }
429 
430   int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
431                                  checked_cast<int>(events.size()), nullptr, 0,
432                                  0, nullptr));
433   DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64";
434 
435   // The keys for the IDMap aren't recorded anywhere (they're attached to the
436   // kevent object in the kernel), so locate the entries by controller pointer.
437   for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_);
438        !it.IsAtEnd(); it.Advance()) {
439     if (it.GetCurrentValue() == controller) {
440       fd_controllers_.Remove(it.GetCurrentKey());
441     }
442   }
443 
444   event_count_ -= events.size();
445 
446   return rv >= 0;
447 }
448 
DoInternalWork(Delegate * delegate,Delegate::NextWorkInfo * next_work_info)449 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
450                                        Delegate::NextWorkInfo* next_work_info) {
451   if (events_.size() < event_count_) {
452     events_.resize(event_count_);
453   }
454 
455   bool immediate = next_work_info == nullptr;
456   unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0;
457 
458   if (!immediate) {
459     MaybeUpdateWakeupTimer(next_work_info->delayed_run_time,
460                            next_work_info->leeway);
461     DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time);
462     delegate->BeforeWait();
463   }
464 
465   int rv =
466       HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(),
467                             checked_cast<int>(events_.size()), flags, nullptr));
468   if (rv == 0) {
469     // No events to dispatch so no need to call ProcessEvents().
470     return false;
471   }
472 
473   PCHECK(rv > 0) << "kevent64";
474   return ProcessEvents(delegate, static_cast<size_t>(rv));
475 }
476 
ProcessEvents(Delegate * delegate,size_t count)477 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) {
478   bool did_work = false;
479 
480   delegate->BeginNativeWorkBeforeDoWork();
481   for (size_t i = 0; i < count; ++i) {
482     auto* event = &events_[i];
483     if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
484       did_work = true;
485 
486       FdWatchController* controller = fd_controllers_.Lookup(event->udata);
487       if (!controller) {
488         // The controller was removed by some other work callout before
489         // this event could be processed.
490         continue;
491       }
492       FdWatcher* fd_watcher = controller->watcher();
493 
494       if (event->flags & EV_ONESHOT) {
495         // If this was a one-shot event, the Controller needs to stop tracking
496         // the descriptor, so it is not double-removed when it is told to stop
497         // watching.
498         controller->Reset();
499         fd_controllers_.Remove(event->udata);
500         --event_count_;
501       }
502 
503       if (fd_watcher) {
504         auto scoped_do_work_item = delegate->BeginWorkItem();
505         // WatchFileDescriptor() originally upcasts event->ident from an int.
506         if (event->filter == EVFILT_READ) {
507           fd_watcher->OnFileCanReadWithoutBlocking(
508               static_cast<int>(event->ident));
509         } else if (event->filter == EVFILT_WRITE) {
510           fd_watcher->OnFileCanWriteWithoutBlocking(
511               static_cast<int>(event->ident));
512         }
513       }
514     } else if (event->filter == EVFILT_MACHPORT) {
515       // WatchMachReceivePort() originally sets event->ident from a mach_port_t.
516       mach_port_t port = static_cast<mach_port_t>(event->ident);
517       if (port == wakeup_.get()) {
518         // The wakeup event has been received, do not treat this as "doing
519         // work", this just wakes up the pump.
520         continue;
521       }
522 
523       did_work = true;
524 
525       MachPortWatchController* controller = port_controllers_.Lookup(port);
526       // The controller could have been removed by some other work callout
527       // before this event could be processed.
528       if (controller) {
529         auto scoped_do_work_item = delegate->BeginWorkItem();
530         controller->watcher()->OnMachMessageReceived(port);
531       }
532     } else if (event->filter == EVFILT_TIMER) {
533       // The wakeup timer fired.
534 #if DCHECK_IS_ON()
535       // On macOS 10.13 and earlier, kqueue timers may spuriously wake up.
536       // When this happens, the timer will be re-scheduled the next time
537       // DoInternalWork is entered, which means this doesn't lead to a
538       // spinning wait.
539       // When clock overrides are active, TimeTicks::Now may be decoupled from
540       // wall-clock time, and can therefore not be used to validate whether the
541       // expected wall-clock time has passed.
542       if (!KqueueTimersSpuriouslyWakeUp() &&
543           !subtle::ScopedTimeClockOverrides::overrides_active()) {
544         // Given the caveats above, assert that the timer didn't fire early.
545         DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now());
546       }
547 #endif
548       DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max());
549       scheduled_wakeup_time_ = base::TimeTicks::Max();
550       --event_count_;
551     } else {
552       NOTREACHED() << "Unexpected event for filter " << event->filter;
553     }
554   }
555 
556   return did_work;
557 }
558 
MaybeUpdateWakeupTimer(const base::TimeTicks & wakeup_time,base::TimeDelta leeway)559 void MessagePumpKqueue::MaybeUpdateWakeupTimer(
560     const base::TimeTicks& wakeup_time,
561     base::TimeDelta leeway) {
562   if (wakeup_time == scheduled_wakeup_time_) {
563     // No change in the timer setting necessary.
564     return;
565   }
566 
567   if (wakeup_time == base::TimeTicks::Max()) {
568     // If the timer was already reset, don't re-reset it on a suspend toggle.
569     if (scheduled_wakeup_time_ != base::TimeTicks::Max()) {
570       // Clear the timer.
571       kevent64_s timer{};
572       SetWakeupTimerEvent(wakeup_time, leeway, &timer);
573       int rv = ChangeOneEvent(kqueue_, &timer);
574       PCHECK(rv == 0) << "kevent64, delete timer";
575       --event_count_;
576     }
577   } else {
578     // Set/reset the timer.
579     kevent64_s timer{};
580     SetWakeupTimerEvent(wakeup_time, leeway, &timer);
581     int rv = ChangeOneEvent(kqueue_, &timer);
582     PCHECK(rv == 0) << "kevent64, set timer";
583 
584     // Bump the event count if we just added the timer.
585     if (scheduled_wakeup_time_ == base::TimeTicks::Max())
586       ++event_count_;
587   }
588 
589   scheduled_wakeup_time_ = wakeup_time;
590 }
591 
592 }  // namespace base
593