• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/synchronization/waitable_event.h"
6 
7 #include <mach/mach.h>
8 #include <sys/event.h>
9 
10 #include <limits>
11 #include <memory>
12 
13 #include "base/apple/mach_logging.h"
14 #include "base/files/scoped_file.h"
15 #include "base/notreached.h"
16 #include "base/posix/eintr_wrapper.h"
17 #include "base/threading/scoped_blocking_call.h"
18 #include "base/time/time.h"
19 #include "base/time/time_override.h"
20 #include "build/build_config.h"
21 
22 namespace base {
23 
WaitableEvent(ResetPolicy reset_policy,InitialState initial_state)24 WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
25                              InitialState initial_state)
26     : policy_(reset_policy) {
27   mach_port_options_t options{};
28   options.flags = MPO_INSERT_SEND_RIGHT;
29   options.mpl.mpl_qlimit = 1;
30 
31   mach_port_t name;
32   kern_return_t kr =
33       mach_port_construct(mach_task_self(), &options, /*context=*/0, &name);
34   MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";
35 
36   receive_right_ = new ReceiveRight(name);
37   send_right_.reset(name);
38 
39   if (initial_state == InitialState::SIGNALED) {
40     Signal();
41   }
42 }
43 
44 WaitableEvent::~WaitableEvent() = default;
45 
Reset()46 void WaitableEvent::Reset() {
47   PeekPort(receive_right_->Name(), true);
48 }
49 
SignalImpl()50 void WaitableEvent::SignalImpl() {
51   mach_msg_empty_send_t msg{};
52   msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
53   msg.header.msgh_size = sizeof(&msg);
54   msg.header.msgh_remote_port = send_right_.get();
55   // If the event is already signaled, this will time out because the queue
56   // has a length of one.
57   kern_return_t kr =
58       mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg),
59                /*rcv_size=*/0, /*rcv_name=*/MACH_PORT_NULL, /*timeout=*/0,
60                /*notify=*/MACH_PORT_NULL);
61   MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";
62 }
63 
IsSignaled()64 bool WaitableEvent::IsSignaled() {
65   return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
66 }
67 
TimedWaitImpl(TimeDelta wait_delta)68 bool WaitableEvent::TimedWaitImpl(TimeDelta wait_delta) {
69   mach_msg_empty_rcv_t msg{};
70   msg.header.msgh_local_port = receive_right_->Name();
71 
72   mach_msg_option_t options = MACH_RCV_MSG;
73 
74   if (!wait_delta.is_max()) {
75     options |= MACH_RCV_TIMEOUT | MACH_RCV_INTERRUPT;
76   }
77 
78   mach_msg_size_t rcv_size = sizeof(msg);
79   if (policy_ == ResetPolicy::MANUAL) {
80     // To avoid dequeuing the message, receive with a size of 0 and set
81     // MACH_RCV_LARGE to keep the message in the queue.
82     options |= MACH_RCV_LARGE;
83     rcv_size = 0;
84   }
85 
86   // TimeTicks takes care of overflow but we special case is_max() nonetheless
87   // to avoid invoking TimeTicksNowIgnoringOverride() unnecessarily (same for
88   // the increment step of the for loop if the condition variable returns
89   // early). Ref: https://crbug.com/910524#c7
90   const TimeTicks end_time =
91       wait_delta.is_max() ? TimeTicks::Max()
92                           : subtle::TimeTicksNowIgnoringOverride() + wait_delta;
93   // Fake |kr| value to bootstrap the for loop.
94   kern_return_t kr = MACH_RCV_INTERRUPTED;
95   for (mach_msg_timeout_t timeout =
96            wait_delta.is_max() ? MACH_MSG_TIMEOUT_NONE
97                                : saturated_cast<mach_msg_timeout_t>(
98                                      wait_delta.InMillisecondsRoundedUp());
99        // If the thread is interrupted during mach_msg(), the system call will
100        // be restarted. However, the libsyscall wrapper does not adjust the
101        // timeout by the amount of time already waited. Using MACH_RCV_INTERRUPT
102        // will instead return from mach_msg(), so that the call can be retried
103        // with an adjusted timeout.
104        kr == MACH_RCV_INTERRUPTED;
105        timeout = end_time.is_max()
106                      ? MACH_MSG_TIMEOUT_NONE
107                      : std::max(mach_msg_timeout_t{0},
108                                 saturated_cast<mach_msg_timeout_t>(
109                                     (end_time -
110                                      subtle::TimeTicksNowIgnoringOverride())
111                                         .InMillisecondsRoundedUp()))) {
112     kr = mach_msg(&msg.header, options, /*send_size=*/0, rcv_size,
113                   receive_right_->Name(), timeout, /*notify=*/MACH_PORT_NULL);
114   }
115 
116   if (kr == KERN_SUCCESS) {
117     return true;
118   } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
119     return true;
120   } else {
121     MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
122     return false;
123   }
124 }
125 
126 // static
WaitMany(WaitableEvent ** raw_waitables,size_t count)127 size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
128   DCHECK(count) << "Cannot wait on no events";
129   internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call(
130       FROM_HERE, BlockingType::MAY_BLOCK);
131   // On macOS 10.11+, using Mach port sets may cause system instability, per
132   // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used
133   // instead to work around that.
134   enum WaitManyPrimitive {
135     KQUEUE,
136     PORT_SET,
137   };
138 #if BUILDFLAG(IS_IOS)
139   const WaitManyPrimitive kPrimitive = PORT_SET;
140 #else
141   const WaitManyPrimitive kPrimitive = KQUEUE;
142 #endif
143   if (kPrimitive == KQUEUE) {
144     std::vector<kevent64_s> events(count);
145     for (size_t i = 0; i < count; ++i) {
146       EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(),
147                EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0);
148     }
149 
150     std::vector<kevent64_s> out_events(count);
151 
152     ScopedFD wait_many(kqueue());
153     PCHECK(wait_many.is_valid()) << "kqueue";
154 
155     const int count_int = checked_cast<int>(count);
156     int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count_int,
157                                    out_events.data(), count_int, /*flags=*/0,
158                                    /*timeout=*/nullptr));
159     PCHECK(rv > 0) << "kevent64";
160 
161     size_t triggered = std::numeric_limits<size_t>::max();
162     for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
163       // WaitMany should return the lowest index in |raw_waitables| that was
164       // triggered.
165       size_t index = static_cast<size_t>(out_events[i].udata);
166       triggered = std::min(triggered, index);
167     }
168 
169     if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) {
170       // The message needs to be dequeued to reset the event.
171       PeekPort(raw_waitables[triggered]->receive_right_->Name(),
172                /*dequeue=*/true);
173     }
174 
175     return triggered;
176   } else {
177     DCHECK_EQ(kPrimitive, PORT_SET);
178 
179     kern_return_t kr;
180 
181     apple::ScopedMachPortSet port_set;
182     {
183       mach_port_t name;
184       kr =
185           mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
186       MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
187       port_set.reset(name);
188     }
189 
190     for (size_t i = 0; i < count; ++i) {
191       kr = mach_port_insert_member(mach_task_self(),
192                                    raw_waitables[i]->receive_right_->Name(),
193                                    port_set.get());
194       MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
195     }
196 
197     mach_msg_empty_rcv_t msg{};
198     // Wait on the port set. Only specify space enough for the header, to
199     // identify which port in the set is signaled. Otherwise, receiving from the
200     // port set may dequeue a message for a manual-reset event object, which
201     // would cause it to be reset.
202     kr = mach_msg(&msg.header,
203                   MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY,
204                   /*send_size=*/0, sizeof(msg.header), port_set.get(),
205                   /*timeout=*/0, /*notify=*/MACH_PORT_NULL);
206     MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";
207 
208     for (size_t i = 0; i < count; ++i) {
209       WaitableEvent* event = raw_waitables[i];
210       if (msg.header.msgh_local_port == event->receive_right_->Name()) {
211         if (event->policy_ == ResetPolicy::AUTOMATIC) {
212           // The message needs to be dequeued to reset the event.
213           PeekPort(msg.header.msgh_local_port, true);
214         }
215         return i;
216       }
217     }
218 
219     NOTREACHED();
220     return 0;
221   }
222 }
223 
224 // static
PeekPort(mach_port_t port,bool dequeue)225 bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
226   if (dequeue) {
227     mach_msg_empty_rcv_t msg{};
228     msg.header.msgh_local_port = port;
229     kern_return_t kr =
230         mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, /*send_size=*/0,
231                  sizeof(msg), port, /*timeout=*/0, /*notify=*/MACH_PORT_NULL);
232     if (kr == KERN_SUCCESS) {
233       return true;
234     } else {
235       MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
236       return false;
237     }
238   } else {
239     mach_port_seqno_t seqno = 0;
240     mach_msg_size_t size;
241     mach_msg_id_t id;
242     mach_msg_trailer_t trailer;
243     mach_msg_type_number_t trailer_size = sizeof(trailer);
244     kern_return_t kr = mach_port_peek(
245         mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
246         &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
247         &trailer_size);
248     if (kr == KERN_SUCCESS) {
249       return true;
250     } else {
251       MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
252       return false;
253     }
254   }
255 }
256 
ReceiveRight(mach_port_t name)257 WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name) : right_(name) {}
258 
259 WaitableEvent::ReceiveRight::~ReceiveRight() = default;
260 
261 }  // namespace base
262