• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/message_pipe.h"
6 
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
14 
15 namespace mojo {
16 namespace system {
17 
MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,scoped_ptr<MessagePipeEndpoint> endpoint1)18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
19                          scoped_ptr<MessagePipeEndpoint> endpoint1) {
20   endpoints_[0].reset(endpoint0.release());
21   endpoints_[1].reset(endpoint1.release());
22 }
23 
MessagePipe()24 MessagePipe::MessagePipe() {
25   endpoints_[0].reset(new LocalMessagePipeEndpoint());
26   endpoints_[1].reset(new LocalMessagePipeEndpoint());
27 }
28 
29 // static
GetPeerPort(unsigned port)30 unsigned MessagePipe::GetPeerPort(unsigned port) {
31   DCHECK(port == 0 || port == 1);
32   return port ^ 1;
33 }
34 
GetType(unsigned port)35 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
36   DCHECK(port == 0 || port == 1);
37   base::AutoLock locker(lock_);
38   DCHECK(endpoints_[port]);
39 
40   return endpoints_[port]->GetType();
41 }
42 
CancelAllWaiters(unsigned port)43 void MessagePipe::CancelAllWaiters(unsigned port) {
44   DCHECK(port == 0 || port == 1);
45 
46   base::AutoLock locker(lock_);
47   DCHECK(endpoints_[port]);
48   endpoints_[port]->CancelAllWaiters();
49 }
50 
Close(unsigned port)51 void MessagePipe::Close(unsigned port) {
52   DCHECK(port == 0 || port == 1);
53 
54   unsigned destination_port = GetPeerPort(port);
55 
56   base::AutoLock locker(lock_);
57   DCHECK(endpoints_[port]);
58 
59   endpoints_[port]->Close();
60   if (endpoints_[destination_port]) {
61     if (!endpoints_[destination_port]->OnPeerClose())
62       endpoints_[destination_port].reset();
63   }
64   endpoints_[port].reset();
65 }
66 
67 // TODO(vtl): Handle flags.
WriteMessage(unsigned port,const void * bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)68 MojoResult MessagePipe::WriteMessage(
69     unsigned port,
70     const void* bytes,
71     uint32_t num_bytes,
72     std::vector<DispatcherTransport>* transports,
73     MojoWriteMessageFlags flags) {
74   DCHECK(port == 0 || port == 1);
75   return EnqueueMessageInternal(
76       GetPeerPort(port),
77       make_scoped_ptr(new MessageInTransit(
78           MessageInTransit::kTypeMessagePipeEndpoint,
79           MessageInTransit::kSubtypeMessagePipeEndpointData,
80           num_bytes,
81           bytes)),
82       transports);
83 }
84 
ReadMessage(unsigned port,void * bytes,uint32_t * num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)85 MojoResult MessagePipe::ReadMessage(unsigned port,
86                                     void* bytes,
87                                     uint32_t* num_bytes,
88                                     DispatcherVector* dispatchers,
89                                     uint32_t* num_dispatchers,
90                                     MojoReadMessageFlags flags) {
91   DCHECK(port == 0 || port == 1);
92 
93   base::AutoLock locker(lock_);
94   DCHECK(endpoints_[port]);
95 
96   return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
97                                        num_dispatchers, flags);
98 }
99 
AddWaiter(unsigned port,Waiter * waiter,MojoHandleSignals signals,uint32_t context)100 MojoResult MessagePipe::AddWaiter(unsigned port,
101                                   Waiter* waiter,
102                                   MojoHandleSignals signals,
103                                   uint32_t context) {
104   DCHECK(port == 0 || port == 1);
105 
106   base::AutoLock locker(lock_);
107   DCHECK(endpoints_[port]);
108 
109   return endpoints_[port]->AddWaiter(waiter, signals, context);
110 }
111 
RemoveWaiter(unsigned port,Waiter * waiter)112 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
113   DCHECK(port == 0 || port == 1);
114 
115   base::AutoLock locker(lock_);
116   DCHECK(endpoints_[port]);
117 
118   endpoints_[port]->RemoveWaiter(waiter);
119 }
120 
ConvertLocalToProxy(unsigned port)121 void MessagePipe::ConvertLocalToProxy(unsigned port) {
122   DCHECK(port == 0 || port == 1);
123 
124   base::AutoLock locker(lock_);
125   DCHECK(endpoints_[port]);
126   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
127 
128   bool is_peer_open = !!endpoints_[GetPeerPort(port)];
129 
130   // TODO(vtl): Hopefully this will work if the peer has been closed and when
131   // the peer is local. If the peer is remote, we should do something more
132   // sophisticated.
133   DCHECK(!is_peer_open ||
134          endpoints_[GetPeerPort(port)]->GetType() ==
135              MessagePipeEndpoint::kTypeLocal);
136 
137   scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
138       new ProxyMessagePipeEndpoint(
139           static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
140           is_peer_open));
141   endpoints_[port].swap(replacement_endpoint);
142 }
143 
EnqueueMessage(unsigned port,scoped_ptr<MessageInTransit> message)144 MojoResult MessagePipe::EnqueueMessage(
145     unsigned port,
146     scoped_ptr<MessageInTransit> message) {
147   return EnqueueMessageInternal(port, message.Pass(), NULL);
148 }
149 
Attach(unsigned port,scoped_refptr<Channel> channel,MessageInTransit::EndpointId local_id)150 bool MessagePipe::Attach(unsigned port,
151                          scoped_refptr<Channel> channel,
152                          MessageInTransit::EndpointId local_id) {
153   DCHECK(port == 0 || port == 1);
154   DCHECK(channel);
155   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
156 
157   base::AutoLock locker(lock_);
158   if (!endpoints_[port])
159     return false;
160 
161   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
162   endpoints_[port]->Attach(channel, local_id);
163   return true;
164 }
165 
Run(unsigned port,MessageInTransit::EndpointId remote_id)166 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
167   DCHECK(port == 0 || port == 1);
168   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
169 
170   base::AutoLock locker(lock_);
171   DCHECK(endpoints_[port]);
172   if (!endpoints_[port]->Run(remote_id))
173     endpoints_[port].reset();
174 }
175 
OnRemove(unsigned port)176 void MessagePipe::OnRemove(unsigned port) {
177   unsigned destination_port = GetPeerPort(port);
178 
179   base::AutoLock locker(lock_);
180   // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
181   if (!endpoints_[port])
182     return;
183 
184   endpoints_[port]->OnRemove();
185   if (endpoints_[destination_port]) {
186     if (!endpoints_[destination_port]->OnPeerClose())
187       endpoints_[destination_port].reset();
188   }
189   endpoints_[port].reset();
190 }
191 
~MessagePipe()192 MessagePipe::~MessagePipe() {
193   // Owned by the dispatchers. The owning dispatchers should only release us via
194   // their |Close()| method, which should inform us of being closed via our
195   // |Close()|. Thus these should already be null.
196   DCHECK(!endpoints_[0]);
197   DCHECK(!endpoints_[1]);
198 }
199 
EnqueueMessageInternal(unsigned port,scoped_ptr<MessageInTransit> message,std::vector<DispatcherTransport> * transports)200 MojoResult MessagePipe::EnqueueMessageInternal(
201     unsigned port,
202     scoped_ptr<MessageInTransit> message,
203     std::vector<DispatcherTransport>* transports) {
204   DCHECK(port == 0 || port == 1);
205   DCHECK(message);
206 
207   if (message->type() == MessageInTransit::kTypeMessagePipe) {
208     DCHECK(!transports);
209     return HandleControlMessage(port, message.Pass());
210   }
211 
212   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
213 
214   base::AutoLock locker(lock_);
215   DCHECK(endpoints_[GetPeerPort(port)]);
216 
217   // The destination port need not be open, unlike the source port.
218   if (!endpoints_[port])
219     return MOJO_RESULT_FAILED_PRECONDITION;
220 
221   if (transports) {
222     MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
223     if (result != MOJO_RESULT_OK)
224       return result;
225   }
226 
227   // The endpoint's |EnqueueMessage()| may not report failure.
228   endpoints_[port]->EnqueueMessage(message.Pass());
229   return MOJO_RESULT_OK;
230 }
231 
AttachTransportsNoLock(unsigned port,MessageInTransit * message,std::vector<DispatcherTransport> * transports)232 MojoResult MessagePipe::AttachTransportsNoLock(
233     unsigned port,
234     MessageInTransit* message,
235     std::vector<DispatcherTransport>* transports) {
236   DCHECK(!message->has_dispatchers());
237 
238   // You're not allowed to send either handle to a message pipe over the message
239   // pipe, so check for this. (The case of trying to write a handle to itself is
240   // taken care of by |Core|. That case kind of makes sense, but leads to
241   // complications if, e.g., both sides try to do the same thing with their
242   // respective handles simultaneously. The other case, of trying to write the
243   // peer handle to a handle, doesn't make sense -- since no handle will be
244   // available to read the message from.)
245   for (size_t i = 0; i < transports->size(); i++) {
246     if (!(*transports)[i].is_valid())
247       continue;
248     if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
249       MessagePipeDispatcherTransport mp_transport((*transports)[i]);
250       if (mp_transport.GetMessagePipe() == this) {
251         // The other case should have been disallowed by |Core|. (Note: |port|
252         // is the peer port of the handle given to |WriteMessage()|.)
253         DCHECK_EQ(mp_transport.GetPort(), port);
254         return MOJO_RESULT_INVALID_ARGUMENT;
255       }
256     }
257   }
258 
259   // Clone the dispatchers and attach them to the message. (This must be done as
260   // a separate loop, since we want to leave the dispatchers alone on failure.)
261   scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
262   dispatchers->reserve(transports->size());
263   for (size_t i = 0; i < transports->size(); i++) {
264     if ((*transports)[i].is_valid()) {
265       dispatchers->push_back(
266           (*transports)[i].CreateEquivalentDispatcherAndClose());
267     } else {
268       LOG(WARNING) << "Enqueueing null dispatcher";
269       dispatchers->push_back(scoped_refptr<Dispatcher>());
270     }
271   }
272   message->SetDispatchers(dispatchers.Pass());
273   return MOJO_RESULT_OK;
274 }
275 
HandleControlMessage(unsigned,scoped_ptr<MessageInTransit> message)276 MojoResult MessagePipe::HandleControlMessage(
277     unsigned /*port*/,
278     scoped_ptr<MessageInTransit> message) {
279   LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
280                << message->subtype();
281   return MOJO_RESULT_UNKNOWN;
282 }
283 
284 }  // namespace system
285 }  // namespace mojo
286