• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/message_pipe_dispatcher.h"
6 
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/channel_endpoint.h"
10 #include "mojo/system/constants.h"
11 #include "mojo/system/local_message_pipe_endpoint.h"
12 #include "mojo/system/memory.h"
13 #include "mojo/system/message_in_transit.h"
14 #include "mojo/system/message_pipe.h"
15 #include "mojo/system/options_validation.h"
16 #include "mojo/system/proxy_message_pipe_endpoint.h"
17 
18 namespace mojo {
19 namespace system {
20 
21 namespace {
22 
23 const unsigned kInvalidPort = static_cast<unsigned>(-1);
24 
25 struct SerializedMessagePipeDispatcher {
26   MessageInTransit::EndpointId endpoint_id;
27 };
28 
29 }  // namespace
30 
31 // MessagePipeDispatcher -------------------------------------------------------
32 
33 // static
34 const MojoCreateMessagePipeOptions
35     MessagePipeDispatcher::kDefaultCreateOptions = {
36         static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
37         MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
38 
MessagePipeDispatcher(const MojoCreateMessagePipeOptions &)39 MessagePipeDispatcher::MessagePipeDispatcher(
40     const MojoCreateMessagePipeOptions& /*validated_options*/)
41     : port_(kInvalidPort) {
42 }
43 
44 // static
ValidateCreateOptions(UserPointer<const MojoCreateMessagePipeOptions> in_options,MojoCreateMessagePipeOptions * out_options)45 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
46     UserPointer<const MojoCreateMessagePipeOptions> in_options,
47     MojoCreateMessagePipeOptions* out_options) {
48   const MojoCreateMessagePipeOptionsFlags kKnownFlags =
49       MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
50 
51   *out_options = kDefaultCreateOptions;
52   if (in_options.IsNull())
53     return MOJO_RESULT_OK;
54 
55   UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
56   if (!reader.is_valid())
57     return MOJO_RESULT_INVALID_ARGUMENT;
58 
59   if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
60     return MOJO_RESULT_OK;
61   if ((reader.options().flags & ~kKnownFlags))
62     return MOJO_RESULT_UNIMPLEMENTED;
63   out_options->flags = reader.options().flags;
64 
65   // Checks for fields beyond |flags|:
66 
67   // (Nothing here yet.)
68 
69   return MOJO_RESULT_OK;
70 }
71 
Init(scoped_refptr<MessagePipe> message_pipe,unsigned port)72 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
73                                  unsigned port) {
74   DCHECK(message_pipe.get());
75   DCHECK(port == 0 || port == 1);
76 
77   message_pipe_ = message_pipe;
78   port_ = port;
79 }
80 
GetType() const81 Dispatcher::Type MessagePipeDispatcher::GetType() const {
82   return kTypeMessagePipe;
83 }
84 
85 // static
86 scoped_refptr<MessagePipeDispatcher>
CreateRemoteMessagePipe(scoped_refptr<ChannelEndpoint> * channel_endpoint)87 MessagePipeDispatcher::CreateRemoteMessagePipe(
88     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
89   scoped_refptr<MessagePipe> message_pipe(
90       MessagePipe::CreateLocalProxy(channel_endpoint));
91   scoped_refptr<MessagePipeDispatcher> dispatcher(
92       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
93   dispatcher->Init(message_pipe, 0);
94   return dispatcher;
95 }
96 
97 // static
Deserialize(Channel * channel,const void * source,size_t size)98 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
99     Channel* channel,
100     const void* source,
101     size_t size) {
102   if (size != sizeof(SerializedMessagePipeDispatcher)) {
103     LOG(ERROR) << "Invalid serialized message pipe dispatcher";
104     return scoped_refptr<MessagePipeDispatcher>();
105   }
106 
107   scoped_refptr<ChannelEndpoint> channel_endpoint;
108   scoped_refptr<MessagePipeDispatcher> dispatcher =
109       CreateRemoteMessagePipe(&channel_endpoint);
110 
111   MessageInTransit::EndpointId remote_id =
112       static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
113   if (remote_id == MessageInTransit::kInvalidEndpointId) {
114     // This means that the other end was closed, and there were no messages
115     // enqueued for us.
116     // TODO(vtl): This is wrong. We should produce a "dead" message pipe
117     // dispatcher.
118     NOTIMPLEMENTED();
119     return scoped_refptr<MessagePipeDispatcher>();
120   }
121   MessageInTransit::EndpointId local_id =
122       channel->AttachEndpoint(channel_endpoint);
123   if (local_id == MessageInTransit::kInvalidEndpointId) {
124     LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
125                   "attach; remote ID = " << remote_id << ")";
126     return scoped_refptr<MessagePipeDispatcher>();
127   }
128   DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id
129            << ", new local ID = " << local_id << ")";
130 
131   if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
132     // In general, this shouldn't fail, since we generated |local_id| locally.
133     NOTREACHED();
134     return scoped_refptr<MessagePipeDispatcher>();
135   }
136 
137   // TODO(vtl): FIXME -- Need some error handling here.
138   channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
139   return dispatcher;
140 }
141 
~MessagePipeDispatcher()142 MessagePipeDispatcher::~MessagePipeDispatcher() {
143   // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
144   DCHECK(!message_pipe_.get());
145 }
146 
GetMessagePipeNoLock() const147 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
148   lock().AssertAcquired();
149   return message_pipe_.get();
150 }
151 
GetPortNoLock() const152 unsigned MessagePipeDispatcher::GetPortNoLock() const {
153   lock().AssertAcquired();
154   return port_;
155 }
156 
CancelAllWaitersNoLock()157 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
158   lock().AssertAcquired();
159   message_pipe_->CancelAllWaiters(port_);
160 }
161 
CloseImplNoLock()162 void MessagePipeDispatcher::CloseImplNoLock() {
163   lock().AssertAcquired();
164   message_pipe_->Close(port_);
165   message_pipe_ = nullptr;
166   port_ = kInvalidPort;
167 }
168 
169 scoped_refptr<Dispatcher>
CreateEquivalentDispatcherAndCloseImplNoLock()170 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
171   lock().AssertAcquired();
172 
173   // TODO(vtl): Currently, there are no options, so we just use
174   // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
175   // too.
176   scoped_refptr<MessagePipeDispatcher> rv =
177       new MessagePipeDispatcher(kDefaultCreateOptions);
178   rv->Init(message_pipe_, port_);
179   message_pipe_ = nullptr;
180   port_ = kInvalidPort;
181   return scoped_refptr<Dispatcher>(rv.get());
182 }
183 
WriteMessageImplNoLock(UserPointer<const void> bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)184 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
185     UserPointer<const void> bytes,
186     uint32_t num_bytes,
187     std::vector<DispatcherTransport>* transports,
188     MojoWriteMessageFlags flags) {
189   DCHECK(!transports || (transports->size() > 0 &&
190                          transports->size() <= kMaxMessageNumHandles));
191 
192   lock().AssertAcquired();
193 
194   if (num_bytes > kMaxMessageNumBytes)
195     return MOJO_RESULT_RESOURCE_EXHAUSTED;
196 
197   return message_pipe_->WriteMessage(
198       port_, bytes, num_bytes, transports, flags);
199 }
200 
ReadMessageImplNoLock(UserPointer<void> bytes,UserPointer<uint32_t> num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)201 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
202     UserPointer<void> bytes,
203     UserPointer<uint32_t> num_bytes,
204     DispatcherVector* dispatchers,
205     uint32_t* num_dispatchers,
206     MojoReadMessageFlags flags) {
207   lock().AssertAcquired();
208   return message_pipe_->ReadMessage(
209       port_, bytes, num_bytes, dispatchers, num_dispatchers, flags);
210 }
211 
GetHandleSignalsStateImplNoLock() const212 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
213     const {
214   lock().AssertAcquired();
215   return message_pipe_->GetHandleSignalsState(port_);
216 }
217 
AddWaiterImplNoLock(Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)218 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(
219     Waiter* waiter,
220     MojoHandleSignals signals,
221     uint32_t context,
222     HandleSignalsState* signals_state) {
223   lock().AssertAcquired();
224   return message_pipe_->AddWaiter(
225       port_, waiter, signals, context, signals_state);
226 }
227 
RemoveWaiterImplNoLock(Waiter * waiter,HandleSignalsState * signals_state)228 void MessagePipeDispatcher::RemoveWaiterImplNoLock(
229     Waiter* waiter,
230     HandleSignalsState* signals_state) {
231   lock().AssertAcquired();
232   message_pipe_->RemoveWaiter(port_, waiter, signals_state);
233 }
234 
StartSerializeImplNoLock(Channel *,size_t * max_size,size_t * max_platform_handles)235 void MessagePipeDispatcher::StartSerializeImplNoLock(
236     Channel* /*channel*/,
237     size_t* max_size,
238     size_t* max_platform_handles) {
239   DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
240   *max_size = sizeof(SerializedMessagePipeDispatcher);
241   *max_platform_handles = 0;
242 }
243 
EndSerializeAndCloseImplNoLock(Channel * channel,void * destination,size_t * actual_size,embedder::PlatformHandleVector *)244 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
245     Channel* channel,
246     void* destination,
247     size_t* actual_size,
248     embedder::PlatformHandleVector* /*platform_handles*/) {
249   DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
250 
251   // Convert the local endpoint to a proxy endpoint (moving the message queue)
252   // and attach it to the channel.
253   MessageInTransit::EndpointId endpoint_id =
254       channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_));
255   // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
256   // possible that the other endpoint -- the one that we're not sending -- was
257   // closed in the intervening time.) In that case, we need to deserialize a
258   // "dead" message pipe dispatcher on the other end. (Note that this is
259   // different from just producing |MOJO_HANDLE_INVALID|.)
260   DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
261            << ")";
262 
263   // We now have a local ID. Before we can run the proxy endpoint, we need to
264   // get an ack back from the other side with the remote ID.
265   static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
266       endpoint_id;
267 
268   message_pipe_ = nullptr;
269   port_ = kInvalidPort;
270 
271   *actual_size = sizeof(SerializedMessagePipeDispatcher);
272   return true;
273 }
274 
275 // MessagePipeDispatcherTransport ----------------------------------------------
276 
MessagePipeDispatcherTransport(DispatcherTransport transport)277 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
278     DispatcherTransport transport)
279     : DispatcherTransport(transport) {
280   DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
281 }
282 
283 }  // namespace system
284 }  // namespace mojo
285