• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
7 #pragma allow_unsafe_buffers
8 #endif
9 
10 #include "base/synchronization/waitable_event.h"
11 
12 #include <mach/mach.h>
13 #include <sys/event.h>
14 
15 #include <limits>
16 #include <memory>
17 
18 #include "base/apple/mach_logging.h"
19 #include "base/files/scoped_file.h"
20 #include "base/notreached.h"
21 #include "base/posix/eintr_wrapper.h"
22 #include "base/threading/scoped_blocking_call.h"
23 #include "base/time/time.h"
24 #include "base/time/time_override.h"
25 #include "build/build_config.h"
26 
27 namespace base {
28 
WaitableEvent(ResetPolicy reset_policy,InitialState initial_state)29 WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
30                              InitialState initial_state)
31     : policy_(reset_policy) {
32   mach_port_options_t options{};
33   options.flags = MPO_INSERT_SEND_RIGHT;
34   options.mpl.mpl_qlimit = 1;
35 
36   mach_port_t name;
37   kern_return_t kr =
38       mach_port_construct(mach_task_self(), &options, /*context=*/0, &name);
39   MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";
40 
41   receive_right_ = new ReceiveRight(name);
42   send_right_.reset(name);
43 
44   if (initial_state == InitialState::SIGNALED) {
45     Signal();
46   }
47 }
48 
Reset()49 void WaitableEvent::Reset() {
50   PeekPort(receive_right_->Name(), true);
51 }
52 
SignalImpl()53 void WaitableEvent::SignalImpl() {
54   mach_msg_empty_send_t msg{};
55   msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
56   msg.header.msgh_size = sizeof(&msg);
57   msg.header.msgh_remote_port = send_right_.get();
58   // If the event is already signaled, this will time out because the queue
59   // has a length of one.
60   kern_return_t kr =
61       mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg),
62                /*rcv_size=*/0, /*rcv_name=*/MACH_PORT_NULL, /*timeout=*/0,
63                /*notify=*/MACH_PORT_NULL);
64   MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";
65 }
66 
IsSignaled() const67 bool WaitableEvent::IsSignaled() const {
68   return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
69 }
70 
TimedWaitImpl(TimeDelta wait_delta)71 bool WaitableEvent::TimedWaitImpl(TimeDelta wait_delta) {
72   mach_msg_empty_rcv_t msg{};
73   msg.header.msgh_local_port = receive_right_->Name();
74 
75   mach_msg_option_t options = MACH_RCV_MSG;
76 
77   if (!wait_delta.is_max()) {
78     options |= MACH_RCV_TIMEOUT | MACH_RCV_INTERRUPT;
79   }
80 
81   mach_msg_size_t rcv_size = sizeof(msg);
82   if (policy_ == ResetPolicy::MANUAL) {
83     // To avoid dequeuing the message, receive with a size of 0 and set
84     // MACH_RCV_LARGE to keep the message in the queue.
85     options |= MACH_RCV_LARGE;
86     rcv_size = 0;
87   }
88 
89   // TimeTicks takes care of overflow but we special case is_max() nonetheless
90   // to avoid invoking TimeTicksNowIgnoringOverride() unnecessarily (same for
91   // the increment step of the for loop if the condition variable returns
92   // early). Ref: https://crbug.com/910524#c7
93   const TimeTicks end_time =
94       wait_delta.is_max() ? TimeTicks::Max()
95                           : subtle::TimeTicksNowIgnoringOverride() + wait_delta;
96   // Fake |kr| value to bootstrap the for loop.
97   kern_return_t kr = MACH_RCV_INTERRUPTED;
98   for (mach_msg_timeout_t timeout =
99            wait_delta.is_max() ? MACH_MSG_TIMEOUT_NONE
100                                : saturated_cast<mach_msg_timeout_t>(
101                                      wait_delta.InMillisecondsRoundedUp());
102        // If the thread is interrupted during mach_msg(), the system call will
103        // be restarted. However, the libsyscall wrapper does not adjust the
104        // timeout by the amount of time already waited. Using MACH_RCV_INTERRUPT
105        // will instead return from mach_msg(), so that the call can be retried
106        // with an adjusted timeout.
107        kr == MACH_RCV_INTERRUPTED;
108        timeout = end_time.is_max()
109                      ? MACH_MSG_TIMEOUT_NONE
110                      : std::max(mach_msg_timeout_t{0},
111                                 saturated_cast<mach_msg_timeout_t>(
112                                     (end_time -
113                                      subtle::TimeTicksNowIgnoringOverride())
114                                         .InMillisecondsRoundedUp()))) {
115     kr = mach_msg(&msg.header, options, /*send_size=*/0, rcv_size,
116                   receive_right_->Name(), timeout, /*notify=*/MACH_PORT_NULL);
117   }
118 
119   if (kr == KERN_SUCCESS) {
120     return true;
121   } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
122     return true;
123   } else {
124     MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
125     return false;
126   }
127 }
128 
129 // static
WaitManyImpl(WaitableEvent ** raw_waitables,size_t count)130 size_t WaitableEvent::WaitManyImpl(WaitableEvent** raw_waitables,
131                                    size_t count) {
132   // On macOS 10.11+, using Mach port sets may cause system instability, per
133   // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used
134   // instead to work around that.
135   enum WaitManyPrimitive {
136     KQUEUE,
137     PORT_SET,
138   };
139 #if BUILDFLAG(IS_IOS)
140   const WaitManyPrimitive kPrimitive = PORT_SET;
141 #else
142   const WaitManyPrimitive kPrimitive = KQUEUE;
143 #endif
144   if (kPrimitive == KQUEUE) {
145     std::vector<kevent64_s> events(count);
146     for (size_t i = 0; i < count; ++i) {
147       EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(),
148                EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0);
149     }
150 
151     std::vector<kevent64_s> out_events(count);
152 
153     ScopedFD wait_many(kqueue());
154     PCHECK(wait_many.is_valid()) << "kqueue";
155 
156     const int count_int = checked_cast<int>(count);
157     int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count_int,
158                                    out_events.data(), count_int, /*flags=*/0,
159                                    /*timeout=*/nullptr));
160     PCHECK(rv > 0) << "kevent64";
161 
162     size_t triggered = std::numeric_limits<size_t>::max();
163     for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
164       // WaitMany should return the lowest index in |raw_waitables| that was
165       // triggered.
166       size_t index = static_cast<size_t>(out_events[i].udata);
167       triggered = std::min(triggered, index);
168     }
169 
170     if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) {
171       // The message needs to be dequeued to reset the event.
172       PeekPort(raw_waitables[triggered]->receive_right_->Name(),
173                /*dequeue=*/true);
174     }
175 
176     return triggered;
177   } else {
178     DCHECK_EQ(kPrimitive, PORT_SET);
179 
180     kern_return_t kr;
181 
182     apple::ScopedMachPortSet port_set;
183     {
184       mach_port_t name;
185       kr =
186           mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
187       MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
188       port_set.reset(name);
189     }
190 
191     for (size_t i = 0; i < count; ++i) {
192       kr = mach_port_insert_member(mach_task_self(),
193                                    raw_waitables[i]->receive_right_->Name(),
194                                    port_set.get());
195       MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
196     }
197 
198     mach_msg_empty_rcv_t msg{};
199     // Wait on the port set. Only specify space enough for the header, to
200     // identify which port in the set is signaled. Otherwise, receiving from the
201     // port set may dequeue a message for a manual-reset event object, which
202     // would cause it to be reset.
203     kr = mach_msg(&msg.header,
204                   MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY,
205                   /*send_size=*/0, sizeof(msg.header), port_set.get(),
206                   /*timeout=*/0, /*notify=*/MACH_PORT_NULL);
207     MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";
208 
209     for (size_t i = 0; i < count; ++i) {
210       WaitableEvent* event = raw_waitables[i];
211       if (msg.header.msgh_local_port == event->receive_right_->Name()) {
212         if (event->policy_ == ResetPolicy::AUTOMATIC) {
213           // The message needs to be dequeued to reset the event.
214           PeekPort(msg.header.msgh_local_port, true);
215         }
216         return i;
217       }
218     }
219 
220     NOTREACHED();
221   }
222 }
223 
224 // static
PeekPort(mach_port_t port,bool dequeue)225 bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
226   if (dequeue) {
227     mach_msg_empty_rcv_t msg{};
228     msg.header.msgh_local_port = port;
229     kern_return_t kr =
230         mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, /*send_size=*/0,
231                  sizeof(msg), port, /*timeout=*/0, /*notify=*/MACH_PORT_NULL);
232     if (kr == KERN_SUCCESS) {
233       return true;
234     } else {
235       MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
236       return false;
237     }
238   } else {
239     mach_port_seqno_t seqno = 0;
240     mach_msg_size_t size;
241     mach_msg_id_t id;
242     mach_msg_trailer_t trailer;
243     mach_msg_type_number_t trailer_size = sizeof(trailer);
244     kern_return_t kr = mach_port_peek(
245         mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
246         &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
247         &trailer_size);
248     if (kr == KERN_SUCCESS) {
249       return true;
250     } else {
251       MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
252       return false;
253     }
254   }
255 }
256 
ReceiveRight(mach_port_t name)257 WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name) : right_(name) {}
258 
259 WaitableEvent::ReceiveRight::~ReceiveRight() = default;
260 
261 }  // namespace base
262