1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/synchronization/waitable_event.h"
6
7 #include <mach/mach.h>
8 #include <sys/event.h>
9
10 #include <limits>
11 #include <memory>
12
13 #include "base/files/scoped_file.h"
14 #include "base/mac/mach_logging.h"
15 #include "base/notreached.h"
16 #include "base/posix/eintr_wrapper.h"
17 #include "base/threading/scoped_blocking_call.h"
18 #include "base/threading/thread_restrictions.h"
19 #include "base/time/time.h"
20 #include "base/time/time_override.h"
21 #include "build/build_config.h"
22 #include "third_party/abseil-cpp/absl/types/optional.h"
23
24 namespace base {
25
WaitableEvent(ResetPolicy reset_policy,InitialState initial_state)26 WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
27 InitialState initial_state)
28 : policy_(reset_policy) {
29 mach_port_options_t options{};
30 options.flags = MPO_INSERT_SEND_RIGHT;
31 options.mpl.mpl_qlimit = 1;
32
33 mach_port_t name;
34 kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name);
35 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";
36
37 receive_right_ = new ReceiveRight(name);
38 send_right_.reset(name);
39
40 if (initial_state == InitialState::SIGNALED)
41 Signal();
42 }
43
44 WaitableEvent::~WaitableEvent() = default;
45
Reset()46 void WaitableEvent::Reset() {
47 PeekPort(receive_right_->Name(), true);
48 }
49
SignalImpl()50 void WaitableEvent::SignalImpl() {
51 mach_msg_empty_send_t msg{};
52 msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
53 msg.header.msgh_size = sizeof(&msg);
54 msg.header.msgh_remote_port = send_right_.get();
55 // If the event is already signaled, this will time out because the queue
56 // has a length of one.
57 kern_return_t kr =
58 mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0,
59 MACH_PORT_NULL, 0, MACH_PORT_NULL);
60 MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";
61 }
62
IsSignaled()63 bool WaitableEvent::IsSignaled() {
64 return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
65 }
66
TimedWaitImpl(TimeDelta wait_delta)67 bool WaitableEvent::TimedWaitImpl(TimeDelta wait_delta) {
68 mach_msg_empty_rcv_t msg{};
69 msg.header.msgh_local_port = receive_right_->Name();
70
71 mach_msg_option_t options = MACH_RCV_MSG;
72
73 if (!wait_delta.is_max())
74 options |= MACH_RCV_TIMEOUT | MACH_RCV_INTERRUPT;
75
76 mach_msg_size_t rcv_size = sizeof(msg);
77 if (policy_ == ResetPolicy::MANUAL) {
78 // To avoid dequeing the message, receive with a size of 0 and set
79 // MACH_RCV_LARGE to keep the message in the queue.
80 options |= MACH_RCV_LARGE;
81 rcv_size = 0;
82 }
83
84 // TimeTicks takes care of overflow but we special case is_max() nonetheless
85 // to avoid invoking TimeTicksNowIgnoringOverride() unnecessarily (same for
86 // the increment step of the for loop if the condition variable returns
87 // early). Ref: https://crbug.com/910524#c7
88 const TimeTicks end_time =
89 wait_delta.is_max() ? TimeTicks::Max()
90 : subtle::TimeTicksNowIgnoringOverride() + wait_delta;
91 // Fake |kr| value to boostrap the for loop.
92 kern_return_t kr = MACH_RCV_INTERRUPTED;
93 for (mach_msg_timeout_t timeout =
94 wait_delta.is_max() ? MACH_MSG_TIMEOUT_NONE
95 : saturated_cast<mach_msg_timeout_t>(
96 wait_delta.InMillisecondsRoundedUp());
97 // If the thread is interrupted during mach_msg(), the system call will
98 // be restarted. However, the libsyscall wrapper does not adjust the
99 // timeout by the amount of time already waited. Using MACH_RCV_INTERRUPT
100 // will instead return from mach_msg(), so that the call can be retried
101 // with an adjusted timeout.
102 kr == MACH_RCV_INTERRUPTED;
103 timeout = end_time.is_max()
104 ? MACH_MSG_TIMEOUT_NONE
105 : std::max(mach_msg_timeout_t{0},
106 saturated_cast<mach_msg_timeout_t>(
107 (end_time -
108 subtle::TimeTicksNowIgnoringOverride())
109 .InMillisecondsRoundedUp()))) {
110 kr = mach_msg(&msg.header, options, 0, rcv_size, receive_right_->Name(),
111 timeout, MACH_PORT_NULL);
112 }
113
114 if (kr == KERN_SUCCESS) {
115 return true;
116 } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
117 return true;
118 } else {
119 MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
120 return false;
121 }
122 }
123
124 // static
WaitMany(WaitableEvent ** raw_waitables,size_t count)125 size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
126 DCHECK(count) << "Cannot wait on no events";
127 internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call(
128 FROM_HERE, BlockingType::MAY_BLOCK);
129 // On macOS 10.11+, using Mach port sets may cause system instability, per
130 // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used
131 // instead to work around that.
132 enum WaitManyPrimitive {
133 KQUEUE,
134 PORT_SET,
135 };
136 #if BUILDFLAG(IS_IOS)
137 const WaitManyPrimitive kPrimitive = PORT_SET;
138 #else
139 const WaitManyPrimitive kPrimitive = KQUEUE;
140 #endif
141 if (kPrimitive == KQUEUE) {
142 std::vector<kevent64_s> events(count);
143 for (size_t i = 0; i < count; ++i) {
144 EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(),
145 EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0);
146 }
147
148 std::vector<kevent64_s> out_events(count);
149
150 ScopedFD wait_many(kqueue());
151 PCHECK(wait_many.is_valid()) << "kqueue";
152
153 const int count_int = checked_cast<int>(count);
154 int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count_int,
155 out_events.data(), count_int, 0, nullptr));
156 PCHECK(rv > 0) << "kevent64";
157
158 size_t triggered = std::numeric_limits<size_t>::max();
159 for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
160 // WaitMany should return the lowest index in |raw_waitables| that was
161 // triggered.
162 size_t index = static_cast<size_t>(out_events[i].udata);
163 triggered = std::min(triggered, index);
164 }
165
166 if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) {
167 // The message needs to be dequeued to reset the event.
168 PeekPort(raw_waitables[triggered]->receive_right_->Name(), true);
169 }
170
171 return triggered;
172 } else {
173 DCHECK_EQ(kPrimitive, PORT_SET);
174
175 kern_return_t kr;
176
177 mac::ScopedMachPortSet port_set;
178 {
179 mach_port_t name;
180 kr =
181 mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
182 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
183 port_set.reset(name);
184 }
185
186 for (size_t i = 0; i < count; ++i) {
187 kr = mach_port_insert_member(mach_task_self(),
188 raw_waitables[i]->receive_right_->Name(),
189 port_set.get());
190 MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
191 }
192
193 mach_msg_empty_rcv_t msg{};
194 // Wait on the port set. Only specify space enough for the header, to
195 // identify which port in the set is signaled. Otherwise, receiving from the
196 // port set may dequeue a message for a manual-reset event object, which
197 // would cause it to be reset.
198 kr = mach_msg(&msg.header,
199 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0,
200 sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL);
201 MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";
202
203 for (size_t i = 0; i < count; ++i) {
204 WaitableEvent* event = raw_waitables[i];
205 if (msg.header.msgh_local_port == event->receive_right_->Name()) {
206 if (event->policy_ == ResetPolicy::AUTOMATIC) {
207 // The message needs to be dequeued to reset the event.
208 PeekPort(msg.header.msgh_local_port, true);
209 }
210 return i;
211 }
212 }
213
214 NOTREACHED();
215 return 0;
216 }
217 }
218
219 // static
PeekPort(mach_port_t port,bool dequeue)220 bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
221 if (dequeue) {
222 mach_msg_empty_rcv_t msg{};
223 msg.header.msgh_local_port = port;
224 kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
225 sizeof(msg), port, 0, MACH_PORT_NULL);
226 if (kr == KERN_SUCCESS) {
227 return true;
228 } else {
229 MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
230 return false;
231 }
232 } else {
233 mach_port_seqno_t seqno = 0;
234 mach_msg_size_t size;
235 mach_msg_id_t id;
236 mach_msg_trailer_t trailer;
237 mach_msg_type_number_t trailer_size = sizeof(trailer);
238 kern_return_t kr = mach_port_peek(
239 mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
240 &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
241 &trailer_size);
242 if (kr == KERN_SUCCESS) {
243 return true;
244 } else {
245 MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
246 return false;
247 }
248 }
249 }
250
ReceiveRight(mach_port_t name)251 WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name) : right_(name) {}
252
253 WaitableEvent::ReceiveRight::~ReceiveRight() = default;
254
255 } // namespace base
256