• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/message_pipe_dispatcher.h"
6 
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/constants.h"
10 #include "mojo/system/local_message_pipe_endpoint.h"
11 #include "mojo/system/memory.h"
12 #include "mojo/system/message_in_transit.h"
13 #include "mojo/system/message_pipe.h"
14 #include "mojo/system/options_validation.h"
15 #include "mojo/system/proxy_message_pipe_endpoint.h"
16 
17 namespace mojo {
18 namespace system {
19 
20 namespace {
21 
22 const unsigned kInvalidPort = static_cast<unsigned>(-1);
23 
24 struct SerializedMessagePipeDispatcher {
25   MessageInTransit::EndpointId endpoint_id;
26 };
27 
28 }  // namespace
29 
30 // MessagePipeDispatcher -------------------------------------------------------
31 
32 // static
33 const MojoCreateMessagePipeOptions
34     MessagePipeDispatcher::kDefaultCreateOptions = {
35   static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
36   MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE
37 };
38 
MessagePipeDispatcher(const MojoCreateMessagePipeOptions &)39 MessagePipeDispatcher::MessagePipeDispatcher(
40     const MojoCreateMessagePipeOptions& /*validated_options*/)
41     : port_(kInvalidPort) {
42 }
43 
44 // static
ValidateCreateOptions(const MojoCreateMessagePipeOptions * in_options,MojoCreateMessagePipeOptions * out_options)45 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
46     const MojoCreateMessagePipeOptions* in_options,
47     MojoCreateMessagePipeOptions* out_options) {
48   const MojoCreateMessagePipeOptionsFlags kKnownFlags =
49       MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
50 
51   *out_options = kDefaultCreateOptions;
52   if (!in_options)
53     return MOJO_RESULT_OK;
54 
55   MojoResult result =
56       ValidateOptionsStructPointerSizeAndFlags<MojoCreateMessagePipeOptions>(
57           in_options, kKnownFlags, out_options);
58   if (result != MOJO_RESULT_OK)
59     return result;
60 
61   // Checks for fields beyond |flags|:
62 
63   // (Nothing here yet.)
64 
65   return MOJO_RESULT_OK;
66 }
67 
Init(scoped_refptr<MessagePipe> message_pipe,unsigned port)68 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
69                                  unsigned port) {
70   DCHECK(message_pipe);
71   DCHECK(port == 0 || port == 1);
72 
73   message_pipe_ = message_pipe;
74   port_ = port;
75 }
76 
GetType() const77 Dispatcher::Type MessagePipeDispatcher::GetType() const {
78   return kTypeMessagePipe;
79 }
80 
81 // static
82 std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
CreateRemoteMessagePipe()83 MessagePipeDispatcher::CreateRemoteMessagePipe() {
84   scoped_refptr<MessagePipe> message_pipe(
85       new MessagePipe(
86           scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
87           scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
88   scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
89       MessagePipeDispatcher::kDefaultCreateOptions));
90   dispatcher->Init(message_pipe, 0);
91 
92   return std::make_pair(dispatcher, message_pipe);
93 }
94 
95 // static
Deserialize(Channel * channel,const void * source,size_t size)96 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
97     Channel* channel,
98     const void* source,
99     size_t size) {
100   if (size != sizeof(SerializedMessagePipeDispatcher)) {
101     LOG(ERROR) << "Invalid serialized message pipe dispatcher";
102     return scoped_refptr<MessagePipeDispatcher>();
103   }
104 
105   std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
106       remote_message_pipe = CreateRemoteMessagePipe();
107 
108   MessageInTransit::EndpointId remote_id =
109       static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
110   if (remote_id == MessageInTransit::kInvalidEndpointId) {
111     // This means that the other end was closed, and there were no messages
112     // enqueued for us.
113     // TODO(vtl): This is wrong. We should produce a "dead" message pipe
114     // dispatcher.
115     NOTIMPLEMENTED();
116     return scoped_refptr<MessagePipeDispatcher>();
117   }
118   MessageInTransit::EndpointId local_id =
119       channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1);
120   if (local_id == MessageInTransit::kInvalidEndpointId) {
121     LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
122                   "attach; remote ID = " << remote_id << ")";
123     return scoped_refptr<MessagePipeDispatcher>();
124   }
125   DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = "
126            << remote_id << ", new local ID = " << local_id << ")";
127 
128   if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
129     // In general, this shouldn't fail, since we generated |local_id| locally.
130     NOTREACHED();
131     return scoped_refptr<MessagePipeDispatcher>();
132   }
133 
134   // TODO(vtl): FIXME -- Need some error handling here.
135   channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
136   return remote_message_pipe.first;
137 }
138 
~MessagePipeDispatcher()139 MessagePipeDispatcher::~MessagePipeDispatcher() {
140   // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
141   DCHECK(!message_pipe_);
142 }
143 
GetMessagePipeNoLock() const144 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
145   lock().AssertAcquired();
146   return message_pipe_.get();
147 }
148 
GetPortNoLock() const149 unsigned MessagePipeDispatcher::GetPortNoLock() const {
150   lock().AssertAcquired();
151   return port_;
152 }
153 
CancelAllWaitersNoLock()154 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
155   lock().AssertAcquired();
156   message_pipe_->CancelAllWaiters(port_);
157 }
158 
CloseImplNoLock()159 void MessagePipeDispatcher::CloseImplNoLock() {
160   lock().AssertAcquired();
161   message_pipe_->Close(port_);
162   message_pipe_ = NULL;
163   port_ = kInvalidPort;
164 }
165 
166 scoped_refptr<Dispatcher>
CreateEquivalentDispatcherAndCloseImplNoLock()167 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
168   lock().AssertAcquired();
169 
170   // TODO(vtl): Currently, there are no options, so we just use
171   // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
172   // too.
173   scoped_refptr<MessagePipeDispatcher> rv =
174       new MessagePipeDispatcher(kDefaultCreateOptions);
175   rv->Init(message_pipe_, port_);
176   message_pipe_ = NULL;
177   port_ = kInvalidPort;
178   return scoped_refptr<Dispatcher>(rv.get());
179 }
180 
WriteMessageImplNoLock(const void * bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)181 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
182     const void* bytes,
183     uint32_t num_bytes,
184     std::vector<DispatcherTransport>* transports,
185     MojoWriteMessageFlags flags) {
186   DCHECK(!transports || (transports->size() > 0 &&
187                          transports->size() <= kMaxMessageNumHandles));
188 
189   lock().AssertAcquired();
190 
191   if (!VerifyUserPointerWithSize<1>(bytes, num_bytes))
192     return MOJO_RESULT_INVALID_ARGUMENT;
193   if (num_bytes > kMaxMessageNumBytes)
194     return MOJO_RESULT_RESOURCE_EXHAUSTED;
195 
196   return message_pipe_->WriteMessage(port_, bytes, num_bytes, transports,
197                                      flags);
198 }
199 
ReadMessageImplNoLock(void * bytes,uint32_t * num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)200 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
201     void* bytes,
202     uint32_t* num_bytes,
203     DispatcherVector* dispatchers,
204     uint32_t* num_dispatchers,
205     MojoReadMessageFlags flags) {
206   lock().AssertAcquired();
207 
208   if (num_bytes) {
209     if (!VerifyUserPointer<uint32_t>(num_bytes))
210       return MOJO_RESULT_INVALID_ARGUMENT;
211     if (!VerifyUserPointerWithSize<1>(bytes, *num_bytes))
212       return MOJO_RESULT_INVALID_ARGUMENT;
213   }
214 
215   return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
216                                     num_dispatchers, flags);
217 }
218 
AddWaiterImplNoLock(Waiter * waiter,MojoHandleSignals signals,uint32_t context)219 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter,
220                                                       MojoHandleSignals signals,
221                                                       uint32_t context) {
222   lock().AssertAcquired();
223   return message_pipe_->AddWaiter(port_, waiter, signals, context);
224 }
225 
RemoveWaiterImplNoLock(Waiter * waiter)226 void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
227   lock().AssertAcquired();
228   message_pipe_->RemoveWaiter(port_, waiter);
229 }
230 
StartSerializeImplNoLock(Channel *,size_t * max_size,size_t * max_platform_handles)231 void MessagePipeDispatcher::StartSerializeImplNoLock(
232     Channel* /*channel*/,
233     size_t* max_size,
234     size_t* max_platform_handles) {
235   DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
236   *max_size = sizeof(SerializedMessagePipeDispatcher);
237   *max_platform_handles = 0;
238 }
239 
EndSerializeAndCloseImplNoLock(Channel * channel,void * destination,size_t * actual_size,embedder::PlatformHandleVector *)240 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
241     Channel* channel,
242     void* destination,
243     size_t* actual_size,
244     embedder::PlatformHandleVector* /*platform_handles*/) {
245   DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
246 
247   // Convert the local endpoint to a proxy endpoint (moving the message queue).
248   message_pipe_->ConvertLocalToProxy(port_);
249 
250   // Attach the new proxy endpoint to the channel.
251   MessageInTransit::EndpointId endpoint_id =
252       channel->AttachMessagePipeEndpoint(message_pipe_, port_);
253   // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
254   // possible that the other endpoint -- the one that we're not sending -- was
255   // closed in the intervening time.) In that case, we need to deserialize a
256   // "dead" message pipe dispatcher on the other end. (Note that this is
257   // different from just producing |MOJO_HANDLE_INVALID|.)
258   DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
259            << ")";
260 
261   // We now have a local ID. Before we can run the proxy endpoint, we need to
262   // get an ack back from the other side with the remote ID.
263   static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
264       endpoint_id;
265 
266   message_pipe_ = NULL;
267   port_ = kInvalidPort;
268 
269   *actual_size = sizeof(SerializedMessagePipeDispatcher);
270   return true;
271 }
272 
273 // MessagePipeDispatcherTransport ----------------------------------------------
274 
MessagePipeDispatcherTransport(DispatcherTransport transport)275 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
276     DispatcherTransport transport) : DispatcherTransport(transport) {
277   DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
278 }
279 
280 }  // namespace system
281 }  // namespace mojo
282