1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
14
15 namespace mojo {
16 namespace system {
17
MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,scoped_ptr<MessagePipeEndpoint> endpoint1)18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
19 scoped_ptr<MessagePipeEndpoint> endpoint1) {
20 endpoints_[0].reset(endpoint0.release());
21 endpoints_[1].reset(endpoint1.release());
22 }
23
MessagePipe()24 MessagePipe::MessagePipe() {
25 endpoints_[0].reset(new LocalMessagePipeEndpoint());
26 endpoints_[1].reset(new LocalMessagePipeEndpoint());
27 }
28
29 // static
GetPeerPort(unsigned port)30 unsigned MessagePipe::GetPeerPort(unsigned port) {
31 DCHECK(port == 0 || port == 1);
32 return port ^ 1;
33 }
34
GetType(unsigned port)35 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
36 DCHECK(port == 0 || port == 1);
37 base::AutoLock locker(lock_);
38 DCHECK(endpoints_[port]);
39
40 return endpoints_[port]->GetType();
41 }
42
CancelAllWaiters(unsigned port)43 void MessagePipe::CancelAllWaiters(unsigned port) {
44 DCHECK(port == 0 || port == 1);
45
46 base::AutoLock locker(lock_);
47 DCHECK(endpoints_[port]);
48 endpoints_[port]->CancelAllWaiters();
49 }
50
Close(unsigned port)51 void MessagePipe::Close(unsigned port) {
52 DCHECK(port == 0 || port == 1);
53
54 unsigned destination_port = GetPeerPort(port);
55
56 base::AutoLock locker(lock_);
57 DCHECK(endpoints_[port]);
58
59 endpoints_[port]->Close();
60 if (endpoints_[destination_port]) {
61 if (!endpoints_[destination_port]->OnPeerClose())
62 endpoints_[destination_port].reset();
63 }
64 endpoints_[port].reset();
65 }
66
67 // TODO(vtl): Handle flags.
WriteMessage(unsigned port,const void * bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)68 MojoResult MessagePipe::WriteMessage(
69 unsigned port,
70 const void* bytes,
71 uint32_t num_bytes,
72 std::vector<DispatcherTransport>* transports,
73 MojoWriteMessageFlags flags) {
74 DCHECK(port == 0 || port == 1);
75 return EnqueueMessageInternal(
76 GetPeerPort(port),
77 make_scoped_ptr(new MessageInTransit(
78 MessageInTransit::kTypeMessagePipeEndpoint,
79 MessageInTransit::kSubtypeMessagePipeEndpointData,
80 num_bytes,
81 bytes)),
82 transports);
83 }
84
ReadMessage(unsigned port,void * bytes,uint32_t * num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)85 MojoResult MessagePipe::ReadMessage(unsigned port,
86 void* bytes,
87 uint32_t* num_bytes,
88 DispatcherVector* dispatchers,
89 uint32_t* num_dispatchers,
90 MojoReadMessageFlags flags) {
91 DCHECK(port == 0 || port == 1);
92
93 base::AutoLock locker(lock_);
94 DCHECK(endpoints_[port]);
95
96 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
97 num_dispatchers, flags);
98 }
99
AddWaiter(unsigned port,Waiter * waiter,MojoHandleSignals signals,uint32_t context)100 MojoResult MessagePipe::AddWaiter(unsigned port,
101 Waiter* waiter,
102 MojoHandleSignals signals,
103 uint32_t context) {
104 DCHECK(port == 0 || port == 1);
105
106 base::AutoLock locker(lock_);
107 DCHECK(endpoints_[port]);
108
109 return endpoints_[port]->AddWaiter(waiter, signals, context);
110 }
111
RemoveWaiter(unsigned port,Waiter * waiter)112 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
113 DCHECK(port == 0 || port == 1);
114
115 base::AutoLock locker(lock_);
116 DCHECK(endpoints_[port]);
117
118 endpoints_[port]->RemoveWaiter(waiter);
119 }
120
ConvertLocalToProxy(unsigned port)121 void MessagePipe::ConvertLocalToProxy(unsigned port) {
122 DCHECK(port == 0 || port == 1);
123
124 base::AutoLock locker(lock_);
125 DCHECK(endpoints_[port]);
126 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
127
128 bool is_peer_open = !!endpoints_[GetPeerPort(port)];
129
130 // TODO(vtl): Hopefully this will work if the peer has been closed and when
131 // the peer is local. If the peer is remote, we should do something more
132 // sophisticated.
133 DCHECK(!is_peer_open ||
134 endpoints_[GetPeerPort(port)]->GetType() ==
135 MessagePipeEndpoint::kTypeLocal);
136
137 scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
138 new ProxyMessagePipeEndpoint(
139 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
140 is_peer_open));
141 endpoints_[port].swap(replacement_endpoint);
142 }
143
EnqueueMessage(unsigned port,scoped_ptr<MessageInTransit> message)144 MojoResult MessagePipe::EnqueueMessage(
145 unsigned port,
146 scoped_ptr<MessageInTransit> message) {
147 return EnqueueMessageInternal(port, message.Pass(), NULL);
148 }
149
Attach(unsigned port,scoped_refptr<Channel> channel,MessageInTransit::EndpointId local_id)150 bool MessagePipe::Attach(unsigned port,
151 scoped_refptr<Channel> channel,
152 MessageInTransit::EndpointId local_id) {
153 DCHECK(port == 0 || port == 1);
154 DCHECK(channel);
155 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
156
157 base::AutoLock locker(lock_);
158 if (!endpoints_[port])
159 return false;
160
161 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
162 endpoints_[port]->Attach(channel, local_id);
163 return true;
164 }
165
Run(unsigned port,MessageInTransit::EndpointId remote_id)166 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
167 DCHECK(port == 0 || port == 1);
168 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
169
170 base::AutoLock locker(lock_);
171 DCHECK(endpoints_[port]);
172 if (!endpoints_[port]->Run(remote_id))
173 endpoints_[port].reset();
174 }
175
OnRemove(unsigned port)176 void MessagePipe::OnRemove(unsigned port) {
177 unsigned destination_port = GetPeerPort(port);
178
179 base::AutoLock locker(lock_);
180 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
181 if (!endpoints_[port])
182 return;
183
184 endpoints_[port]->OnRemove();
185 if (endpoints_[destination_port]) {
186 if (!endpoints_[destination_port]->OnPeerClose())
187 endpoints_[destination_port].reset();
188 }
189 endpoints_[port].reset();
190 }
191
~MessagePipe()192 MessagePipe::~MessagePipe() {
193 // Owned by the dispatchers. The owning dispatchers should only release us via
194 // their |Close()| method, which should inform us of being closed via our
195 // |Close()|. Thus these should already be null.
196 DCHECK(!endpoints_[0]);
197 DCHECK(!endpoints_[1]);
198 }
199
EnqueueMessageInternal(unsigned port,scoped_ptr<MessageInTransit> message,std::vector<DispatcherTransport> * transports)200 MojoResult MessagePipe::EnqueueMessageInternal(
201 unsigned port,
202 scoped_ptr<MessageInTransit> message,
203 std::vector<DispatcherTransport>* transports) {
204 DCHECK(port == 0 || port == 1);
205 DCHECK(message);
206
207 if (message->type() == MessageInTransit::kTypeMessagePipe) {
208 DCHECK(!transports);
209 return HandleControlMessage(port, message.Pass());
210 }
211
212 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
213
214 base::AutoLock locker(lock_);
215 DCHECK(endpoints_[GetPeerPort(port)]);
216
217 // The destination port need not be open, unlike the source port.
218 if (!endpoints_[port])
219 return MOJO_RESULT_FAILED_PRECONDITION;
220
221 if (transports) {
222 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
223 if (result != MOJO_RESULT_OK)
224 return result;
225 }
226
227 // The endpoint's |EnqueueMessage()| may not report failure.
228 endpoints_[port]->EnqueueMessage(message.Pass());
229 return MOJO_RESULT_OK;
230 }
231
AttachTransportsNoLock(unsigned port,MessageInTransit * message,std::vector<DispatcherTransport> * transports)232 MojoResult MessagePipe::AttachTransportsNoLock(
233 unsigned port,
234 MessageInTransit* message,
235 std::vector<DispatcherTransport>* transports) {
236 DCHECK(!message->has_dispatchers());
237
238 // You're not allowed to send either handle to a message pipe over the message
239 // pipe, so check for this. (The case of trying to write a handle to itself is
240 // taken care of by |Core|. That case kind of makes sense, but leads to
241 // complications if, e.g., both sides try to do the same thing with their
242 // respective handles simultaneously. The other case, of trying to write the
243 // peer handle to a handle, doesn't make sense -- since no handle will be
244 // available to read the message from.)
245 for (size_t i = 0; i < transports->size(); i++) {
246 if (!(*transports)[i].is_valid())
247 continue;
248 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
249 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
250 if (mp_transport.GetMessagePipe() == this) {
251 // The other case should have been disallowed by |Core|. (Note: |port|
252 // is the peer port of the handle given to |WriteMessage()|.)
253 DCHECK_EQ(mp_transport.GetPort(), port);
254 return MOJO_RESULT_INVALID_ARGUMENT;
255 }
256 }
257 }
258
259 // Clone the dispatchers and attach them to the message. (This must be done as
260 // a separate loop, since we want to leave the dispatchers alone on failure.)
261 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
262 dispatchers->reserve(transports->size());
263 for (size_t i = 0; i < transports->size(); i++) {
264 if ((*transports)[i].is_valid()) {
265 dispatchers->push_back(
266 (*transports)[i].CreateEquivalentDispatcherAndClose());
267 } else {
268 LOG(WARNING) << "Enqueueing null dispatcher";
269 dispatchers->push_back(scoped_refptr<Dispatcher>());
270 }
271 }
272 message->SetDispatchers(dispatchers.Pass());
273 return MOJO_RESULT_OK;
274 }
275
HandleControlMessage(unsigned,scoped_ptr<MessageInTransit> message)276 MojoResult MessagePipe::HandleControlMessage(
277 unsigned /*port*/,
278 scoped_ptr<MessageInTransit> message) {
279 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
280 << message->subtype();
281 return MOJO_RESULT_UNKNOWN;
282 }
283
284 } // namespace system
285 } // namespace mojo
286