1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/message_pump_mojo.h"
6
7 #include <algorithm>
8 #include <vector>
9
10 #include "base/debug/alias.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/threading/thread_local.h"
14 #include "base/time/time.h"
15 #include "mojo/common/message_pump_mojo_handler.h"
16 #include "mojo/common/time_helper.h"
17
18 namespace mojo {
19 namespace common {
20 namespace {
21
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
24
TimeTicksToMojoDeadline(base::TimeTicks time_ticks,base::TimeTicks now)25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26 base::TimeTicks now) {
27 // The is_null() check matches that of HandleWatcher as well as how
28 // |delayed_work_time| is used.
29 if (time_ticks.is_null())
30 return MOJO_DEADLINE_INDEFINITE;
31 const int64_t delta = (time_ticks - now).InMicroseconds();
32 return delta < 0 ? static_cast<MojoDeadline>(0) :
33 static_cast<MojoDeadline>(delta);
34 }
35
36 } // namespace
37
38 // State needed for one iteration of WaitMany. The first handle and flags
39 // corresponds to that of the control pipe.
40 struct MessagePumpMojo::WaitState {
41 std::vector<Handle> handles;
42 std::vector<MojoHandleSignals> wait_signals;
43 };
44
45 struct MessagePumpMojo::RunState {
RunStatemojo::common::MessagePumpMojo::RunState46 RunState() : should_quit(false) {
47 CreateMessagePipe(NULL, &read_handle, &write_handle);
48 }
49
50 base::TimeTicks delayed_work_time;
51
52 // Used to wake up WaitForWork().
53 ScopedMessagePipeHandle read_handle;
54 ScopedMessagePipeHandle write_handle;
55
56 bool should_quit;
57 };
58
MessagePumpMojo()59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
60 DCHECK(!current())
61 << "There is already a MessagePumpMojo instance on this thread.";
62 g_tls_current_pump.Pointer()->Set(this);
63 }
64
~MessagePumpMojo()65 MessagePumpMojo::~MessagePumpMojo() {
66 DCHECK_EQ(this, current());
67 g_tls_current_pump.Pointer()->Set(NULL);
68 }
69
70 // static
Create()71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
72 return scoped_ptr<MessagePump>(new MessagePumpMojo());
73 }
74
75 // static
current()76 MessagePumpMojo* MessagePumpMojo::current() {
77 return g_tls_current_pump.Pointer()->Get();
78 }
79
AddHandler(MessagePumpMojoHandler * handler,const Handle & handle,MojoHandleSignals wait_signals,base::TimeTicks deadline)80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
81 const Handle& handle,
82 MojoHandleSignals wait_signals,
83 base::TimeTicks deadline) {
84 CHECK(handler);
85 DCHECK(handle.is_valid());
86 // Assume it's an error if someone tries to reregister an existing handle.
87 CHECK_EQ(0u, handlers_.count(handle));
88 Handler handler_data;
89 handler_data.handler = handler;
90 handler_data.wait_signals = wait_signals;
91 handler_data.deadline = deadline;
92 handler_data.id = next_handler_id_++;
93 handlers_[handle] = handler_data;
94 }
95
RemoveHandler(const Handle & handle)96 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
97 handlers_.erase(handle);
98 }
99
Run(Delegate * delegate)100 void MessagePumpMojo::Run(Delegate* delegate) {
101 RunState run_state;
102 // TODO: better deal with error handling.
103 CHECK(run_state.read_handle.is_valid());
104 CHECK(run_state.write_handle.is_valid());
105 RunState* old_state = NULL;
106 {
107 base::AutoLock auto_lock(run_state_lock_);
108 old_state = run_state_;
109 run_state_ = &run_state;
110 }
111 DoRunLoop(&run_state, delegate);
112 {
113 base::AutoLock auto_lock(run_state_lock_);
114 run_state_ = old_state;
115 }
116 }
117
Quit()118 void MessagePumpMojo::Quit() {
119 base::AutoLock auto_lock(run_state_lock_);
120 if (run_state_)
121 run_state_->should_quit = true;
122 }
123
ScheduleWork()124 void MessagePumpMojo::ScheduleWork() {
125 base::AutoLock auto_lock(run_state_lock_);
126 if (run_state_)
127 SignalControlPipe(*run_state_);
128 }
129
ScheduleDelayedWork(const base::TimeTicks & delayed_work_time)130 void MessagePumpMojo::ScheduleDelayedWork(
131 const base::TimeTicks& delayed_work_time) {
132 base::AutoLock auto_lock(run_state_lock_);
133 if (!run_state_)
134 return;
135 run_state_->delayed_work_time = delayed_work_time;
136 }
137
DoRunLoop(RunState * run_state,Delegate * delegate)138 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
139 bool more_work_is_plausible = true;
140 for (;;) {
141 const bool block = !more_work_is_plausible;
142 DoInternalWork(*run_state, block);
143
144 // There isn't a good way to know if there are more handles ready, we assume
145 // not.
146 more_work_is_plausible = false;
147
148 if (run_state->should_quit)
149 break;
150
151 more_work_is_plausible |= delegate->DoWork();
152 if (run_state->should_quit)
153 break;
154
155 more_work_is_plausible |= delegate->DoDelayedWork(
156 &run_state->delayed_work_time);
157 if (run_state->should_quit)
158 break;
159
160 if (more_work_is_plausible)
161 continue;
162
163 more_work_is_plausible = delegate->DoIdleWork();
164 if (run_state->should_quit)
165 break;
166 }
167 }
168
DoInternalWork(const RunState & run_state,bool block)169 void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
170 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
171 const WaitState wait_state = GetWaitState(run_state);
172 const MojoResult result =
173 WaitMany(wait_state.handles, wait_state.wait_signals, deadline);
174 if (result == 0) {
175 // Control pipe was written to.
176 uint32_t num_bytes = 0;
177 ReadMessageRaw(run_state.read_handle.get(), NULL, &num_bytes, NULL, NULL,
178 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
179 } else if (result > 0) {
180 const size_t index = static_cast<size_t>(result);
181 DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
182 handlers_[wait_state.handles[index]].handler->OnHandleReady(
183 wait_state.handles[index]);
184 } else {
185 switch (result) {
186 case MOJO_RESULT_CANCELLED:
187 case MOJO_RESULT_FAILED_PRECONDITION:
188 RemoveFirstInvalidHandle(wait_state);
189 break;
190 case MOJO_RESULT_DEADLINE_EXCEEDED:
191 break;
192 default:
193 base::debug::Alias(&result);
194 // Unexpected result is likely fatal, crash so we can determine cause.
195 CHECK(false);
196 }
197 }
198
199 // Notify and remove any handlers whose time has expired. Make a copy in case
200 // someone tries to add/remove new handlers from notification.
201 const HandleToHandler cloned_handlers(handlers_);
202 const base::TimeTicks now(internal::NowTicks());
203 for (HandleToHandler::const_iterator i = cloned_handlers.begin();
204 i != cloned_handlers.end(); ++i) {
205 // Since we're iterating over a clone of the handlers, verify the handler is
206 // still valid before notifying.
207 if (!i->second.deadline.is_null() && i->second.deadline < now &&
208 handlers_.find(i->first) != handlers_.end() &&
209 handlers_[i->first].id == i->second.id) {
210 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
211 }
212 }
213 }
214
RemoveFirstInvalidHandle(const WaitState & wait_state)215 void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
216 // TODO(sky): deal with control pipe going bad.
217 for (size_t i = 0; i < wait_state.handles.size(); ++i) {
218 const MojoResult result =
219 Wait(wait_state.handles[i], wait_state.wait_signals[i], 0);
220 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
221 // We should never have an invalid argument. If we do it indicates
222 // RemoveHandler() was not invoked and is likely to cause problems else
223 // where in the stack if we ignore it.
224 CHECK(false);
225 } else if (result == MOJO_RESULT_FAILED_PRECONDITION ||
226 result == MOJO_RESULT_CANCELLED) {
227 CHECK_NE(i, 0u); // Indicates the control pipe went bad.
228
229 // Remove the handle first, this way if OnHandleError() tries to remove
230 // the handle our iterator isn't invalidated.
231 CHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
232 MessagePumpMojoHandler* handler =
233 handlers_[wait_state.handles[i]].handler;
234 handlers_.erase(wait_state.handles[i]);
235 handler->OnHandleError(wait_state.handles[i], result);
236 return;
237 }
238 }
239 }
240
SignalControlPipe(const RunState & run_state)241 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
242 const MojoResult result =
243 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0,
244 MOJO_WRITE_MESSAGE_FLAG_NONE);
245 // If we can't write we likely won't wake up the thread and there is a strong
246 // chance we'll deadlock.
247 CHECK_EQ(MOJO_RESULT_OK, result);
248 }
249
GetWaitState(const RunState & run_state) const250 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState(
251 const RunState& run_state) const {
252 WaitState wait_state;
253 wait_state.handles.push_back(run_state.read_handle.get());
254 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
255
256 for (HandleToHandler::const_iterator i = handlers_.begin();
257 i != handlers_.end(); ++i) {
258 wait_state.handles.push_back(i->first);
259 wait_state.wait_signals.push_back(i->second.wait_signals);
260 }
261 return wait_state;
262 }
263
GetDeadlineForWait(const RunState & run_state) const264 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
265 const RunState& run_state) const {
266 const base::TimeTicks now(internal::NowTicks());
267 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
268 now);
269 for (HandleToHandler::const_iterator i = handlers_.begin();
270 i != handlers_.end(); ++i) {
271 deadline = std::min(
272 TimeTicksToMojoDeadline(i->second.deadline, now), deadline);
273 }
274 return deadline;
275 }
276
277 } // namespace common
278 } // namespace mojo
279