1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/handle_watcher.h"
6
7 #include <map>
8
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/singleton.h"
13 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/time/time.h"
19 #include "mojo/common/message_pump_mojo.h"
20 #include "mojo/common/message_pump_mojo_handler.h"
21 #include "mojo/common/time_helper.h"
22
23 namespace mojo {
24 namespace common {
25
// Identifier returned for each watch request; passed back later to stop it.
typedef int WatcherID;
27
28 namespace {
29
30 const char kWatcherThreadName[] = "handle-watcher-thread";
31
32 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
33 MessagePumpMojo* message_pump_mojo = NULL;
34
CreateMessagePumpMojo()35 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
36 message_pump_mojo = new MessagePumpMojo;
37 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
38 }
39
MojoDeadlineToTimeTicks(MojoDeadline deadline)40 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
41 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
42 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
43 }
44
45 // Tracks the data for a single call to Start().
46 struct WatchData {
WatchDatamojo::common::__anon74e0ef780111::WatchData47 WatchData()
48 : id(0),
49 handle_signals(MOJO_HANDLE_SIGNAL_NONE),
50 message_loop(NULL) {}
51
52 WatcherID id;
53 Handle handle;
54 MojoHandleSignals handle_signals;
55 base::TimeTicks deadline;
56 base::Callback<void(MojoResult)> callback;
57 scoped_refptr<base::MessageLoopProxy> message_loop;
58 };
59
60 // WatcherBackend --------------------------------------------------------------
61
62 // WatcherBackend is responsible for managing the requests and interacting with
63 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
64 // thread WatcherThreadManager creates.
65 class WatcherBackend : public MessagePumpMojoHandler {
66 public:
67 WatcherBackend();
68 virtual ~WatcherBackend();
69
70 void StartWatching(const WatchData& data);
71 void StopWatching(WatcherID watcher_id);
72
73 private:
74 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
75
76 // Invoked when a handle needs to be removed and notified.
77 void RemoveAndNotify(const Handle& handle, MojoResult result);
78
79 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
80 // and sets |handle| to the Handle. Returns false if not a known id.
81 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
82
83 // MessagePumpMojoHandler overrides:
84 virtual void OnHandleReady(const Handle& handle) OVERRIDE;
85 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
86
87 // Maps from assigned id to WatchData.
88 HandleToWatchDataMap handle_to_data_;
89
90 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
91 };
92
WatcherBackend()93 WatcherBackend::WatcherBackend() {
94 }
95
~WatcherBackend()96 WatcherBackend::~WatcherBackend() {
97 }
98
StartWatching(const WatchData & data)99 void WatcherBackend::StartWatching(const WatchData& data) {
100 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
101
102 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
103
104 handle_to_data_[data.handle] = data;
105 message_pump_mojo->AddHandler(this, data.handle,
106 data.handle_signals,
107 data.deadline);
108 }
109
StopWatching(WatcherID watcher_id)110 void WatcherBackend::StopWatching(WatcherID watcher_id) {
111 // Because of the thread hop it is entirely possible to get here and not
112 // have a valid handle registered for |watcher_id|.
113 Handle handle;
114 if (!GetMojoHandleByWatcherID(watcher_id, &handle))
115 return;
116
117 handle_to_data_.erase(handle);
118 message_pump_mojo->RemoveHandler(handle);
119 }
120
RemoveAndNotify(const Handle & handle,MojoResult result)121 void WatcherBackend::RemoveAndNotify(const Handle& handle,
122 MojoResult result) {
123 if (handle_to_data_.count(handle) == 0)
124 return;
125
126 const WatchData data(handle_to_data_[handle]);
127 handle_to_data_.erase(handle);
128 message_pump_mojo->RemoveHandler(handle);
129 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
130 }
131
GetMojoHandleByWatcherID(WatcherID watcher_id,Handle * handle) const132 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
133 Handle* handle) const {
134 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
135 i != handle_to_data_.end(); ++i) {
136 if (i->second.id == watcher_id) {
137 *handle = i->second.handle;
138 return true;
139 }
140 }
141 return false;
142 }
143
OnHandleReady(const Handle & handle)144 void WatcherBackend::OnHandleReady(const Handle& handle) {
145 RemoveAndNotify(handle, MOJO_RESULT_OK);
146 }
147
OnHandleError(const Handle & handle,MojoResult result)148 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
149 RemoveAndNotify(handle, result);
150 }
151
152 // WatcherThreadManager --------------------------------------------------------
153
154 // WatcherThreadManager manages the background thread that listens for handles
155 // to be ready. All requests are handled by WatcherBackend.
156 class WatcherThreadManager {
157 public:
158 ~WatcherThreadManager();
159
160 // Returns the shared instance.
161 static WatcherThreadManager* GetInstance();
162
163 // Starts watching the requested handle. Returns a unique ID that is used to
164 // stop watching the handle. When the handle is ready |callback| is notified
165 // on the thread StartWatching() was invoked on.
166 // This may be invoked on any thread.
167 WatcherID StartWatching(const Handle& handle,
168 MojoHandleSignals handle_signals,
169 base::TimeTicks deadline,
170 const base::Callback<void(MojoResult)>& callback);
171
172 // Stops watching a handle.
173 // This may be invoked on any thread.
174 void StopWatching(WatcherID watcher_id);
175
176 private:
177 friend struct DefaultSingletonTraits<WatcherThreadManager>;
178 WatcherThreadManager();
179
180 base::Thread thread_;
181
182 base::AtomicSequenceNumber watcher_id_generator_;
183
184 WatcherBackend backend_;
185
186 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
187 };
188
~WatcherThreadManager()189 WatcherThreadManager::~WatcherThreadManager() {
190 thread_.Stop();
191 }
192
GetInstance()193 WatcherThreadManager* WatcherThreadManager::GetInstance() {
194 return Singleton<WatcherThreadManager>::get();
195 }
196
StartWatching(const Handle & handle,MojoHandleSignals handle_signals,base::TimeTicks deadline,const base::Callback<void (MojoResult)> & callback)197 WatcherID WatcherThreadManager::StartWatching(
198 const Handle& handle,
199 MojoHandleSignals handle_signals,
200 base::TimeTicks deadline,
201 const base::Callback<void(MojoResult)>& callback) {
202 WatchData data;
203 data.id = watcher_id_generator_.GetNext();
204 data.handle = handle;
205 data.callback = callback;
206 data.handle_signals = handle_signals;
207 data.deadline = deadline;
208 data.message_loop = base::MessageLoopProxy::current();
209 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
210 data.message_loop.get());
211 // We outlive |thread_|, so it's safe to use Unretained() here.
212 thread_.message_loop()->PostTask(
213 FROM_HERE,
214 base::Bind(&WatcherBackend::StartWatching,
215 base::Unretained(&backend_),
216 data));
217 return data.id;
218 }
219
StopWatching(WatcherID watcher_id)220 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
221 // We outlive |thread_|, so it's safe to use Unretained() here.
222 thread_.message_loop()->PostTask(
223 FROM_HERE,
224 base::Bind(&WatcherBackend::StopWatching,
225 base::Unretained(&backend_),
226 watcher_id));
227 }
228
WatcherThreadManager()229 WatcherThreadManager::WatcherThreadManager()
230 : thread_(kWatcherThreadName) {
231 base::Thread::Options thread_options;
232 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
233 thread_.StartWithOptions(thread_options);
234 }
235
236 } // namespace
237
238 // HandleWatcher::State --------------------------------------------------------
239
240 // Represents the state of the HandleWatcher. Owns the user's callback and
241 // monitors the current thread's MessageLoop to know when to force the callback
242 // to run (with an error) even though the pipe hasn't been signaled yet.
243 class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
244 public:
State(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)245 State(HandleWatcher* watcher,
246 const Handle& handle,
247 MojoHandleSignals handle_signals,
248 MojoDeadline deadline,
249 const base::Callback<void(MojoResult)>& callback)
250 : watcher_(watcher),
251 callback_(callback),
252 weak_factory_(this) {
253 base::MessageLoop::current()->AddDestructionObserver(this);
254
255 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
256 handle,
257 handle_signals,
258 MojoDeadlineToTimeTicks(deadline),
259 base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
260 }
261
~State()262 virtual ~State() {
263 base::MessageLoop::current()->RemoveDestructionObserver(this);
264
265 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
266 }
267
268 private:
WillDestroyCurrentMessageLoop()269 virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
270 // The current thread is exiting. Simulate a watch error.
271 OnHandleReady(MOJO_RESULT_ABORTED);
272 }
273
OnHandleReady(MojoResult result)274 void OnHandleReady(MojoResult result) {
275 base::Callback<void(MojoResult)> callback = callback_;
276 watcher_->Stop(); // Destroys |this|.
277
278 callback.Run(result);
279 }
280
281 HandleWatcher* watcher_;
282 WatcherID watcher_id_;
283 base::Callback<void(MojoResult)> callback_;
284
285 // Used to weakly bind |this| to the WatcherThreadManager.
286 base::WeakPtrFactory<State> weak_factory_;
287 };
288
289 // HandleWatcher ---------------------------------------------------------------
290
HandleWatcher()291 HandleWatcher::HandleWatcher() {
292 }
293
~HandleWatcher()294 HandleWatcher::~HandleWatcher() {
295 }
296
Start(const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)297 void HandleWatcher::Start(const Handle& handle,
298 MojoHandleSignals handle_signals,
299 MojoDeadline deadline,
300 const base::Callback<void(MojoResult)>& callback) {
301 DCHECK(handle.is_valid());
302 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
303
304 state_.reset(new State(this, handle, handle_signals, deadline, callback));
305 }
306
Stop()307 void HandleWatcher::Stop() {
308 state_.reset();
309 }
310
311 } // namespace common
312 } // namespace mojo
313