1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/handle_watcher.h"
6
7 #include <map>
8
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/message_loop/message_loop_proxy.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread.h"
21 #include "base/threading/thread_restrictions.h"
22 #include "base/time/time.h"
23 #include "mojo/common/message_pump_mojo.h"
24 #include "mojo/common/message_pump_mojo_handler.h"
25 #include "mojo/common/time_helper.h"
26
27 namespace mojo {
28 namespace common {
29
// Identifier handed out for each watch request; used to stop that watch later.
typedef int WatcherID;
31
32 namespace {
33
// Name given to the background thread owned by WatcherThreadManager.
const char kWatcherThreadName[] = "handle-watcher-thread";
35
MojoDeadlineToTimeTicks(MojoDeadline deadline)36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
39 }
40
41 // Tracks the data for a single call to Start().
42 struct WatchData {
WatchDatamojo::common::__anon16308aab0111::WatchData43 WatchData()
44 : id(0),
45 handle_signals(MOJO_HANDLE_SIGNAL_NONE),
46 message_loop(NULL) {}
47
48 WatcherID id;
49 Handle handle;
50 MojoHandleSignals handle_signals;
51 base::TimeTicks deadline;
52 base::Callback<void(MojoResult)> callback;
53 scoped_refptr<base::MessageLoopProxy> message_loop;
54 };
55
56 // WatcherBackend --------------------------------------------------------------
57
58 // WatcherBackend is responsible for managing the requests and interacting with
59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
60 // thread WatcherThreadManager creates.
61 class WatcherBackend : public MessagePumpMojoHandler {
62 public:
63 WatcherBackend();
64 virtual ~WatcherBackend();
65
66 void StartWatching(const WatchData& data);
67
68 // Cancels a previously scheduled request to start a watch.
69 void StopWatching(WatcherID watcher_id);
70
71 private:
72 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
73
74 // Invoked when a handle needs to be removed and notified.
75 void RemoveAndNotify(const Handle& handle, MojoResult result);
76
77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
78 // and sets |handle| to the Handle. Returns false if not a known id.
79 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
80
81 // MessagePumpMojoHandler overrides:
82 virtual void OnHandleReady(const Handle& handle) OVERRIDE;
83 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
84
85 // Maps from assigned id to WatchData.
86 HandleToWatchDataMap handle_to_data_;
87
88 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
89 };
90
WatcherBackend()91 WatcherBackend::WatcherBackend() {
92 }
93
~WatcherBackend()94 WatcherBackend::~WatcherBackend() {
95 }
96
StartWatching(const WatchData & data)97 void WatcherBackend::StartWatching(const WatchData& data) {
98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
99
100 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
101
102 handle_to_data_[data.handle] = data;
103 MessagePumpMojo::current()->AddHandler(this, data.handle,
104 data.handle_signals,
105 data.deadline);
106 }
107
StopWatching(WatcherID watcher_id)108 void WatcherBackend::StopWatching(WatcherID watcher_id) {
109 // Because of the thread hop it is entirely possible to get here and not
110 // have a valid handle registered for |watcher_id|.
111 Handle handle;
112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
113 handle_to_data_.erase(handle);
114 MessagePumpMojo::current()->RemoveHandler(handle);
115 }
116 }
117
RemoveAndNotify(const Handle & handle,MojoResult result)118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
119 MojoResult result) {
120 if (handle_to_data_.count(handle) == 0)
121 return;
122
123 const WatchData data(handle_to_data_[handle]);
124 handle_to_data_.erase(handle);
125 MessagePumpMojo::current()->RemoveHandler(handle);
126 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
127 }
128
GetMojoHandleByWatcherID(WatcherID watcher_id,Handle * handle) const129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
130 Handle* handle) const {
131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
132 i != handle_to_data_.end(); ++i) {
133 if (i->second.id == watcher_id) {
134 *handle = i->second.handle;
135 return true;
136 }
137 }
138 return false;
139 }
140
OnHandleReady(const Handle & handle)141 void WatcherBackend::OnHandleReady(const Handle& handle) {
142 RemoveAndNotify(handle, MOJO_RESULT_OK);
143 }
144
OnHandleError(const Handle & handle,MojoResult result)145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
146 RemoveAndNotify(handle, result);
147 }
148
149 // WatcherThreadManager --------------------------------------------------------
150
151 // WatcherThreadManager manages the background thread that listens for handles
152 // to be ready. All requests are handled by WatcherBackend.
153 } // namespace
154
155 class WatcherThreadManager {
156 public:
157 ~WatcherThreadManager();
158
159 // Returns the shared instance.
160 static WatcherThreadManager* GetInstance();
161
162 // Starts watching the requested handle. Returns a unique ID that is used to
163 // stop watching the handle. When the handle is ready |callback| is notified
164 // on the thread StartWatching() was invoked on.
165 // This may be invoked on any thread.
166 WatcherID StartWatching(const Handle& handle,
167 MojoHandleSignals handle_signals,
168 base::TimeTicks deadline,
169 const base::Callback<void(MojoResult)>& callback);
170
171 // Stops watching a handle.
172 // This may be invoked on any thread.
173 void StopWatching(WatcherID watcher_id);
174
175 private:
176 enum RequestType {
177 REQUEST_START,
178 REQUEST_STOP,
179 };
180
181 // See description of |requests_| for details.
182 struct RequestData {
RequestDatamojo::common::WatcherThreadManager::RequestData183 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
184
185 RequestType type;
186 WatchData start_data;
187 WatcherID stop_id;
188 base::WaitableEvent* stop_event;
189 };
190
191 typedef std::vector<RequestData> Requests;
192
193 friend struct DefaultSingletonTraits<WatcherThreadManager>;
194
195 WatcherThreadManager();
196
197 // Schedules a request on the background thread. See |requests_| for details.
198 void AddRequest(const RequestData& data);
199
200 // Processes requests added to |requests_|. This is invoked on the backend
201 // thread.
202 void ProcessRequestsOnBackendThread();
203
204 base::Thread thread_;
205
206 base::AtomicSequenceNumber watcher_id_generator_;
207
208 WatcherBackend backend_;
209
210 // Protects |requests_|.
211 base::Lock lock_;
212
213 // Start/Stop result in adding a RequestData to |requests_| (protected by
214 // |lock_|). When the background thread wakes up it processes the requests.
215 Requests requests_;
216
217 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
218 };
219
~WatcherThreadManager()220 WatcherThreadManager::~WatcherThreadManager() {
221 thread_.Stop();
222 }
223
GetInstance()224 WatcherThreadManager* WatcherThreadManager::GetInstance() {
225 return Singleton<WatcherThreadManager>::get();
226 }
227
StartWatching(const Handle & handle,MojoHandleSignals handle_signals,base::TimeTicks deadline,const base::Callback<void (MojoResult)> & callback)228 WatcherID WatcherThreadManager::StartWatching(
229 const Handle& handle,
230 MojoHandleSignals handle_signals,
231 base::TimeTicks deadline,
232 const base::Callback<void(MojoResult)>& callback) {
233 RequestData request_data;
234 request_data.type = REQUEST_START;
235 request_data.start_data.id = watcher_id_generator_.GetNext();
236 request_data.start_data.handle = handle;
237 request_data.start_data.callback = callback;
238 request_data.start_data.handle_signals = handle_signals;
239 request_data.start_data.deadline = deadline;
240 request_data.start_data.message_loop = base::MessageLoopProxy::current();
241 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
242 request_data.start_data.message_loop.get());
243 AddRequest(request_data);
244 return request_data.start_data.id;
245 }
246
StopWatching(WatcherID watcher_id)247 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
248 // Handle the case of StartWatching() followed by StopWatching() before
249 // |thread_| woke up.
250 {
251 base::AutoLock auto_lock(lock_);
252 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
253 if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
254 // Watcher ids are not reused, so if we find it we can stop.
255 requests_.erase(i);
256 return;
257 }
258 }
259 }
260
261 base::ThreadRestrictions::ScopedAllowWait allow_wait;
262 base::WaitableEvent event(true, false);
263 RequestData request_data;
264 request_data.type = REQUEST_STOP;
265 request_data.stop_id = watcher_id;
266 request_data.stop_event = &event;
267 AddRequest(request_data);
268
269 // We need to block until the handle is actually removed.
270 event.Wait();
271 }
272
AddRequest(const RequestData & data)273 void WatcherThreadManager::AddRequest(const RequestData& data) {
274 {
275 base::AutoLock auto_lock(lock_);
276 const bool was_empty = requests_.empty();
277 requests_.push_back(data);
278 if (!was_empty)
279 return;
280 }
281 // We own |thread_|, so it's safe to use Unretained() here.
282 thread_.message_loop()->PostTask(
283 FROM_HERE,
284 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
285 base::Unretained(this)));
286 }
287
ProcessRequestsOnBackendThread()288 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
289 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
290
291 Requests requests;
292 {
293 base::AutoLock auto_lock(lock_);
294 requests_.swap(requests);
295 }
296 for (size_t i = 0; i < requests.size(); ++i) {
297 if (requests[i].type == REQUEST_START) {
298 backend_.StartWatching(requests[i].start_data);
299 } else {
300 backend_.StopWatching(requests[i].stop_id);
301 requests[i].stop_event->Signal();
302 }
303 }
304 }
305
WatcherThreadManager()306 WatcherThreadManager::WatcherThreadManager()
307 : thread_(kWatcherThreadName) {
308 base::Thread::Options thread_options;
309 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
310 thread_.StartWithOptions(thread_options);
311 }
312
313 // HandleWatcher::StateBase and subclasses -------------------------------------
314
315 // The base class of HandleWatcher's state. Owns the user's callback and
316 // monitors the current thread's MessageLoop to know when to force the callback
317 // to run (with an error) even though the pipe hasn't been signaled yet.
318 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
319 public:
StateBase(HandleWatcher * watcher,const base::Callback<void (MojoResult)> & callback)320 StateBase(HandleWatcher* watcher,
321 const base::Callback<void(MojoResult)>& callback)
322 : watcher_(watcher),
323 callback_(callback),
324 got_ready_(false) {
325 base::MessageLoop::current()->AddDestructionObserver(this);
326 }
327
~StateBase()328 virtual ~StateBase() {
329 base::MessageLoop::current()->RemoveDestructionObserver(this);
330 }
331
332 protected:
NotifyHandleReady(MojoResult result)333 void NotifyHandleReady(MojoResult result) {
334 got_ready_ = true;
335 NotifyAndDestroy(result);
336 }
337
got_ready() const338 bool got_ready() const { return got_ready_; }
339
340 private:
WillDestroyCurrentMessageLoop()341 virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
342 // The current thread is exiting. Simulate a watch error.
343 NotifyAndDestroy(MOJO_RESULT_ABORTED);
344 }
345
NotifyAndDestroy(MojoResult result)346 void NotifyAndDestroy(MojoResult result) {
347 base::Callback<void(MojoResult)> callback = callback_;
348 watcher_->Stop(); // Destroys |this|.
349
350 callback.Run(result);
351 }
352
353 HandleWatcher* watcher_;
354 base::Callback<void(MojoResult)> callback_;
355
356 // Have we been notified that the handle is ready?
357 bool got_ready_;
358
359 DISALLOW_COPY_AND_ASSIGN(StateBase);
360 };
361
362 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
363 // SameThreadWatchingState is used to directly watch the handle on the same
364 // thread.
365 class HandleWatcher::SameThreadWatchingState : public StateBase,
366 public MessagePumpMojoHandler {
367 public:
SameThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)368 SameThreadWatchingState(HandleWatcher* watcher,
369 const Handle& handle,
370 MojoHandleSignals handle_signals,
371 MojoDeadline deadline,
372 const base::Callback<void(MojoResult)>& callback)
373 : StateBase(watcher, callback),
374 handle_(handle) {
375 DCHECK(MessagePumpMojo::IsCurrent());
376
377 MessagePumpMojo::current()->AddHandler(
378 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
379 }
380
~SameThreadWatchingState()381 virtual ~SameThreadWatchingState() {
382 if (!got_ready())
383 MessagePumpMojo::current()->RemoveHandler(handle_);
384 }
385
386 private:
387 // MessagePumpMojoHandler overrides:
OnHandleReady(const Handle & handle)388 virtual void OnHandleReady(const Handle& handle) OVERRIDE {
389 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
390 }
391
OnHandleError(const Handle & handle,MojoResult result)392 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
393 StopWatchingAndNotifyReady(handle, result);
394 }
395
StopWatchingAndNotifyReady(const Handle & handle,MojoResult result)396 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
397 DCHECK_EQ(handle.value(), handle_.value());
398 MessagePumpMojo::current()->RemoveHandler(handle_);
399 NotifyHandleReady(result);
400 }
401
402 Handle handle_;
403
404 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
405 };
406
407 // If the thread on which HandleWatcher is used runs a message pump different
408 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
409 // handle on the handle watcher thread.
410 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
411 public:
SecondaryThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)412 SecondaryThreadWatchingState(HandleWatcher* watcher,
413 const Handle& handle,
414 MojoHandleSignals handle_signals,
415 MojoDeadline deadline,
416 const base::Callback<void(MojoResult)>& callback)
417 : StateBase(watcher, callback),
418 weak_factory_(this) {
419 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
420 handle,
421 handle_signals,
422 MojoDeadlineToTimeTicks(deadline),
423 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
424 weak_factory_.GetWeakPtr()));
425 }
426
~SecondaryThreadWatchingState()427 virtual ~SecondaryThreadWatchingState() {
428 // If we've been notified the handle is ready (|got_ready()| is true) then
429 // the watch has been implicitly removed by
430 // WatcherThreadManager/MessagePumpMojo and we don't have to call
431 // StopWatching(). To do so would needlessly entail posting a task and
432 // blocking until the background thread services it.
433 if (!got_ready())
434 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
435 }
436
437 private:
438 WatcherID watcher_id_;
439
440 // Used to weakly bind |this| to the WatcherThreadManager.
441 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
442
443 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
444 };
445
446 // HandleWatcher ---------------------------------------------------------------
447
HandleWatcher()448 HandleWatcher::HandleWatcher() {
449 }
450
~HandleWatcher()451 HandleWatcher::~HandleWatcher() {
452 }
453
Start(const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)454 void HandleWatcher::Start(const Handle& handle,
455 MojoHandleSignals handle_signals,
456 MojoDeadline deadline,
457 const base::Callback<void(MojoResult)>& callback) {
458 DCHECK(handle.is_valid());
459 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
460
461 if (MessagePumpMojo::IsCurrent()) {
462 state_.reset(new SameThreadWatchingState(
463 this, handle, handle_signals, deadline, callback));
464 } else {
465 state_.reset(new SecondaryThreadWatchingState(
466 this, handle, handle_signals, deadline, callback));
467 }
468 }
469
Stop()470 void HandleWatcher::Stop() {
471 state_.reset();
472 }
473
474 } // namespace common
475 } // namespace mojo
476