• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/wait_set_dispatcher.h"
6 
7 #include <stdint.h>
8 
9 #include <algorithm>
10 #include <utility>
11 
12 #include "base/logging.h"
13 #include "mojo/edk/system/awakable.h"
14 
15 namespace mojo {
16 namespace edk {
17 
18 class WaitSetDispatcher::Waiter final : public Awakable {
19  public:
Waiter(WaitSetDispatcher * dispatcher)20   explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
~Waiter()21   ~Waiter() {}
22 
23   // |Awakable| implementation.
Awake(MojoResult result,uintptr_t context)24   bool Awake(MojoResult result, uintptr_t context) override {
25     // Note: This is called with various Mojo locks held.
26     dispatcher_->WakeDispatcher(result, context);
27     // Removes |this| from the dispatcher's list of waiters.
28     return false;
29   }
30 
31  private:
32   WaitSetDispatcher* const dispatcher_;
33 };
34 
WaitState()35 WaitSetDispatcher::WaitState::WaitState() {}
36 
37 WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default;
38 
~WaitState()39 WaitSetDispatcher::WaitState::~WaitState() {}
40 
WaitSetDispatcher()41 WaitSetDispatcher::WaitSetDispatcher()
42     : waiter_(new WaitSetDispatcher::Waiter(this)) {}
43 
GetType() const44 Dispatcher::Type WaitSetDispatcher::GetType() const {
45   return Type::WAIT_SET;
46 }
47 
Close()48 MojoResult WaitSetDispatcher::Close() {
49   base::AutoLock lock(lock_);
50 
51   if (is_closed_)
52     return MOJO_RESULT_INVALID_ARGUMENT;
53   is_closed_ = true;
54 
55   {
56     base::AutoLock locker(awakable_lock_);
57     awakable_list_.CancelAll();
58   }
59 
60   for (const auto& entry : waiting_dispatchers_)
61     entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
62   waiting_dispatchers_.clear();
63 
64   base::AutoLock locker(awoken_lock_);
65   awoken_queue_.clear();
66   processed_dispatchers_.clear();
67 
68   return MOJO_RESULT_OK;
69 }
70 
AddWaitingDispatcher(const scoped_refptr<Dispatcher> & dispatcher,MojoHandleSignals signals,uintptr_t context)71 MojoResult WaitSetDispatcher::AddWaitingDispatcher(
72     const scoped_refptr<Dispatcher>& dispatcher,
73     MojoHandleSignals signals,
74     uintptr_t context) {
75   if (dispatcher == this)
76     return MOJO_RESULT_INVALID_ARGUMENT;
77 
78   base::AutoLock lock(lock_);
79 
80   if (is_closed_)
81     return MOJO_RESULT_INVALID_ARGUMENT;
82 
83   uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
84   auto it = waiting_dispatchers_.find(dispatcher_handle);
85   if (it != waiting_dispatchers_.end()) {
86     return MOJO_RESULT_ALREADY_EXISTS;
87   }
88 
89   const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
90                                                     dispatcher_handle, nullptr);
91   if (result == MOJO_RESULT_INVALID_ARGUMENT) {
92     // Dispatcher is closed.
93     return result;
94   } else if (result != MOJO_RESULT_OK) {
95     WakeDispatcher(result, dispatcher_handle);
96   }
97 
98   WaitState state;
99   state.dispatcher = dispatcher;
100   state.context = context;
101   state.signals = signals;
102   bool inserted = waiting_dispatchers_.insert(
103       std::make_pair(dispatcher_handle, state)).second;
104   DCHECK(inserted);
105 
106   return MOJO_RESULT_OK;
107 }
108 
RemoveWaitingDispatcher(const scoped_refptr<Dispatcher> & dispatcher)109 MojoResult WaitSetDispatcher::RemoveWaitingDispatcher(
110     const scoped_refptr<Dispatcher>& dispatcher) {
111   uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
112 
113   base::AutoLock lock(lock_);
114   if (is_closed_)
115     return MOJO_RESULT_INVALID_ARGUMENT;
116 
117   auto it = waiting_dispatchers_.find(dispatcher_handle);
118   if (it == waiting_dispatchers_.end())
119     return MOJO_RESULT_NOT_FOUND;
120 
121   dispatcher->RemoveAwakable(waiter_.get(), nullptr);
122   // At this point, it should not be possible for |waiter_| to be woken with
123   // |dispatcher|.
124   waiting_dispatchers_.erase(it);
125 
126   base::AutoLock locker(awoken_lock_);
127   int num_erased = 0;
128   for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
129     if (it->first == dispatcher_handle) {
130       it = awoken_queue_.erase(it);
131       num_erased++;
132     } else {
133       ++it;
134     }
135   }
136   // The dispatcher should only exist in the queue once.
137   DCHECK_LE(num_erased, 1);
138   processed_dispatchers_.erase(
139       std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
140                   dispatcher_handle),
141       processed_dispatchers_.end());
142 
143   return MOJO_RESULT_OK;
144 }
145 
GetReadyDispatchers(uint32_t * count,DispatcherVector * dispatchers,MojoResult * results,uintptr_t * contexts)146 MojoResult WaitSetDispatcher::GetReadyDispatchers(
147     uint32_t* count,
148     DispatcherVector* dispatchers,
149     MojoResult* results,
150     uintptr_t* contexts) {
151   base::AutoLock lock(lock_);
152 
153   if (is_closed_)
154     return MOJO_RESULT_INVALID_ARGUMENT;
155 
156   dispatchers->clear();
157 
158   // Re-queue any already retrieved dispatchers. These should be the dispatchers
159   // that were returned on the last call to this function. This loop is
160   // necessary to preserve the logically level-triggering behaviour of waiting
161   // in Mojo. In particular, if no action is taken on a signal, that signal
162   // continues to be satisfied, and therefore a |MojoWait()| on that
163   // handle/signal continues to return immediately.
164   std::deque<uintptr_t> pending;
165   {
166     base::AutoLock locker(awoken_lock_);
167     pending.swap(processed_dispatchers_);
168   }
169   for (uintptr_t d : pending) {
170     auto it = waiting_dispatchers_.find(d);
171     // Anything in |processed_dispatchers_| should also be in
172     // |waiting_dispatchers_| since dispatchers are removed from both in
173     // |RemoveWaitingDispatcherImplNoLock()|.
174     DCHECK(it != waiting_dispatchers_.end());
175 
176     // |awoken_mutex_| cannot be held here because
177     // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
178     // mutex is held while running |WakeDispatcher()| below, which needs to
179     // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
180     // a deadlock.
181     const MojoResult result = it->second.dispatcher->AddAwakable(
182         waiter_.get(), it->second.signals, d, nullptr);
183 
184     if (result == MOJO_RESULT_INVALID_ARGUMENT) {
185       // Dispatcher is closed. Implicitly remove it from the wait set since
186       // it may be impossible to remove using |MojoRemoveHandle()|.
187       waiting_dispatchers_.erase(it);
188     } else if (result != MOJO_RESULT_OK) {
189       WakeDispatcher(result, d);
190     }
191   }
192 
193   const uint32_t max_woken = *count;
194   uint32_t num_woken = 0;
195 
196   base::AutoLock locker(awoken_lock_);
197   while (!awoken_queue_.empty() && num_woken < max_woken) {
198     uintptr_t d = awoken_queue_.front().first;
199     MojoResult result = awoken_queue_.front().second;
200     awoken_queue_.pop_front();
201 
202     auto it = waiting_dispatchers_.find(d);
203     DCHECK(it != waiting_dispatchers_.end());
204 
205     results[num_woken] = result;
206     dispatchers->push_back(it->second.dispatcher);
207     if (contexts)
208       contexts[num_woken] = it->second.context;
209 
210     if (result != MOJO_RESULT_CANCELLED) {
211       processed_dispatchers_.push_back(d);
212     } else {
213       // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
214       // Return it, but also implcitly remove it from the wait set.
215       waiting_dispatchers_.erase(it);
216     }
217 
218     num_woken++;
219   }
220 
221   *count = num_woken;
222   if (!num_woken)
223     return MOJO_RESULT_SHOULD_WAIT;
224 
225   return MOJO_RESULT_OK;
226 }
227 
GetHandleSignalsState() const228 HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const {
229   base::AutoLock lock(lock_);
230   return GetHandleSignalsStateNoLock();
231 }
232 
GetHandleSignalsStateNoLock() const233 HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const {
234   lock_.AssertAcquired();
235   if (is_closed_)
236     return HandleSignalsState();
237 
238   HandleSignalsState rv;
239   rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
240   base::AutoLock locker(awoken_lock_);
241   if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
242     rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
243   return rv;
244 }
245 
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)246 MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable,
247                                           MojoHandleSignals signals,
248                                           uintptr_t context,
249                                           HandleSignalsState* signals_state) {
250   base::AutoLock lock(lock_);
251   // |awakable_lock_| is acquired here instead of immediately before adding to
252   // |awakable_list_| because we need to check the signals state and add to
253   // |awakable_list_| as an atomic operation. If the pair isn't atomic, it is
254   // possible for the signals state to change after it is checked, but before
255   // the awakable is added. In that case, the added awakable won't be signalled.
256   base::AutoLock awakable_locker(awakable_lock_);
257   HandleSignalsState state(GetHandleSignalsStateNoLock());
258   if (state.satisfies(signals)) {
259     if (signals_state)
260       *signals_state = state;
261     return MOJO_RESULT_ALREADY_EXISTS;
262   }
263   if (!state.can_satisfy(signals)) {
264     if (signals_state)
265       *signals_state = state;
266     return MOJO_RESULT_FAILED_PRECONDITION;
267   }
268 
269   awakable_list_.Add(awakable, signals, context);
270   return MOJO_RESULT_OK;
271 }
272 
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)273 void WaitSetDispatcher::RemoveAwakable(Awakable* awakable,
274                                        HandleSignalsState* signals_state) {
275   {
276     base::AutoLock locker(awakable_lock_);
277     awakable_list_.Remove(awakable);
278   }
279   if (signals_state)
280     *signals_state = GetHandleSignalsState();
281 }
282 
BeginTransit()283 bool WaitSetDispatcher::BeginTransit() {
284   // You can't transfer wait sets!
285   return false;
286 }
287 
~WaitSetDispatcher()288 WaitSetDispatcher::~WaitSetDispatcher() {
289   DCHECK(waiting_dispatchers_.empty());
290   DCHECK(awoken_queue_.empty());
291   DCHECK(processed_dispatchers_.empty());
292 }
293 
WakeDispatcher(MojoResult result,uintptr_t context)294 void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
295   {
296     base::AutoLock locker(awoken_lock_);
297 
298     if (result == MOJO_RESULT_ALREADY_EXISTS)
299       result = MOJO_RESULT_OK;
300 
301     awoken_queue_.push_back(std::make_pair(context, result));
302   }
303 
304   base::AutoLock locker(awakable_lock_);
305   HandleSignalsState signals_state;
306   signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
307   signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
308   awakable_list_.AwakeForStateChange(signals_state);
309 }
310 
311 }  // namespace edk
312 }  // namespace mojo
313