• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/message_pump/handle_watcher.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <map>
11 
12 #include "base/atomic_sequence_num.h"
13 #include "base/bind.h"
14 #include "base/lazy_instance.h"
15 #include "base/logging.h"
16 #include "base/macros.h"
17 #include "base/memory/singleton.h"
18 #include "base/memory/weak_ptr.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/single_thread_task_runner.h"
21 #include "base/synchronization/lock.h"
22 #include "base/threading/thread.h"
23 #include "base/threading/thread_task_runner_handle.h"
24 #include "base/time/time.h"
25 #include "mojo/message_pump/message_pump_mojo.h"
26 #include "mojo/message_pump/message_pump_mojo_handler.h"
27 #include "mojo/message_pump/time_helper.h"
28 #include "mojo/public/c/system/message_pipe.h"
29 
30 namespace mojo {
31 namespace common {
32 
// Identifier returned by WatcherThreadManager::StartWatching() and later
// passed to StopWatching() to name the watch being stopped.
using WatcherID = int;
34 
35 namespace {
36 
// Name passed to the base::Thread that WatcherThreadManager owns.
const char kWatcherThreadName[] = "handle-watcher-thread";
38 
MojoDeadlineToTimeTicks(MojoDeadline deadline)39 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
40   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
41       internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
42 }
43 
44 // Tracks the data for a single call to Start().
45 struct WatchData {
WatchDatamojo::common::__anonbd2b9ff10111::WatchData46   WatchData()
47       : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
48 
49   WatcherID id;
50   Handle handle;
51   MojoHandleSignals handle_signals;
52   base::TimeTicks deadline;
53   base::Callback<void(MojoResult)> callback;
54   scoped_refptr<base::SingleThreadTaskRunner> task_runner;
55 };
56 
57 // WatcherBackend --------------------------------------------------------------
58 
59 // WatcherBackend is responsible for managing the requests and interacting with
60 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
61 // thread WatcherThreadManager creates.
62 class WatcherBackend : public MessagePumpMojoHandler {
63  public:
64   WatcherBackend();
65   ~WatcherBackend() override;
66 
67   void StartWatching(const WatchData& data);
68   void StopWatching(WatcherID watcher_id);
69 
70  private:
71   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
72 
73   // Invoked when a handle needs to be removed and notified.
74   void RemoveAndNotify(const Handle& handle, MojoResult result);
75 
76   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
77   // and sets |handle| to the Handle. Returns false if not a known id.
78   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
79 
80   // MessagePumpMojoHandler overrides:
81   void OnHandleReady(const Handle& handle) override;
82   void OnHandleError(const Handle& handle, MojoResult result) override;
83 
84   // Maps from assigned id to WatchData.
85   HandleToWatchDataMap handle_to_data_;
86 
87   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
88 };
89 
WatcherBackend()90 WatcherBackend::WatcherBackend() {
91 }
92 
~WatcherBackend()93 WatcherBackend::~WatcherBackend() {
94 }
95 
StartWatching(const WatchData & data)96 void WatcherBackend::StartWatching(const WatchData& data) {
97   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
98 
99   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
100 
101   handle_to_data_[data.handle] = data;
102   MessagePumpMojo::current()->AddHandler(this, data.handle,
103                                          data.handle_signals,
104                                          data.deadline);
105 }
106 
StopWatching(WatcherID watcher_id)107 void WatcherBackend::StopWatching(WatcherID watcher_id) {
108   // Because of the thread hop it is entirely possible to get here and not
109   // have a valid handle registered for |watcher_id|.
110   Handle handle;
111   if (!GetMojoHandleByWatcherID(watcher_id, &handle))
112     return;
113 
114   handle_to_data_.erase(handle);
115   MessagePumpMojo::current()->RemoveHandler(handle);
116 }
117 
RemoveAndNotify(const Handle & handle,MojoResult result)118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
119                                      MojoResult result) {
120   if (handle_to_data_.count(handle) == 0)
121     return;
122 
123   const WatchData data(handle_to_data_[handle]);
124   handle_to_data_.erase(handle);
125   MessagePumpMojo::current()->RemoveHandler(handle);
126 
127   data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
128 }
129 
GetMojoHandleByWatcherID(WatcherID watcher_id,Handle * handle) const130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
131                                               Handle* handle) const {
132   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
133        i != handle_to_data_.end(); ++i) {
134     if (i->second.id == watcher_id) {
135       *handle = i->second.handle;
136       return true;
137     }
138   }
139   return false;
140 }
141 
OnHandleReady(const Handle & handle)142 void WatcherBackend::OnHandleReady(const Handle& handle) {
143   RemoveAndNotify(handle, MOJO_RESULT_OK);
144 }
145 
OnHandleError(const Handle & handle,MojoResult result)146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
147   RemoveAndNotify(handle, result);
148 }
149 
150 // WatcherThreadManager --------------------------------------------------------
151 
152 // WatcherThreadManager manages the background thread that listens for handles
153 // to be ready. All requests are handled by WatcherBackend.
154 class WatcherThreadManager {
155  public:
156   ~WatcherThreadManager();
157 
158   // Returns the shared instance.
159   static WatcherThreadManager* GetInstance();
160 
161   // Starts watching the requested handle. Returns a unique ID that is used to
162   // stop watching the handle. When the handle is ready |callback| is notified
163   // on the thread StartWatching() was invoked on.
164   // This may be invoked on any thread.
165   WatcherID StartWatching(const Handle& handle,
166                           MojoHandleSignals handle_signals,
167                           base::TimeTicks deadline,
168                           const base::Callback<void(MojoResult)>& callback);
169 
170   // Stops watching a handle.
171   // This may be invoked on any thread.
172   void StopWatching(WatcherID watcher_id);
173 
174  private:
175   enum RequestType {
176     REQUEST_START,
177     REQUEST_STOP,
178   };
179 
180   // See description of |requests_| for details.
181   struct RequestData {
RequestDatamojo::common::__anonbd2b9ff10111::WatcherThreadManager::RequestData182     RequestData() : type(REQUEST_START), stop_id(0) {}
183 
184     RequestType type;
185     WatchData start_data;
186     WatcherID stop_id;
187   };
188 
189   typedef std::vector<RequestData> Requests;
190 
191   friend struct base::DefaultSingletonTraits<WatcherThreadManager>;
192 
193   WatcherThreadManager();
194 
195   // Schedules a request on the background thread. See |requests_| for details.
196   void AddRequest(const RequestData& data);
197 
198   // Processes requests added to |requests_|. This is invoked on the backend
199   // thread.
200   void ProcessRequestsOnBackendThread();
201 
202   base::Thread thread_;
203 
204   base::AtomicSequenceNumber watcher_id_generator_;
205 
206   WatcherBackend backend_;
207 
208   // Protects |requests_|.
209   base::Lock lock_;
210 
211   // Start/Stop result in adding a RequestData to |requests_| (protected by
212   // |lock_|). When the background thread wakes up it processes the requests.
213   Requests requests_;
214 
215   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
216 };
217 
~WatcherThreadManager()218 WatcherThreadManager::~WatcherThreadManager() {
219   thread_.Stop();
220 }
221 
GetInstance()222 WatcherThreadManager* WatcherThreadManager::GetInstance() {
223   return base::Singleton<WatcherThreadManager>::get();
224 }
225 
StartWatching(const Handle & handle,MojoHandleSignals handle_signals,base::TimeTicks deadline,const base::Callback<void (MojoResult)> & callback)226 WatcherID WatcherThreadManager::StartWatching(
227     const Handle& handle,
228     MojoHandleSignals handle_signals,
229     base::TimeTicks deadline,
230     const base::Callback<void(MojoResult)>& callback) {
231   RequestData request_data;
232   request_data.type = REQUEST_START;
233   request_data.start_data.id = watcher_id_generator_.GetNext();
234   request_data.start_data.handle = handle;
235   request_data.start_data.callback = callback;
236   request_data.start_data.handle_signals = handle_signals;
237   request_data.start_data.deadline = deadline;
238   request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
239   AddRequest(request_data);
240   return request_data.start_data.id;
241 }
242 
StopWatching(WatcherID watcher_id)243 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
244   // Handle the case of StartWatching() followed by StopWatching() before
245   // |thread_| woke up.
246   {
247     base::AutoLock auto_lock(lock_);
248     for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
249       if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
250         // Watcher ids are not reused, so if we find it we can stop.
251         requests_.erase(i);
252         return;
253       }
254     }
255   }
256 
257   RequestData request_data;
258   request_data.type = REQUEST_STOP;
259   request_data.stop_id = watcher_id;
260   AddRequest(request_data);
261 }
262 
AddRequest(const RequestData & data)263 void WatcherThreadManager::AddRequest(const RequestData& data) {
264   {
265     base::AutoLock auto_lock(lock_);
266     const bool was_empty = requests_.empty();
267     requests_.push_back(data);
268     if (!was_empty)
269       return;
270   }
271 
272   // We outlive |thread_|, so it's safe to use Unretained() here.
273   thread_.task_runner()->PostTask(
274       FROM_HERE,
275       base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
276                  base::Unretained(this)));
277 }
278 
ProcessRequestsOnBackendThread()279 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
280   DCHECK(thread_.task_runner()->BelongsToCurrentThread());
281 
282   Requests requests;
283   {
284     base::AutoLock auto_lock(lock_);
285     requests_.swap(requests);
286   }
287   for (size_t i = 0; i < requests.size(); ++i) {
288     if (requests[i].type == REQUEST_START) {
289       backend_.StartWatching(requests[i].start_data);
290     } else {
291       backend_.StopWatching(requests[i].stop_id);
292     }
293   }
294 }
295 
WatcherThreadManager()296 WatcherThreadManager::WatcherThreadManager()
297     : thread_(kWatcherThreadName) {
298   base::Thread::Options thread_options;
299   thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
300   thread_.StartWithOptions(thread_options);
301 }
302 
303 }  // namespace
304 
305 // HandleWatcher::StateBase and subclasses -------------------------------------
306 
307 // The base class of HandleWatcher's state. Owns the user's callback and
308 // monitors the current thread's MessageLoop to know when to force the callback
309 // to run (with an error) even though the pipe hasn't been signaled yet.
310 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
311  public:
StateBase(HandleWatcher * watcher,const base::Callback<void (MojoResult)> & callback)312   StateBase(HandleWatcher* watcher,
313             const base::Callback<void(MojoResult)>& callback)
314       : watcher_(watcher),
315         callback_(callback),
316         got_ready_(false) {
317     base::MessageLoop::current()->AddDestructionObserver(this);
318   }
319 
~StateBase()320   ~StateBase() override {
321     base::MessageLoop::current()->RemoveDestructionObserver(this);
322   }
323 
324  protected:
NotifyHandleReady(MojoResult result)325   void NotifyHandleReady(MojoResult result) {
326     got_ready_ = true;
327     NotifyAndDestroy(result);
328   }
329 
got_ready() const330   bool got_ready() const { return got_ready_; }
331 
332  private:
WillDestroyCurrentMessageLoop()333   void WillDestroyCurrentMessageLoop() override {
334     // The current thread is exiting. Simulate a watch error.
335     NotifyAndDestroy(MOJO_RESULT_ABORTED);
336   }
337 
NotifyAndDestroy(MojoResult result)338   void NotifyAndDestroy(MojoResult result) {
339     base::Callback<void(MojoResult)> callback = callback_;
340     watcher_->Stop();  // Destroys |this|.
341 
342     callback.Run(result);
343   }
344 
345   HandleWatcher* watcher_;
346   base::Callback<void(MojoResult)> callback_;
347 
348   // Have we been notified that the handle is ready?
349   bool got_ready_;
350 
351   DISALLOW_COPY_AND_ASSIGN(StateBase);
352 };
353 
354 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
355 // SameThreadWatchingState is used to directly watch the handle on the same
356 // thread.
357 class HandleWatcher::SameThreadWatchingState : public StateBase,
358                                                public MessagePumpMojoHandler {
359  public:
SameThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)360   SameThreadWatchingState(HandleWatcher* watcher,
361                           const Handle& handle,
362                           MojoHandleSignals handle_signals,
363                           MojoDeadline deadline,
364                           const base::Callback<void(MojoResult)>& callback)
365       : StateBase(watcher, callback),
366         handle_(handle) {
367     DCHECK(MessagePumpMojo::IsCurrent());
368 
369     MessagePumpMojo::current()->AddHandler(
370         this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
371   }
372 
~SameThreadWatchingState()373   ~SameThreadWatchingState() override {
374     if (!got_ready())
375       MessagePumpMojo::current()->RemoveHandler(handle_);
376   }
377 
378  private:
379   // MessagePumpMojoHandler overrides:
OnHandleReady(const Handle & handle)380   void OnHandleReady(const Handle& handle) override {
381     StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
382   }
383 
OnHandleError(const Handle & handle,MojoResult result)384   void OnHandleError(const Handle& handle, MojoResult result) override {
385     StopWatchingAndNotifyReady(handle, result);
386   }
387 
StopWatchingAndNotifyReady(const Handle & handle,MojoResult result)388   void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
389     DCHECK_EQ(handle.value(), handle_.value());
390     MessagePumpMojo::current()->RemoveHandler(handle_);
391     NotifyHandleReady(result);
392   }
393 
394   Handle handle_;
395 
396   DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
397 };
398 
399 // If the thread on which HandleWatcher is used runs a message pump different
400 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
401 // handle on the handle watcher thread.
402 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
403  public:
SecondaryThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)404   SecondaryThreadWatchingState(HandleWatcher* watcher,
405                                const Handle& handle,
406                                MojoHandleSignals handle_signals,
407                                MojoDeadline deadline,
408                                const base::Callback<void(MojoResult)>& callback)
409       : StateBase(watcher, callback),
410         weak_factory_(this) {
411     watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
412         handle,
413         handle_signals,
414         MojoDeadlineToTimeTicks(deadline),
415         base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
416                    weak_factory_.GetWeakPtr()));
417   }
418 
~SecondaryThreadWatchingState()419   ~SecondaryThreadWatchingState() override {
420     // If we've been notified the handle is ready (|got_ready()| is true) then
421     // the watch has been implicitly removed by
422     // WatcherThreadManager/MessagePumpMojo and we don't have to call
423     // StopWatching(). To do so would needlessly entail posting a task and
424     // blocking until the background thread services it.
425     if (!got_ready())
426       WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
427   }
428 
429  private:
430   WatcherID watcher_id_;
431 
432   // Used to weakly bind |this| to the WatcherThreadManager.
433   base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
434 
435   DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
436 };
437 
438 // HandleWatcher ---------------------------------------------------------------
439 
HandleWatcher()440 HandleWatcher::HandleWatcher() {
441 }
442 
~HandleWatcher()443 HandleWatcher::~HandleWatcher() {
444 }
445 
Start(const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)446 void HandleWatcher::Start(const Handle& handle,
447                           MojoHandleSignals handle_signals,
448                           MojoDeadline deadline,
449                           const base::Callback<void(MojoResult)>& callback) {
450   DCHECK(handle.is_valid());
451   DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
452 
453   // Need to clear the state before creating a new one.
454   state_.reset();
455   if (MessagePumpMojo::IsCurrent()) {
456     state_.reset(new SameThreadWatchingState(
457         this, handle, handle_signals, deadline, callback));
458   } else {
459 #if !defined(OFFICIAL_BUILD)
460     // Just for making debugging non-transferable message pipes easier. Since
461     // they can't be sent after they're read/written/listened to,
462     // MessagePipeDispatcher saves the callstack of when it's "bound" to a
463     // pipe id. Triggering a read here, instead of later in the PostTask, means
464     // we have a callstack that is useful to check if the pipe is erronously
465     // attempted to be sent.
466     uint32_t temp = 0;
467     MojoReadMessage(handle.value(), nullptr, &temp, nullptr, nullptr,
468                     MOJO_READ_MESSAGE_FLAG_NONE);
469 #endif
470     state_.reset(new SecondaryThreadWatchingState(
471         this, handle, handle_signals, deadline, callback));
472   }
473 }
474 
Stop()475 void HandleWatcher::Stop() {
476   state_.reset();
477 }
478 
479 }  // namespace common
480 }  // namespace mojo
481