1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe_dispatcher.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/channel_endpoint.h"
10 #include "mojo/system/constants.h"
11 #include "mojo/system/local_message_pipe_endpoint.h"
12 #include "mojo/system/memory.h"
13 #include "mojo/system/message_in_transit.h"
14 #include "mojo/system/message_pipe.h"
15 #include "mojo/system/options_validation.h"
16 #include "mojo/system/proxy_message_pipe_endpoint.h"
17
18 namespace mojo {
19 namespace system {
20
21 namespace {
22
23 const unsigned kInvalidPort = static_cast<unsigned>(-1);
24
25 struct SerializedMessagePipeDispatcher {
26 MessageInTransit::EndpointId endpoint_id;
27 };
28
29 } // namespace
30
31 // MessagePipeDispatcher -------------------------------------------------------
32
33 // static
34 const MojoCreateMessagePipeOptions
35 MessagePipeDispatcher::kDefaultCreateOptions = {
36 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
37 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
38
MessagePipeDispatcher(const MojoCreateMessagePipeOptions &)39 MessagePipeDispatcher::MessagePipeDispatcher(
40 const MojoCreateMessagePipeOptions& /*validated_options*/)
41 : port_(kInvalidPort) {
42 }
43
44 // static
ValidateCreateOptions(UserPointer<const MojoCreateMessagePipeOptions> in_options,MojoCreateMessagePipeOptions * out_options)45 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
46 UserPointer<const MojoCreateMessagePipeOptions> in_options,
47 MojoCreateMessagePipeOptions* out_options) {
48 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
49 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
50
51 *out_options = kDefaultCreateOptions;
52 if (in_options.IsNull())
53 return MOJO_RESULT_OK;
54
55 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
56 if (!reader.is_valid())
57 return MOJO_RESULT_INVALID_ARGUMENT;
58
59 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
60 return MOJO_RESULT_OK;
61 if ((reader.options().flags & ~kKnownFlags))
62 return MOJO_RESULT_UNIMPLEMENTED;
63 out_options->flags = reader.options().flags;
64
65 // Checks for fields beyond |flags|:
66
67 // (Nothing here yet.)
68
69 return MOJO_RESULT_OK;
70 }
71
Init(scoped_refptr<MessagePipe> message_pipe,unsigned port)72 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
73 unsigned port) {
74 DCHECK(message_pipe.get());
75 DCHECK(port == 0 || port == 1);
76
77 message_pipe_ = message_pipe;
78 port_ = port;
79 }
80
GetType() const81 Dispatcher::Type MessagePipeDispatcher::GetType() const {
82 return kTypeMessagePipe;
83 }
84
85 // static
86 scoped_refptr<MessagePipeDispatcher>
CreateRemoteMessagePipe(scoped_refptr<ChannelEndpoint> * channel_endpoint)87 MessagePipeDispatcher::CreateRemoteMessagePipe(
88 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
89 scoped_refptr<MessagePipe> message_pipe(
90 MessagePipe::CreateLocalProxy(channel_endpoint));
91 scoped_refptr<MessagePipeDispatcher> dispatcher(
92 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
93 dispatcher->Init(message_pipe, 0);
94 return dispatcher;
95 }
96
97 // static
Deserialize(Channel * channel,const void * source,size_t size)98 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
99 Channel* channel,
100 const void* source,
101 size_t size) {
102 if (size != sizeof(SerializedMessagePipeDispatcher)) {
103 LOG(ERROR) << "Invalid serialized message pipe dispatcher";
104 return scoped_refptr<MessagePipeDispatcher>();
105 }
106
107 scoped_refptr<ChannelEndpoint> channel_endpoint;
108 scoped_refptr<MessagePipeDispatcher> dispatcher =
109 CreateRemoteMessagePipe(&channel_endpoint);
110
111 MessageInTransit::EndpointId remote_id =
112 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
113 if (remote_id == MessageInTransit::kInvalidEndpointId) {
114 // This means that the other end was closed, and there were no messages
115 // enqueued for us.
116 // TODO(vtl): This is wrong. We should produce a "dead" message pipe
117 // dispatcher.
118 NOTIMPLEMENTED();
119 return scoped_refptr<MessagePipeDispatcher>();
120 }
121 MessageInTransit::EndpointId local_id =
122 channel->AttachEndpoint(channel_endpoint);
123 if (local_id == MessageInTransit::kInvalidEndpointId) {
124 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
125 "attach; remote ID = " << remote_id << ")";
126 return scoped_refptr<MessagePipeDispatcher>();
127 }
128 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id
129 << ", new local ID = " << local_id << ")";
130
131 if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
132 // In general, this shouldn't fail, since we generated |local_id| locally.
133 NOTREACHED();
134 return scoped_refptr<MessagePipeDispatcher>();
135 }
136
137 // TODO(vtl): FIXME -- Need some error handling here.
138 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
139 return dispatcher;
140 }
141
~MessagePipeDispatcher()142 MessagePipeDispatcher::~MessagePipeDispatcher() {
143 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
144 DCHECK(!message_pipe_.get());
145 }
146
GetMessagePipeNoLock() const147 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
148 lock().AssertAcquired();
149 return message_pipe_.get();
150 }
151
GetPortNoLock() const152 unsigned MessagePipeDispatcher::GetPortNoLock() const {
153 lock().AssertAcquired();
154 return port_;
155 }
156
CancelAllWaitersNoLock()157 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
158 lock().AssertAcquired();
159 message_pipe_->CancelAllWaiters(port_);
160 }
161
CloseImplNoLock()162 void MessagePipeDispatcher::CloseImplNoLock() {
163 lock().AssertAcquired();
164 message_pipe_->Close(port_);
165 message_pipe_ = nullptr;
166 port_ = kInvalidPort;
167 }
168
169 scoped_refptr<Dispatcher>
CreateEquivalentDispatcherAndCloseImplNoLock()170 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
171 lock().AssertAcquired();
172
173 // TODO(vtl): Currently, there are no options, so we just use
174 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
175 // too.
176 scoped_refptr<MessagePipeDispatcher> rv =
177 new MessagePipeDispatcher(kDefaultCreateOptions);
178 rv->Init(message_pipe_, port_);
179 message_pipe_ = nullptr;
180 port_ = kInvalidPort;
181 return scoped_refptr<Dispatcher>(rv.get());
182 }
183
WriteMessageImplNoLock(UserPointer<const void> bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)184 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
185 UserPointer<const void> bytes,
186 uint32_t num_bytes,
187 std::vector<DispatcherTransport>* transports,
188 MojoWriteMessageFlags flags) {
189 DCHECK(!transports || (transports->size() > 0 &&
190 transports->size() <= kMaxMessageNumHandles));
191
192 lock().AssertAcquired();
193
194 if (num_bytes > kMaxMessageNumBytes)
195 return MOJO_RESULT_RESOURCE_EXHAUSTED;
196
197 return message_pipe_->WriteMessage(
198 port_, bytes, num_bytes, transports, flags);
199 }
200
ReadMessageImplNoLock(UserPointer<void> bytes,UserPointer<uint32_t> num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)201 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
202 UserPointer<void> bytes,
203 UserPointer<uint32_t> num_bytes,
204 DispatcherVector* dispatchers,
205 uint32_t* num_dispatchers,
206 MojoReadMessageFlags flags) {
207 lock().AssertAcquired();
208 return message_pipe_->ReadMessage(
209 port_, bytes, num_bytes, dispatchers, num_dispatchers, flags);
210 }
211
GetHandleSignalsStateImplNoLock() const212 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
213 const {
214 lock().AssertAcquired();
215 return message_pipe_->GetHandleSignalsState(port_);
216 }
217
AddWaiterImplNoLock(Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)218 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(
219 Waiter* waiter,
220 MojoHandleSignals signals,
221 uint32_t context,
222 HandleSignalsState* signals_state) {
223 lock().AssertAcquired();
224 return message_pipe_->AddWaiter(
225 port_, waiter, signals, context, signals_state);
226 }
227
RemoveWaiterImplNoLock(Waiter * waiter,HandleSignalsState * signals_state)228 void MessagePipeDispatcher::RemoveWaiterImplNoLock(
229 Waiter* waiter,
230 HandleSignalsState* signals_state) {
231 lock().AssertAcquired();
232 message_pipe_->RemoveWaiter(port_, waiter, signals_state);
233 }
234
StartSerializeImplNoLock(Channel *,size_t * max_size,size_t * max_platform_handles)235 void MessagePipeDispatcher::StartSerializeImplNoLock(
236 Channel* /*channel*/,
237 size_t* max_size,
238 size_t* max_platform_handles) {
239 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
240 *max_size = sizeof(SerializedMessagePipeDispatcher);
241 *max_platform_handles = 0;
242 }
243
EndSerializeAndCloseImplNoLock(Channel * channel,void * destination,size_t * actual_size,embedder::PlatformHandleVector *)244 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
245 Channel* channel,
246 void* destination,
247 size_t* actual_size,
248 embedder::PlatformHandleVector* /*platform_handles*/) {
249 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
250
251 // Convert the local endpoint to a proxy endpoint (moving the message queue)
252 // and attach it to the channel.
253 MessageInTransit::EndpointId endpoint_id =
254 channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_));
255 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
256 // possible that the other endpoint -- the one that we're not sending -- was
257 // closed in the intervening time.) In that case, we need to deserialize a
258 // "dead" message pipe dispatcher on the other end. (Note that this is
259 // different from just producing |MOJO_HANDLE_INVALID|.)
260 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
261 << ")";
262
263 // We now have a local ID. Before we can run the proxy endpoint, we need to
264 // get an ack back from the other side with the remote ID.
265 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
266 endpoint_id;
267
268 message_pipe_ = nullptr;
269 port_ = kInvalidPort;
270
271 *actual_size = sizeof(SerializedMessagePipeDispatcher);
272 return true;
273 }
274
275 // MessagePipeDispatcherTransport ----------------------------------------------
276
MessagePipeDispatcherTransport(DispatcherTransport transport)277 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
278 DispatcherTransport transport)
279 : DispatcherTransport(transport) {
280 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
281 }
282
283 } // namespace system
284 } // namespace mojo
285