// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel_endpoint.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
14
15 namespace mojo {
16 namespace system {
17
18 // static
CreateLocalLocal()19 MessagePipe* MessagePipe::CreateLocalLocal() {
20 MessagePipe* message_pipe = new MessagePipe();
21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
23 return message_pipe;
24 }
25
26 // static
CreateLocalProxy(scoped_refptr<ChannelEndpoint> * channel_endpoint)27 MessagePipe* MessagePipe::CreateLocalProxy(
28 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
30 MessagePipe* message_pipe = new MessagePipe();
31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
33 message_pipe->endpoints_[1].reset(
34 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
35 return message_pipe;
36 }
37
38 // static
CreateProxyLocal(scoped_refptr<ChannelEndpoint> * channel_endpoint)39 MessagePipe* MessagePipe::CreateProxyLocal(
40 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
42 MessagePipe* message_pipe = new MessagePipe();
43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
44 message_pipe->endpoints_[0].reset(
45 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
47 return message_pipe;
48 }
49
50 // static
GetPeerPort(unsigned port)51 unsigned MessagePipe::GetPeerPort(unsigned port) {
52 DCHECK(port == 0 || port == 1);
53 return port ^ 1;
54 }
55
GetType(unsigned port)56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57 DCHECK(port == 0 || port == 1);
58 base::AutoLock locker(lock_);
59 DCHECK(endpoints_[port]);
60
61 return endpoints_[port]->GetType();
62 }
63
CancelAllWaiters(unsigned port)64 void MessagePipe::CancelAllWaiters(unsigned port) {
65 DCHECK(port == 0 || port == 1);
66
67 base::AutoLock locker(lock_);
68 DCHECK(endpoints_[port]);
69 endpoints_[port]->CancelAllWaiters();
70 }
71
Close(unsigned port)72 void MessagePipe::Close(unsigned port) {
73 DCHECK(port == 0 || port == 1);
74
75 unsigned destination_port = GetPeerPort(port);
76
77 base::AutoLock locker(lock_);
78 DCHECK(endpoints_[port]);
79
80 endpoints_[port]->Close();
81 if (endpoints_[destination_port]) {
82 if (!endpoints_[destination_port]->OnPeerClose())
83 endpoints_[destination_port].reset();
84 }
85 endpoints_[port].reset();
86 }
87
88 // TODO(vtl): Handle flags.
WriteMessage(unsigned port,UserPointer<const void> bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)89 MojoResult MessagePipe::WriteMessage(
90 unsigned port,
91 UserPointer<const void> bytes,
92 uint32_t num_bytes,
93 std::vector<DispatcherTransport>* transports,
94 MojoWriteMessageFlags flags) {
95 DCHECK(port == 0 || port == 1);
96 return EnqueueMessageInternal(
97 GetPeerPort(port),
98 make_scoped_ptr(new MessageInTransit(
99 MessageInTransit::kTypeMessagePipeEndpoint,
100 MessageInTransit::kSubtypeMessagePipeEndpointData,
101 num_bytes,
102 bytes)),
103 transports);
104 }
105
ReadMessage(unsigned port,UserPointer<void> bytes,UserPointer<uint32_t> num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)106 MojoResult MessagePipe::ReadMessage(unsigned port,
107 UserPointer<void> bytes,
108 UserPointer<uint32_t> num_bytes,
109 DispatcherVector* dispatchers,
110 uint32_t* num_dispatchers,
111 MojoReadMessageFlags flags) {
112 DCHECK(port == 0 || port == 1);
113
114 base::AutoLock locker(lock_);
115 DCHECK(endpoints_[port]);
116
117 return endpoints_[port]->ReadMessage(
118 bytes, num_bytes, dispatchers, num_dispatchers, flags);
119 }
120
GetHandleSignalsState(unsigned port) const121 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
122 DCHECK(port == 0 || port == 1);
123
124 base::AutoLock locker(const_cast<base::Lock&>(lock_));
125 DCHECK(endpoints_[port]);
126
127 return endpoints_[port]->GetHandleSignalsState();
128 }
129
AddWaiter(unsigned port,Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)130 MojoResult MessagePipe::AddWaiter(unsigned port,
131 Waiter* waiter,
132 MojoHandleSignals signals,
133 uint32_t context,
134 HandleSignalsState* signals_state) {
135 DCHECK(port == 0 || port == 1);
136
137 base::AutoLock locker(lock_);
138 DCHECK(endpoints_[port]);
139
140 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
141 }
142
RemoveWaiter(unsigned port,Waiter * waiter,HandleSignalsState * signals_state)143 void MessagePipe::RemoveWaiter(unsigned port,
144 Waiter* waiter,
145 HandleSignalsState* signals_state) {
146 DCHECK(port == 0 || port == 1);
147
148 base::AutoLock locker(lock_);
149 DCHECK(endpoints_[port]);
150
151 endpoints_[port]->RemoveWaiter(waiter, signals_state);
152 }
153
ConvertLocalToProxy(unsigned port)154 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
155 DCHECK(port == 0 || port == 1);
156
157 base::AutoLock locker(lock_);
158 DCHECK(endpoints_[port]);
159 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
160
161 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
162 // |MessagePipe| with two proxy endpoints, which will then act as a proxy
163 // (rather than trying to connect the two ends directly).
164 DLOG_IF(WARNING,
165 !!endpoints_[GetPeerPort(port)] &&
166 endpoints_[GetPeerPort(port)]->GetType() !=
167 MessagePipeEndpoint::kTypeLocal)
168 << "Direct message pipe passing across multiple channels not yet "
169 "implemented; will proxy";
170
171 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
172 scoped_refptr<ChannelEndpoint> channel_endpoint(
173 new ChannelEndpoint(this, port));
174 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
175 channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
176 old_endpoint.get())->message_queue());
177 old_endpoint->Close();
178
179 return channel_endpoint;
180 }
181
EnqueueMessage(unsigned port,scoped_ptr<MessageInTransit> message)182 MojoResult MessagePipe::EnqueueMessage(unsigned port,
183 scoped_ptr<MessageInTransit> message) {
184 return EnqueueMessageInternal(port, message.Pass(), nullptr);
185 }
186
OnRemove(unsigned port)187 void MessagePipe::OnRemove(unsigned port) {
188 unsigned destination_port = GetPeerPort(port);
189
190 base::AutoLock locker(lock_);
191 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
192 if (!endpoints_[port])
193 return;
194
195 if (endpoints_[destination_port]) {
196 if (!endpoints_[destination_port]->OnPeerClose())
197 endpoints_[destination_port].reset();
198 }
199 endpoints_[port].reset();
200 }
201
MessagePipe()202 MessagePipe::MessagePipe() {
203 }
204
~MessagePipe()205 MessagePipe::~MessagePipe() {
206 // Owned by the dispatchers. The owning dispatchers should only release us via
207 // their |Close()| method, which should inform us of being closed via our
208 // |Close()|. Thus these should already be null.
209 DCHECK(!endpoints_[0]);
210 DCHECK(!endpoints_[1]);
211 }
212
EnqueueMessageInternal(unsigned port,scoped_ptr<MessageInTransit> message,std::vector<DispatcherTransport> * transports)213 MojoResult MessagePipe::EnqueueMessageInternal(
214 unsigned port,
215 scoped_ptr<MessageInTransit> message,
216 std::vector<DispatcherTransport>* transports) {
217 DCHECK(port == 0 || port == 1);
218 DCHECK(message);
219
220 if (message->type() == MessageInTransit::kTypeMessagePipe) {
221 DCHECK(!transports);
222 return HandleControlMessage(port, message.Pass());
223 }
224
225 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
226
227 base::AutoLock locker(lock_);
228 DCHECK(endpoints_[GetPeerPort(port)]);
229
230 // The destination port need not be open, unlike the source port.
231 if (!endpoints_[port])
232 return MOJO_RESULT_FAILED_PRECONDITION;
233
234 if (transports) {
235 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
236 if (result != MOJO_RESULT_OK)
237 return result;
238 }
239
240 // The endpoint's |EnqueueMessage()| may not report failure.
241 endpoints_[port]->EnqueueMessage(message.Pass());
242 return MOJO_RESULT_OK;
243 }
244
AttachTransportsNoLock(unsigned port,MessageInTransit * message,std::vector<DispatcherTransport> * transports)245 MojoResult MessagePipe::AttachTransportsNoLock(
246 unsigned port,
247 MessageInTransit* message,
248 std::vector<DispatcherTransport>* transports) {
249 DCHECK(!message->has_dispatchers());
250
251 // You're not allowed to send either handle to a message pipe over the message
252 // pipe, so check for this. (The case of trying to write a handle to itself is
253 // taken care of by |Core|. That case kind of makes sense, but leads to
254 // complications if, e.g., both sides try to do the same thing with their
255 // respective handles simultaneously. The other case, of trying to write the
256 // peer handle to a handle, doesn't make sense -- since no handle will be
257 // available to read the message from.)
258 for (size_t i = 0; i < transports->size(); i++) {
259 if (!(*transports)[i].is_valid())
260 continue;
261 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
262 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
263 if (mp_transport.GetMessagePipe() == this) {
264 // The other case should have been disallowed by |Core|. (Note: |port|
265 // is the peer port of the handle given to |WriteMessage()|.)
266 DCHECK_EQ(mp_transport.GetPort(), port);
267 return MOJO_RESULT_INVALID_ARGUMENT;
268 }
269 }
270 }
271
272 // Clone the dispatchers and attach them to the message. (This must be done as
273 // a separate loop, since we want to leave the dispatchers alone on failure.)
274 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
275 dispatchers->reserve(transports->size());
276 for (size_t i = 0; i < transports->size(); i++) {
277 if ((*transports)[i].is_valid()) {
278 dispatchers->push_back(
279 (*transports)[i].CreateEquivalentDispatcherAndClose());
280 } else {
281 LOG(WARNING) << "Enqueueing null dispatcher";
282 dispatchers->push_back(scoped_refptr<Dispatcher>());
283 }
284 }
285 message->SetDispatchers(dispatchers.Pass());
286 return MOJO_RESULT_OK;
287 }
288
HandleControlMessage(unsigned,scoped_ptr<MessageInTransit> message)289 MojoResult MessagePipe::HandleControlMessage(
290 unsigned /*port*/,
291 scoped_ptr<MessageInTransit> message) {
292 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
293 << message->subtype();
294 return MOJO_RESULT_UNKNOWN;
295 }
296
297 } // namespace system
298 } // namespace mojo
299