1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/wait_set.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <map>
10 #include <set>
11 #include <vector>
12
13 #include "base/containers/stack_container.h"
14 #include "base/logging.h"
15 #include "base/macros.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/synchronization/lock.h"
18 #include "base/synchronization/waitable_event.h"
19 #include "mojo/public/cpp/system/trap.h"
20
21 namespace mojo {
22
23 class WaitSet::State : public base::RefCountedThreadSafe<State> {
24 public:
State()25 State()
26 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
27 base::WaitableEvent::InitialState::NOT_SIGNALED) {
28 MojoResult rv = CreateTrap(&Context::OnNotification, &trap_handle_);
29 DCHECK_EQ(MOJO_RESULT_OK, rv);
30 }
31
ShutDown()32 void ShutDown() {
33 // NOTE: This may immediately invoke Notify for every context.
34 trap_handle_.reset();
35
36 cancelled_contexts_.clear();
37 }
38
AddEvent(base::WaitableEvent * event)39 MojoResult AddEvent(base::WaitableEvent* event) {
40 auto result = user_events_.insert(event);
41 if (result.second)
42 return MOJO_RESULT_OK;
43 return MOJO_RESULT_ALREADY_EXISTS;
44 }
45
RemoveEvent(base::WaitableEvent * event)46 MojoResult RemoveEvent(base::WaitableEvent* event) {
47 auto it = user_events_.find(event);
48 if (it == user_events_.end())
49 return MOJO_RESULT_NOT_FOUND;
50 user_events_.erase(it);
51 return MOJO_RESULT_OK;
52 }
53
AddHandle(Handle handle,MojoHandleSignals signals)54 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
55 DCHECK(trap_handle_.is_valid());
56
57 scoped_refptr<Context> context = new Context(this, handle);
58
59 {
60 base::AutoLock lock(lock_);
61
62 if (handle_to_context_.count(handle))
63 return MOJO_RESULT_ALREADY_EXISTS;
64 DCHECK(!contexts_.count(context->context_value()));
65
66 handle_to_context_[handle] = context;
67 contexts_[context->context_value()] = context;
68 }
69
70 // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if
71 // MojoAddTrigger() succeeds. Otherwise balanced immediately below.
72 context->AddRef();
73
74 // This can notify immediately if the watcher is already armed. Don't hold
75 // |lock_| while calling it.
76 MojoResult rv =
77 MojoAddTrigger(trap_handle_.get().value(), handle.value(), signals,
78 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
79 context->context_value(), nullptr);
80 if (rv == MOJO_RESULT_INVALID_ARGUMENT) {
81 base::AutoLock lock(lock_);
82 handle_to_context_.erase(handle);
83 contexts_.erase(context->context_value());
84
85 // Balanced above.
86 context->Release();
87 return rv;
88 }
89 DCHECK_EQ(MOJO_RESULT_OK, rv);
90
91 return rv;
92 }
93
RemoveHandle(Handle handle)94 MojoResult RemoveHandle(Handle handle) {
95 DCHECK(trap_handle_.is_valid());
96
97 scoped_refptr<Context> context;
98 {
99 base::AutoLock lock(lock_);
100
101 // Always clear |cancelled_contexts_| in case it's accumulated any more
102 // entries since the last time we ran.
103 cancelled_contexts_.clear();
104
105 auto it = handle_to_context_.find(handle);
106 if (it == handle_to_context_.end())
107 return MOJO_RESULT_NOT_FOUND;
108
109 context = std::move(it->second);
110 handle_to_context_.erase(it);
111
112 // Ensure that we never return this handle as a ready result again. Note
113 // that it's removal from |handle_to_context_| above ensures it will never
114 // be added back to this map.
115 ready_handles_.erase(handle);
116 }
117
118 // NOTE: This may enter the notification callback immediately, so don't hold
119 // |lock_| while calling it.
120 MojoResult rv = MojoRemoveTrigger(trap_handle_.get().value(),
121 context->context_value(), nullptr);
122
123 // We don't really care whether or not this succeeds. In either case, the
124 // context was or will imminently be cancelled and moved from |contexts_|
125 // to |cancelled_contexts_|.
126 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
127
128 return rv;
129 }
130
Wait(base::WaitableEvent ** ready_event,size_t * num_ready_handles,Handle * ready_handles,MojoResult * ready_results,MojoHandleSignalsState * signals_states)131 void Wait(base::WaitableEvent** ready_event,
132 size_t* num_ready_handles,
133 Handle* ready_handles,
134 MojoResult* ready_results,
135 MojoHandleSignalsState* signals_states) {
136 DCHECK(trap_handle_.is_valid());
137 DCHECK(num_ready_handles);
138 DCHECK(ready_handles);
139 DCHECK(ready_results);
140 {
141 base::AutoLock lock(lock_);
142 if (ready_handles_.empty()) {
143 // No handles are currently in the ready set. Make sure the event is
144 // reset and try to arm the watcher.
145 handle_event_.Reset();
146
147 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
148 uint32_t num_blocking_events =
149 static_cast<uint32_t>(*num_ready_handles);
150
151 base::StackVector<MojoTrapEvent, 4> blocking_events;
152 blocking_events.container().resize(num_blocking_events);
153 for (size_t i = 0; i < num_blocking_events; ++i) {
154 blocking_events.container()[i].struct_size =
155 sizeof(blocking_events.container()[i]);
156 }
157 MojoResult rv = MojoArmTrap(trap_handle_.get().value(), nullptr,
158 &num_blocking_events,
159 blocking_events.container().data());
160
161 if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
162 // Simulate the handles becoming ready. We do this in lieu of
163 // returning the results immediately so as to avoid potentially
164 // starving user events. i.e., we always want to call WaitMany()
165 // below.
166 handle_event_.Signal();
167 for (size_t i = 0; i < num_blocking_events; ++i) {
168 const auto& event = blocking_events.container()[i];
169 auto it = contexts_.find(event.trigger_context);
170 DCHECK(it != contexts_.end());
171 ready_handles_[it->second->handle()] = {event.result,
172 event.signals_state};
173 }
174 } else if (rv == MOJO_RESULT_NOT_FOUND) {
175 // Nothing to watch. If there are no user events, always signal to
176 // avoid deadlock.
177 if (user_events_.empty())
178 handle_event_.Signal();
179 } else {
180 // Watcher must be armed now. No need to manually signal.
181 DCHECK_EQ(MOJO_RESULT_OK, rv);
182 }
183 }
184 }
185
186 // Build a local contiguous array of events to wait on. These are rotated
187 // across Wait() calls to avoid starvation, by virtue of the fact that
188 // WaitMany guarantees left-to-right priority when multiple events are
189 // signaled.
190
191 base::StackVector<base::WaitableEvent*, 4> events;
192 events.container().resize(user_events_.size() + 1);
193 if (waitable_index_shift_ > user_events_.size())
194 waitable_index_shift_ = 0;
195
196 size_t dest_index = waitable_index_shift_++;
197 events.container()[dest_index] = &handle_event_;
198 for (auto* e : user_events_) {
199 dest_index = (dest_index + 1) % events.container().size();
200 events.container()[dest_index] = e;
201 }
202
203 size_t index = base::WaitableEvent::WaitMany(events.container().data(),
204 events.container().size());
205 base::AutoLock lock(lock_);
206
207 // Pop as many handles as we can out of the ready set and return them. Note
208 // that we do this regardless of which event signaled, as there may be
209 // ready handles in any case and they may be interesting to the caller.
210 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
211 for (size_t i = 0; i < *num_ready_handles; ++i) {
212 auto it = ready_handles_.begin();
213 ready_handles[i] = it->first;
214 ready_results[i] = it->second.result;
215 if (signals_states)
216 signals_states[i] = it->second.signals_state;
217 ready_handles_.erase(it);
218 }
219
220 // If the caller cares, let them know which user event unblocked us, if any.
221 if (ready_event) {
222 if (events.container()[index] == &handle_event_)
223 *ready_event = nullptr;
224 else
225 *ready_event = events.container()[index];
226 }
227 }
228
229 private:
230 friend class base::RefCountedThreadSafe<State>;
231
232 class Context : public base::RefCountedThreadSafe<Context> {
233 public:
Context(scoped_refptr<State> state,Handle handle)234 Context(scoped_refptr<State> state, Handle handle)
235 : state_(state), handle_(handle) {}
236
handle() const237 Handle handle() const { return handle_; }
238
context_value() const239 uintptr_t context_value() const {
240 return reinterpret_cast<uintptr_t>(this);
241 }
242
OnNotification(const MojoTrapEvent * event)243 static void OnNotification(const MojoTrapEvent* event) {
244 reinterpret_cast<Context*>(event->trigger_context)
245 ->Notify(event->result, event->signals_state);
246 }
247
248 private:
249 friend class base::RefCountedThreadSafe<Context>;
250
~Context()251 ~Context() {}
252
Notify(MojoResult result,MojoHandleSignalsState signals_state)253 void Notify(MojoResult result, MojoHandleSignalsState signals_state) {
254 state_->Notify(handle_, result, signals_state, this);
255 }
256
257 const scoped_refptr<State> state_;
258 const Handle handle_;
259
260 DISALLOW_COPY_AND_ASSIGN(Context);
261 };
262
~State()263 ~State() {}
264
Notify(Handle handle,MojoResult result,MojoHandleSignalsState signals_state,Context * context)265 void Notify(Handle handle,
266 MojoResult result,
267 MojoHandleSignalsState signals_state,
268 Context* context) {
269 base::AutoLock lock(lock_);
270
271 // This notification may have raced with RemoveHandle() from another
272 // sequence. We only signal the WaitSet if that's not the case.
273 if (handle_to_context_.count(handle)) {
274 ready_handles_[handle] = {result, signals_state};
275 handle_event_.Signal();
276 }
277
278 // Whether it's an implicit or explicit cancellation, erase from |contexts_|
279 // and append to |cancelled_contexts_|.
280 if (result == MOJO_RESULT_CANCELLED) {
281 contexts_.erase(context->context_value());
282 handle_to_context_.erase(handle);
283
284 // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that
285 // this Context's heap address is not reused too soon. For example, it
286 // would otherwise be possible for the user to call AddHandle() from the
287 // WaitSet's sequence immediately after this notification has fired on
288 // another sequence, potentially reusing the same heap address for the
289 // newly added Context; and then they may call RemoveHandle() for this
290 // handle (not knowing its context has just been implicitly cancelled) and
291 // cause the new Context to be incorrectly removed from |contexts_|.
292 //
293 // This vector is cleared on the WaitSet's own sequence every time
294 // RemoveHandle is called.
295 cancelled_contexts_.emplace_back(base::WrapRefCounted(context));
296
297 // Balanced in State::AddHandle().
298 context->Release();
299 }
300 }
301
302 struct ReadyState {
303 ReadyState() = default;
ReadyStatemojo::WaitSet::State::ReadyState304 ReadyState(MojoResult result, MojoHandleSignalsState signals_state)
305 : result(result), signals_state(signals_state) {}
306 ~ReadyState() = default;
307
308 MojoResult result = MOJO_RESULT_UNKNOWN;
309 MojoHandleSignalsState signals_state = {0, 0};
310 };
311
312 // Not guarded by lock. Must only be accessed from the WaitSet's owning
313 // sequence.
314 ScopedTrapHandle trap_handle_;
315
316 base::Lock lock_;
317 std::map<uintptr_t, scoped_refptr<Context>> contexts_;
318 std::map<Handle, scoped_refptr<Context>> handle_to_context_;
319 std::map<Handle, ReadyState> ready_handles_;
320 std::vector<scoped_refptr<Context>> cancelled_contexts_;
321 std::set<base::WaitableEvent*> user_events_;
322
323 // Event signaled any time a handle notification is received.
324 base::WaitableEvent handle_event_;
325
326 // Offset by which to rotate the current set of waitable objects. This is used
327 // to guard against event starvation, as base::WaitableEvent::WaitMany gives
328 // preference to events in left-to-right order.
329 size_t waitable_index_shift_ = 0;
330
331 DISALLOW_COPY_AND_ASSIGN(State);
332 };
333
WaitSet()334 WaitSet::WaitSet() : state_(new State) {}
335
~WaitSet()336 WaitSet::~WaitSet() {
337 state_->ShutDown();
338 }
339
AddEvent(base::WaitableEvent * event)340 MojoResult WaitSet::AddEvent(base::WaitableEvent* event) {
341 return state_->AddEvent(event);
342 }
343
RemoveEvent(base::WaitableEvent * event)344 MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) {
345 return state_->RemoveEvent(event);
346 }
347
AddHandle(Handle handle,MojoHandleSignals signals)348 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
349 return state_->AddHandle(handle, signals);
350 }
351
RemoveHandle(Handle handle)352 MojoResult WaitSet::RemoveHandle(Handle handle) {
353 return state_->RemoveHandle(handle);
354 }
355
Wait(base::WaitableEvent ** ready_event,size_t * num_ready_handles,Handle * ready_handles,MojoResult * ready_results,MojoHandleSignalsState * signals_states)356 void WaitSet::Wait(base::WaitableEvent** ready_event,
357 size_t* num_ready_handles,
358 Handle* ready_handles,
359 MojoResult* ready_results,
360 MojoHandleSignalsState* signals_states) {
361 state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results,
362 signals_states);
363 }
364
365 } // namespace mojo
366