• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/common/message_pump_mojo.h"
6 
7 #include <algorithm>
8 #include <vector>
9 
10 #include "base/debug/alias.h"
11 #include "base/logging.h"
12 #include "base/time/time.h"
13 #include "mojo/common/message_pump_mojo_handler.h"
14 #include "mojo/common/time_helper.h"
15 
16 namespace mojo {
17 namespace common {
18 
19 // State needed for one iteration of WaitMany. The first handle and flags
20 // corresponds to that of the control pipe.
21 struct MessagePumpMojo::WaitState {
22   std::vector<Handle> handles;
23   std::vector<MojoHandleSignals> wait_signals;
24 };
25 
26 struct MessagePumpMojo::RunState {
RunStatemojo::common::MessagePumpMojo::RunState27   RunState() : should_quit(false) {
28     CreateMessagePipe(NULL, &read_handle, &write_handle);
29   }
30 
31   base::TimeTicks delayed_work_time;
32 
33   // Used to wake up WaitForWork().
34   ScopedMessagePipeHandle read_handle;
35   ScopedMessagePipeHandle write_handle;
36 
37   bool should_quit;
38 };
39 
MessagePumpMojo()40 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
41 }
42 
~MessagePumpMojo()43 MessagePumpMojo::~MessagePumpMojo() {
44 }
45 
46 // static
Create()47 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
48   return scoped_ptr<MessagePump>(new MessagePumpMojo());
49 }
50 
AddHandler(MessagePumpMojoHandler * handler,const Handle & handle,MojoHandleSignals wait_signals,base::TimeTicks deadline)51 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
52                                  const Handle& handle,
53                                  MojoHandleSignals wait_signals,
54                                  base::TimeTicks deadline) {
55   DCHECK(handler);
56   DCHECK(handle.is_valid());
57   // Assume it's an error if someone tries to reregister an existing handle.
58   DCHECK_EQ(0u, handlers_.count(handle));
59   Handler handler_data;
60   handler_data.handler = handler;
61   handler_data.wait_signals = wait_signals;
62   handler_data.deadline = deadline;
63   handler_data.id = next_handler_id_++;
64   handlers_[handle] = handler_data;
65 }
66 
RemoveHandler(const Handle & handle)67 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
68   handlers_.erase(handle);
69 }
70 
Run(Delegate * delegate)71 void MessagePumpMojo::Run(Delegate* delegate) {
72   RunState run_state;
73   // TODO: better deal with error handling.
74   CHECK(run_state.read_handle.is_valid());
75   CHECK(run_state.write_handle.is_valid());
76   RunState* old_state = NULL;
77   {
78     base::AutoLock auto_lock(run_state_lock_);
79     old_state = run_state_;
80     run_state_ = &run_state;
81   }
82   DoRunLoop(&run_state, delegate);
83   {
84     base::AutoLock auto_lock(run_state_lock_);
85     run_state_ = old_state;
86   }
87 }
88 
Quit()89 void MessagePumpMojo::Quit() {
90   base::AutoLock auto_lock(run_state_lock_);
91   if (run_state_)
92     run_state_->should_quit = true;
93 }
94 
ScheduleWork()95 void MessagePumpMojo::ScheduleWork() {
96   base::AutoLock auto_lock(run_state_lock_);
97   if (run_state_)
98     SignalControlPipe(*run_state_);
99 }
100 
ScheduleDelayedWork(const base::TimeTicks & delayed_work_time)101 void MessagePumpMojo::ScheduleDelayedWork(
102     const base::TimeTicks& delayed_work_time) {
103   base::AutoLock auto_lock(run_state_lock_);
104   if (!run_state_)
105     return;
106   run_state_->delayed_work_time = delayed_work_time;
107   SignalControlPipe(*run_state_);
108 }
109 
DoRunLoop(RunState * run_state,Delegate * delegate)110 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
111   bool more_work_is_plausible = true;
112   for (;;) {
113     const bool block = !more_work_is_plausible;
114     DoInternalWork(*run_state, block);
115 
116     // There isn't a good way to know if there are more handles ready, we assume
117     // not.
118     more_work_is_plausible = false;
119 
120     if (run_state->should_quit)
121       break;
122 
123     more_work_is_plausible |= delegate->DoWork();
124     if (run_state->should_quit)
125       break;
126 
127     more_work_is_plausible |= delegate->DoDelayedWork(
128         &run_state->delayed_work_time);
129     if (run_state->should_quit)
130       break;
131 
132     if (more_work_is_plausible)
133       continue;
134 
135     more_work_is_plausible = delegate->DoIdleWork();
136     if (run_state->should_quit)
137       break;
138   }
139 }
140 
DoInternalWork(const RunState & run_state,bool block)141 void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
142   const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
143   const WaitState wait_state = GetWaitState(run_state);
144   const MojoResult result =
145       WaitMany(wait_state.handles, wait_state.wait_signals, deadline);
146   if (result == 0) {
147     // Control pipe was written to.
148     uint32_t num_bytes = 0;
149     ReadMessageRaw(run_state.read_handle.get(), NULL, &num_bytes, NULL, NULL,
150                    MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
151   } else if (result > 0) {
152     const size_t index = static_cast<size_t>(result);
153     DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
154     handlers_[wait_state.handles[index]].handler->OnHandleReady(
155         wait_state.handles[index]);
156   } else {
157     switch (result) {
158       case MOJO_RESULT_CANCELLED:
159       case MOJO_RESULT_FAILED_PRECONDITION:
160       case MOJO_RESULT_INVALID_ARGUMENT:
161         RemoveFirstInvalidHandle(wait_state);
162         break;
163       case MOJO_RESULT_DEADLINE_EXCEEDED:
164         break;
165       default:
166         base::debug::Alias(&result);
167         // Unexpected result is likely fatal, crash so we can determine cause.
168         CHECK(false);
169     }
170   }
171 
172   // Notify and remove any handlers whose time has expired. Make a copy in case
173   // someone tries to add/remove new handlers from notification.
174   const HandleToHandler cloned_handlers(handlers_);
175   const base::TimeTicks now(internal::NowTicks());
176   for (HandleToHandler::const_iterator i = cloned_handlers.begin();
177        i != cloned_handlers.end(); ++i) {
178     // Since we're iterating over a clone of the handlers, verify the handler is
179     // still valid before notifying.
180     if (!i->second.deadline.is_null() && i->second.deadline < now &&
181         handlers_.find(i->first) != handlers_.end() &&
182         handlers_[i->first].id == i->second.id) {
183       i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
184     }
185   }
186 }
187 
RemoveFirstInvalidHandle(const WaitState & wait_state)188 void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
189   // TODO(sky): deal with control pipe going bad.
190   for (size_t i = 1; i < wait_state.handles.size(); ++i) {
191     const MojoResult result =
192         Wait(wait_state.handles[i], wait_state.wait_signals[i], 0);
193     if (result == MOJO_RESULT_INVALID_ARGUMENT ||
194         result == MOJO_RESULT_FAILED_PRECONDITION ||
195         result == MOJO_RESULT_CANCELLED) {
196       // Remove the handle first, this way if OnHandleError() tries to remove
197       // the handle our iterator isn't invalidated.
198       DCHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
199       MessagePumpMojoHandler* handler =
200           handlers_[wait_state.handles[i]].handler;
201       handlers_.erase(wait_state.handles[i]);
202       handler->OnHandleError(wait_state.handles[i], result);
203       return;
204     }
205   }
206 }
207 
SignalControlPipe(const RunState & run_state)208 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
209   // TODO(sky): deal with error?
210   WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0,
211                   MOJO_WRITE_MESSAGE_FLAG_NONE);
212 }
213 
GetWaitState(const RunState & run_state) const214 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState(
215     const RunState& run_state) const {
216   WaitState wait_state;
217   wait_state.handles.push_back(run_state.read_handle.get());
218   wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
219 
220   for (HandleToHandler::const_iterator i = handlers_.begin();
221        i != handlers_.end(); ++i) {
222     wait_state.handles.push_back(i->first);
223     wait_state.wait_signals.push_back(i->second.wait_signals);
224   }
225   return wait_state;
226 }
227 
GetDeadlineForWait(const RunState & run_state) const228 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
229     const RunState& run_state) const {
230   base::TimeTicks min_time = run_state.delayed_work_time;
231   for (HandleToHandler::const_iterator i = handlers_.begin();
232        i != handlers_.end(); ++i) {
233     if (min_time.is_null() && i->second.deadline < min_time)
234       min_time = i->second.deadline;
235   }
236   return min_time.is_null() ? MOJO_DEADLINE_INDEFINITE :
237       std::max(static_cast<MojoDeadline>(0),
238                static_cast<MojoDeadline>(
239                    (min_time - internal::NowTicks()).InMicroseconds()));
240 }
241 
242 }  // namespace common
243 }  // namespace mojo
244