• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/common/handle_watcher.h"
6 
7 #include <map>
8 
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/message_loop/message_loop_proxy.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread.h"
21 #include "base/threading/thread_restrictions.h"
22 #include "base/time/time.h"
23 #include "mojo/common/message_pump_mojo.h"
24 #include "mojo/common/message_pump_mojo_handler.h"
25 #include "mojo/common/time_helper.h"
26 
27 namespace mojo {
28 namespace common {
29 
30 typedef int WatcherID;
31 
32 namespace {
33 
34 const char kWatcherThreadName[] = "handle-watcher-thread";
35 
MojoDeadlineToTimeTicks(MojoDeadline deadline)36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
37   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
38       internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
39 }
40 
41 // Tracks the data for a single call to Start().
42 struct WatchData {
WatchDatamojo::common::__anon16308aab0111::WatchData43   WatchData()
44       : id(0),
45         handle_signals(MOJO_HANDLE_SIGNAL_NONE),
46         message_loop(NULL) {}
47 
48   WatcherID id;
49   Handle handle;
50   MojoHandleSignals handle_signals;
51   base::TimeTicks deadline;
52   base::Callback<void(MojoResult)> callback;
53   scoped_refptr<base::MessageLoopProxy> message_loop;
54 };
55 
56 // WatcherBackend --------------------------------------------------------------
57 
58 // WatcherBackend is responsible for managing the requests and interacting with
59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
60 // thread WatcherThreadManager creates.
61 class WatcherBackend : public MessagePumpMojoHandler {
62  public:
63   WatcherBackend();
64   virtual ~WatcherBackend();
65 
66   void StartWatching(const WatchData& data);
67 
68   // Cancels a previously scheduled request to start a watch.
69   void StopWatching(WatcherID watcher_id);
70 
71  private:
72   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
73 
74   // Invoked when a handle needs to be removed and notified.
75   void RemoveAndNotify(const Handle& handle, MojoResult result);
76 
77   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
78   // and sets |handle| to the Handle. Returns false if not a known id.
79   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
80 
81   // MessagePumpMojoHandler overrides:
82   virtual void OnHandleReady(const Handle& handle) OVERRIDE;
83   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
84 
85   // Maps from assigned id to WatchData.
86   HandleToWatchDataMap handle_to_data_;
87 
88   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
89 };
90 
WatcherBackend()91 WatcherBackend::WatcherBackend() {
92 }
93 
~WatcherBackend()94 WatcherBackend::~WatcherBackend() {
95 }
96 
StartWatching(const WatchData & data)97 void WatcherBackend::StartWatching(const WatchData& data) {
98   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
99 
100   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
101 
102   handle_to_data_[data.handle] = data;
103   MessagePumpMojo::current()->AddHandler(this, data.handle,
104                                          data.handle_signals,
105                                          data.deadline);
106 }
107 
StopWatching(WatcherID watcher_id)108 void WatcherBackend::StopWatching(WatcherID watcher_id) {
109   // Because of the thread hop it is entirely possible to get here and not
110   // have a valid handle registered for |watcher_id|.
111   Handle handle;
112   if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
113     handle_to_data_.erase(handle);
114     MessagePumpMojo::current()->RemoveHandler(handle);
115   }
116 }
117 
RemoveAndNotify(const Handle & handle,MojoResult result)118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
119                                      MojoResult result) {
120   if (handle_to_data_.count(handle) == 0)
121     return;
122 
123   const WatchData data(handle_to_data_[handle]);
124   handle_to_data_.erase(handle);
125   MessagePumpMojo::current()->RemoveHandler(handle);
126   data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
127 }
128 
GetMojoHandleByWatcherID(WatcherID watcher_id,Handle * handle) const129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
130                                               Handle* handle) const {
131   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
132        i != handle_to_data_.end(); ++i) {
133     if (i->second.id == watcher_id) {
134       *handle = i->second.handle;
135       return true;
136     }
137   }
138   return false;
139 }
140 
OnHandleReady(const Handle & handle)141 void WatcherBackend::OnHandleReady(const Handle& handle) {
142   RemoveAndNotify(handle, MOJO_RESULT_OK);
143 }
144 
OnHandleError(const Handle & handle,MojoResult result)145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
146   RemoveAndNotify(handle, result);
147 }
148 
149 // WatcherThreadManager --------------------------------------------------------
150 
151 // WatcherThreadManager manages the background thread that listens for handles
152 // to be ready. All requests are handled by WatcherBackend.
153 }  // namespace
154 
155 class WatcherThreadManager {
156  public:
157   ~WatcherThreadManager();
158 
159   // Returns the shared instance.
160   static WatcherThreadManager* GetInstance();
161 
162   // Starts watching the requested handle. Returns a unique ID that is used to
163   // stop watching the handle. When the handle is ready |callback| is notified
164   // on the thread StartWatching() was invoked on.
165   // This may be invoked on any thread.
166   WatcherID StartWatching(const Handle& handle,
167                           MojoHandleSignals handle_signals,
168                           base::TimeTicks deadline,
169                           const base::Callback<void(MojoResult)>& callback);
170 
171   // Stops watching a handle.
172   // This may be invoked on any thread.
173   void StopWatching(WatcherID watcher_id);
174 
175  private:
176   enum RequestType {
177     REQUEST_START,
178     REQUEST_STOP,
179   };
180 
181   // See description of |requests_| for details.
182   struct RequestData {
RequestDatamojo::common::WatcherThreadManager::RequestData183     RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
184 
185     RequestType type;
186     WatchData start_data;
187     WatcherID stop_id;
188     base::WaitableEvent* stop_event;
189   };
190 
191   typedef std::vector<RequestData> Requests;
192 
193   friend struct DefaultSingletonTraits<WatcherThreadManager>;
194 
195   WatcherThreadManager();
196 
197   // Schedules a request on the background thread. See |requests_| for details.
198   void AddRequest(const RequestData& data);
199 
200   // Processes requests added to |requests_|. This is invoked on the backend
201   // thread.
202   void ProcessRequestsOnBackendThread();
203 
204   base::Thread thread_;
205 
206   base::AtomicSequenceNumber watcher_id_generator_;
207 
208   WatcherBackend backend_;
209 
210   // Protects |requests_|.
211   base::Lock lock_;
212 
213   // Start/Stop result in adding a RequestData to |requests_| (protected by
214   // |lock_|). When the background thread wakes up it processes the requests.
215   Requests requests_;
216 
217   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
218 };
219 
~WatcherThreadManager()220 WatcherThreadManager::~WatcherThreadManager() {
221   thread_.Stop();
222 }
223 
GetInstance()224 WatcherThreadManager* WatcherThreadManager::GetInstance() {
225   return Singleton<WatcherThreadManager>::get();
226 }
227 
StartWatching(const Handle & handle,MojoHandleSignals handle_signals,base::TimeTicks deadline,const base::Callback<void (MojoResult)> & callback)228 WatcherID WatcherThreadManager::StartWatching(
229     const Handle& handle,
230     MojoHandleSignals handle_signals,
231     base::TimeTicks deadline,
232     const base::Callback<void(MojoResult)>& callback) {
233   RequestData request_data;
234   request_data.type = REQUEST_START;
235   request_data.start_data.id = watcher_id_generator_.GetNext();
236   request_data.start_data.handle = handle;
237   request_data.start_data.callback = callback;
238   request_data.start_data.handle_signals = handle_signals;
239   request_data.start_data.deadline = deadline;
240   request_data.start_data.message_loop = base::MessageLoopProxy::current();
241   DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
242             request_data.start_data.message_loop.get());
243   AddRequest(request_data);
244   return request_data.start_data.id;
245 }
246 
StopWatching(WatcherID watcher_id)247 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
248   // Handle the case of StartWatching() followed by StopWatching() before
249   // |thread_| woke up.
250   {
251     base::AutoLock auto_lock(lock_);
252     for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
253       if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
254         // Watcher ids are not reused, so if we find it we can stop.
255         requests_.erase(i);
256         return;
257       }
258     }
259   }
260 
261   base::ThreadRestrictions::ScopedAllowWait allow_wait;
262   base::WaitableEvent event(true, false);
263   RequestData request_data;
264   request_data.type = REQUEST_STOP;
265   request_data.stop_id = watcher_id;
266   request_data.stop_event = &event;
267   AddRequest(request_data);
268 
269   // We need to block until the handle is actually removed.
270   event.Wait();
271 }
272 
AddRequest(const RequestData & data)273 void WatcherThreadManager::AddRequest(const RequestData& data) {
274   {
275     base::AutoLock auto_lock(lock_);
276     const bool was_empty = requests_.empty();
277     requests_.push_back(data);
278     if (!was_empty)
279       return;
280   }
281   // We own |thread_|, so it's safe to use Unretained() here.
282   thread_.message_loop()->PostTask(
283       FROM_HERE,
284       base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
285                  base::Unretained(this)));
286 }
287 
ProcessRequestsOnBackendThread()288 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
289   DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
290 
291   Requests requests;
292   {
293     base::AutoLock auto_lock(lock_);
294     requests_.swap(requests);
295   }
296   for (size_t i = 0; i < requests.size(); ++i) {
297     if (requests[i].type == REQUEST_START) {
298       backend_.StartWatching(requests[i].start_data);
299     } else {
300       backend_.StopWatching(requests[i].stop_id);
301       requests[i].stop_event->Signal();
302     }
303   }
304 }
305 
WatcherThreadManager()306 WatcherThreadManager::WatcherThreadManager()
307     : thread_(kWatcherThreadName) {
308   base::Thread::Options thread_options;
309   thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
310   thread_.StartWithOptions(thread_options);
311 }
312 
313 // HandleWatcher::StateBase and subclasses -------------------------------------
314 
315 // The base class of HandleWatcher's state. Owns the user's callback and
316 // monitors the current thread's MessageLoop to know when to force the callback
317 // to run (with an error) even though the pipe hasn't been signaled yet.
318 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
319  public:
StateBase(HandleWatcher * watcher,const base::Callback<void (MojoResult)> & callback)320   StateBase(HandleWatcher* watcher,
321             const base::Callback<void(MojoResult)>& callback)
322       : watcher_(watcher),
323         callback_(callback),
324         got_ready_(false) {
325     base::MessageLoop::current()->AddDestructionObserver(this);
326   }
327 
~StateBase()328   virtual ~StateBase() {
329     base::MessageLoop::current()->RemoveDestructionObserver(this);
330   }
331 
332  protected:
NotifyHandleReady(MojoResult result)333   void NotifyHandleReady(MojoResult result) {
334     got_ready_ = true;
335     NotifyAndDestroy(result);
336   }
337 
got_ready() const338   bool got_ready() const { return got_ready_; }
339 
340  private:
WillDestroyCurrentMessageLoop()341   virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
342     // The current thread is exiting. Simulate a watch error.
343     NotifyAndDestroy(MOJO_RESULT_ABORTED);
344   }
345 
NotifyAndDestroy(MojoResult result)346   void NotifyAndDestroy(MojoResult result) {
347     base::Callback<void(MojoResult)> callback = callback_;
348     watcher_->Stop();  // Destroys |this|.
349 
350     callback.Run(result);
351   }
352 
353   HandleWatcher* watcher_;
354   base::Callback<void(MojoResult)> callback_;
355 
356   // Have we been notified that the handle is ready?
357   bool got_ready_;
358 
359   DISALLOW_COPY_AND_ASSIGN(StateBase);
360 };
361 
362 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
363 // SameThreadWatchingState is used to directly watch the handle on the same
364 // thread.
365 class HandleWatcher::SameThreadWatchingState : public StateBase,
366                                                public MessagePumpMojoHandler {
367  public:
SameThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)368   SameThreadWatchingState(HandleWatcher* watcher,
369                           const Handle& handle,
370                           MojoHandleSignals handle_signals,
371                           MojoDeadline deadline,
372                           const base::Callback<void(MojoResult)>& callback)
373       : StateBase(watcher, callback),
374         handle_(handle) {
375     DCHECK(MessagePumpMojo::IsCurrent());
376 
377     MessagePumpMojo::current()->AddHandler(
378         this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
379   }
380 
~SameThreadWatchingState()381   virtual ~SameThreadWatchingState() {
382     if (!got_ready())
383       MessagePumpMojo::current()->RemoveHandler(handle_);
384   }
385 
386  private:
387   // MessagePumpMojoHandler overrides:
OnHandleReady(const Handle & handle)388   virtual void OnHandleReady(const Handle& handle) OVERRIDE {
389     StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
390   }
391 
OnHandleError(const Handle & handle,MojoResult result)392   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
393     StopWatchingAndNotifyReady(handle, result);
394   }
395 
StopWatchingAndNotifyReady(const Handle & handle,MojoResult result)396   void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
397     DCHECK_EQ(handle.value(), handle_.value());
398     MessagePumpMojo::current()->RemoveHandler(handle_);
399     NotifyHandleReady(result);
400   }
401 
402   Handle handle_;
403 
404   DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
405 };
406 
407 // If the thread on which HandleWatcher is used runs a message pump different
408 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
409 // handle on the handle watcher thread.
410 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
411  public:
SecondaryThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)412   SecondaryThreadWatchingState(HandleWatcher* watcher,
413                                const Handle& handle,
414                                MojoHandleSignals handle_signals,
415                                MojoDeadline deadline,
416                                const base::Callback<void(MojoResult)>& callback)
417       : StateBase(watcher, callback),
418         weak_factory_(this) {
419     watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
420         handle,
421         handle_signals,
422         MojoDeadlineToTimeTicks(deadline),
423         base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
424                    weak_factory_.GetWeakPtr()));
425   }
426 
~SecondaryThreadWatchingState()427   virtual ~SecondaryThreadWatchingState() {
428     // If we've been notified the handle is ready (|got_ready()| is true) then
429     // the watch has been implicitly removed by
430     // WatcherThreadManager/MessagePumpMojo and we don't have to call
431     // StopWatching(). To do so would needlessly entail posting a task and
432     // blocking until the background thread services it.
433     if (!got_ready())
434       WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
435   }
436 
437  private:
438   WatcherID watcher_id_;
439 
440   // Used to weakly bind |this| to the WatcherThreadManager.
441   base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
442 
443   DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
444 };
445 
446 // HandleWatcher ---------------------------------------------------------------
447 
HandleWatcher()448 HandleWatcher::HandleWatcher() {
449 }
450 
~HandleWatcher()451 HandleWatcher::~HandleWatcher() {
452 }
453 
Start(const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)454 void HandleWatcher::Start(const Handle& handle,
455                           MojoHandleSignals handle_signals,
456                           MojoDeadline deadline,
457                           const base::Callback<void(MojoResult)>& callback) {
458   DCHECK(handle.is_valid());
459   DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
460 
461   if (MessagePumpMojo::IsCurrent()) {
462     state_.reset(new SameThreadWatchingState(
463         this, handle, handle_signals, deadline, callback));
464   } else {
465     state_.reset(new SecondaryThreadWatchingState(
466         this, handle, handle_signals, deadline, callback));
467   }
468 }
469 
Stop()470 void HandleWatcher::Stop() {
471   state_.reset();
472 }
473 
474 }  // namespace common
475 }  // namespace mojo
476