1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe_dispatcher.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/constants.h"
10 #include "mojo/system/local_message_pipe_endpoint.h"
11 #include "mojo/system/memory.h"
12 #include "mojo/system/message_in_transit.h"
13 #include "mojo/system/message_pipe.h"
14 #include "mojo/system/options_validation.h"
15 #include "mojo/system/proxy_message_pipe_endpoint.h"
16
17 namespace mojo {
18 namespace system {
19
20 namespace {
21
// Sentinel port value (all bits set) stored in |port_| whenever the dispatcher
// is not bound to a message pipe port. Valid ports are 0 and 1.
const unsigned kInvalidPort = ~0u;
23
24 struct SerializedMessagePipeDispatcher {
25 MessageInTransit::EndpointId endpoint_id;
26 };
27
28 } // namespace
29
30 // MessagePipeDispatcher -------------------------------------------------------
31
32 // static
33 const MojoCreateMessagePipeOptions
34 MessagePipeDispatcher::kDefaultCreateOptions = {
35 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
36 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE
37 };
38
MessagePipeDispatcher(const MojoCreateMessagePipeOptions &)39 MessagePipeDispatcher::MessagePipeDispatcher(
40 const MojoCreateMessagePipeOptions& /*validated_options*/)
41 : port_(kInvalidPort) {
42 }
43
44 // static
ValidateCreateOptions(const MojoCreateMessagePipeOptions * in_options,MojoCreateMessagePipeOptions * out_options)45 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
46 const MojoCreateMessagePipeOptions* in_options,
47 MojoCreateMessagePipeOptions* out_options) {
48 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
49 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
50
51 *out_options = kDefaultCreateOptions;
52 if (!in_options)
53 return MOJO_RESULT_OK;
54
55 MojoResult result =
56 ValidateOptionsStructPointerSizeAndFlags<MojoCreateMessagePipeOptions>(
57 in_options, kKnownFlags, out_options);
58 if (result != MOJO_RESULT_OK)
59 return result;
60
61 // Checks for fields beyond |flags|:
62
63 // (Nothing here yet.)
64
65 return MOJO_RESULT_OK;
66 }
67
Init(scoped_refptr<MessagePipe> message_pipe,unsigned port)68 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
69 unsigned port) {
70 DCHECK(message_pipe);
71 DCHECK(port == 0 || port == 1);
72
73 message_pipe_ = message_pipe;
74 port_ = port;
75 }
76
GetType() const77 Dispatcher::Type MessagePipeDispatcher::GetType() const {
78 return kTypeMessagePipe;
79 }
80
81 // static
82 std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
CreateRemoteMessagePipe()83 MessagePipeDispatcher::CreateRemoteMessagePipe() {
84 scoped_refptr<MessagePipe> message_pipe(
85 new MessagePipe(
86 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
87 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
88 scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
89 MessagePipeDispatcher::kDefaultCreateOptions));
90 dispatcher->Init(message_pipe, 0);
91
92 return std::make_pair(dispatcher, message_pipe);
93 }
94
95 // static
Deserialize(Channel * channel,const void * source,size_t size)96 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
97 Channel* channel,
98 const void* source,
99 size_t size) {
100 if (size != sizeof(SerializedMessagePipeDispatcher)) {
101 LOG(ERROR) << "Invalid serialized message pipe dispatcher";
102 return scoped_refptr<MessagePipeDispatcher>();
103 }
104
105 std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
106 remote_message_pipe = CreateRemoteMessagePipe();
107
108 MessageInTransit::EndpointId remote_id =
109 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
110 if (remote_id == MessageInTransit::kInvalidEndpointId) {
111 // This means that the other end was closed, and there were no messages
112 // enqueued for us.
113 // TODO(vtl): This is wrong. We should produce a "dead" message pipe
114 // dispatcher.
115 NOTIMPLEMENTED();
116 return scoped_refptr<MessagePipeDispatcher>();
117 }
118 MessageInTransit::EndpointId local_id =
119 channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1);
120 if (local_id == MessageInTransit::kInvalidEndpointId) {
121 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
122 "attach; remote ID = " << remote_id << ")";
123 return scoped_refptr<MessagePipeDispatcher>();
124 }
125 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = "
126 << remote_id << ", new local ID = " << local_id << ")";
127
128 if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
129 // In general, this shouldn't fail, since we generated |local_id| locally.
130 NOTREACHED();
131 return scoped_refptr<MessagePipeDispatcher>();
132 }
133
134 // TODO(vtl): FIXME -- Need some error handling here.
135 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
136 return remote_message_pipe.first;
137 }
138
~MessagePipeDispatcher()139 MessagePipeDispatcher::~MessagePipeDispatcher() {
140 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
141 DCHECK(!message_pipe_);
142 }
143
GetMessagePipeNoLock() const144 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
145 lock().AssertAcquired();
146 return message_pipe_.get();
147 }
148
GetPortNoLock() const149 unsigned MessagePipeDispatcher::GetPortNoLock() const {
150 lock().AssertAcquired();
151 return port_;
152 }
153
CancelAllWaitersNoLock()154 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
155 lock().AssertAcquired();
156 message_pipe_->CancelAllWaiters(port_);
157 }
158
CloseImplNoLock()159 void MessagePipeDispatcher::CloseImplNoLock() {
160 lock().AssertAcquired();
161 message_pipe_->Close(port_);
162 message_pipe_ = NULL;
163 port_ = kInvalidPort;
164 }
165
166 scoped_refptr<Dispatcher>
CreateEquivalentDispatcherAndCloseImplNoLock()167 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
168 lock().AssertAcquired();
169
170 // TODO(vtl): Currently, there are no options, so we just use
171 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
172 // too.
173 scoped_refptr<MessagePipeDispatcher> rv =
174 new MessagePipeDispatcher(kDefaultCreateOptions);
175 rv->Init(message_pipe_, port_);
176 message_pipe_ = NULL;
177 port_ = kInvalidPort;
178 return scoped_refptr<Dispatcher>(rv.get());
179 }
180
WriteMessageImplNoLock(const void * bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)181 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
182 const void* bytes,
183 uint32_t num_bytes,
184 std::vector<DispatcherTransport>* transports,
185 MojoWriteMessageFlags flags) {
186 DCHECK(!transports || (transports->size() > 0 &&
187 transports->size() <= kMaxMessageNumHandles));
188
189 lock().AssertAcquired();
190
191 if (!VerifyUserPointerWithSize<1>(bytes, num_bytes))
192 return MOJO_RESULT_INVALID_ARGUMENT;
193 if (num_bytes > kMaxMessageNumBytes)
194 return MOJO_RESULT_RESOURCE_EXHAUSTED;
195
196 return message_pipe_->WriteMessage(port_, bytes, num_bytes, transports,
197 flags);
198 }
199
ReadMessageImplNoLock(void * bytes,uint32_t * num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)200 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
201 void* bytes,
202 uint32_t* num_bytes,
203 DispatcherVector* dispatchers,
204 uint32_t* num_dispatchers,
205 MojoReadMessageFlags flags) {
206 lock().AssertAcquired();
207
208 if (num_bytes) {
209 if (!VerifyUserPointer<uint32_t>(num_bytes))
210 return MOJO_RESULT_INVALID_ARGUMENT;
211 if (!VerifyUserPointerWithSize<1>(bytes, *num_bytes))
212 return MOJO_RESULT_INVALID_ARGUMENT;
213 }
214
215 return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
216 num_dispatchers, flags);
217 }
218
AddWaiterImplNoLock(Waiter * waiter,MojoHandleSignals signals,uint32_t context)219 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter,
220 MojoHandleSignals signals,
221 uint32_t context) {
222 lock().AssertAcquired();
223 return message_pipe_->AddWaiter(port_, waiter, signals, context);
224 }
225
RemoveWaiterImplNoLock(Waiter * waiter)226 void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
227 lock().AssertAcquired();
228 message_pipe_->RemoveWaiter(port_, waiter);
229 }
230
StartSerializeImplNoLock(Channel *,size_t * max_size,size_t * max_platform_handles)231 void MessagePipeDispatcher::StartSerializeImplNoLock(
232 Channel* /*channel*/,
233 size_t* max_size,
234 size_t* max_platform_handles) {
235 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
236 *max_size = sizeof(SerializedMessagePipeDispatcher);
237 *max_platform_handles = 0;
238 }
239
EndSerializeAndCloseImplNoLock(Channel * channel,void * destination,size_t * actual_size,embedder::PlatformHandleVector *)240 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
241 Channel* channel,
242 void* destination,
243 size_t* actual_size,
244 embedder::PlatformHandleVector* /*platform_handles*/) {
245 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
246
247 // Convert the local endpoint to a proxy endpoint (moving the message queue).
248 message_pipe_->ConvertLocalToProxy(port_);
249
250 // Attach the new proxy endpoint to the channel.
251 MessageInTransit::EndpointId endpoint_id =
252 channel->AttachMessagePipeEndpoint(message_pipe_, port_);
253 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
254 // possible that the other endpoint -- the one that we're not sending -- was
255 // closed in the intervening time.) In that case, we need to deserialize a
256 // "dead" message pipe dispatcher on the other end. (Note that this is
257 // different from just producing |MOJO_HANDLE_INVALID|.)
258 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
259 << ")";
260
261 // We now have a local ID. Before we can run the proxy endpoint, we need to
262 // get an ack back from the other side with the remote ID.
263 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
264 endpoint_id;
265
266 message_pipe_ = NULL;
267 port_ = kInvalidPort;
268
269 *actual_size = sizeof(SerializedMessagePipeDispatcher);
270 return true;
271 }
272
273 // MessagePipeDispatcherTransport ----------------------------------------------
274
MessagePipeDispatcherTransport(DispatcherTransport transport)275 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
276 DispatcherTransport transport) : DispatcherTransport(transport) {
277 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
278 }
279
280 } // namespace system
281 } // namespace mojo
282