• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/message_pipe.h"
6 
7 #include "base/logging.h"
8 #include "mojo/system/channel_endpoint.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
14 
15 namespace mojo {
16 namespace system {
17 
18 // static
CreateLocalLocal()19 MessagePipe* MessagePipe::CreateLocalLocal() {
20   MessagePipe* message_pipe = new MessagePipe();
21   message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22   message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
23   return message_pipe;
24 }
25 
26 // static
CreateLocalProxy(scoped_refptr<ChannelEndpoint> * channel_endpoint)27 MessagePipe* MessagePipe::CreateLocalProxy(
28     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
29   DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
30   MessagePipe* message_pipe = new MessagePipe();
31   message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
32   *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
33   message_pipe->endpoints_[1].reset(
34       new ProxyMessagePipeEndpoint(channel_endpoint->get()));
35   return message_pipe;
36 }
37 
38 // static
CreateProxyLocal(scoped_refptr<ChannelEndpoint> * channel_endpoint)39 MessagePipe* MessagePipe::CreateProxyLocal(
40     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
41   DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
42   MessagePipe* message_pipe = new MessagePipe();
43   *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
44   message_pipe->endpoints_[0].reset(
45       new ProxyMessagePipeEndpoint(channel_endpoint->get()));
46   message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
47   return message_pipe;
48 }
49 
50 // static
GetPeerPort(unsigned port)51 unsigned MessagePipe::GetPeerPort(unsigned port) {
52   DCHECK(port == 0 || port == 1);
53   return port ^ 1;
54 }
55 
GetType(unsigned port)56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57   DCHECK(port == 0 || port == 1);
58   base::AutoLock locker(lock_);
59   DCHECK(endpoints_[port]);
60 
61   return endpoints_[port]->GetType();
62 }
63 
CancelAllWaiters(unsigned port)64 void MessagePipe::CancelAllWaiters(unsigned port) {
65   DCHECK(port == 0 || port == 1);
66 
67   base::AutoLock locker(lock_);
68   DCHECK(endpoints_[port]);
69   endpoints_[port]->CancelAllWaiters();
70 }
71 
Close(unsigned port)72 void MessagePipe::Close(unsigned port) {
73   DCHECK(port == 0 || port == 1);
74 
75   unsigned destination_port = GetPeerPort(port);
76 
77   base::AutoLock locker(lock_);
78   DCHECK(endpoints_[port]);
79 
80   endpoints_[port]->Close();
81   if (endpoints_[destination_port]) {
82     if (!endpoints_[destination_port]->OnPeerClose())
83       endpoints_[destination_port].reset();
84   }
85   endpoints_[port].reset();
86 }
87 
88 // TODO(vtl): Handle flags.
WriteMessage(unsigned port,UserPointer<const void> bytes,uint32_t num_bytes,std::vector<DispatcherTransport> * transports,MojoWriteMessageFlags flags)89 MojoResult MessagePipe::WriteMessage(
90     unsigned port,
91     UserPointer<const void> bytes,
92     uint32_t num_bytes,
93     std::vector<DispatcherTransport>* transports,
94     MojoWriteMessageFlags flags) {
95   DCHECK(port == 0 || port == 1);
96   return EnqueueMessageInternal(
97       GetPeerPort(port),
98       make_scoped_ptr(new MessageInTransit(
99           MessageInTransit::kTypeMessagePipeEndpoint,
100           MessageInTransit::kSubtypeMessagePipeEndpointData,
101           num_bytes,
102           bytes)),
103       transports);
104 }
105 
ReadMessage(unsigned port,UserPointer<void> bytes,UserPointer<uint32_t> num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)106 MojoResult MessagePipe::ReadMessage(unsigned port,
107                                     UserPointer<void> bytes,
108                                     UserPointer<uint32_t> num_bytes,
109                                     DispatcherVector* dispatchers,
110                                     uint32_t* num_dispatchers,
111                                     MojoReadMessageFlags flags) {
112   DCHECK(port == 0 || port == 1);
113 
114   base::AutoLock locker(lock_);
115   DCHECK(endpoints_[port]);
116 
117   return endpoints_[port]->ReadMessage(
118       bytes, num_bytes, dispatchers, num_dispatchers, flags);
119 }
120 
GetHandleSignalsState(unsigned port) const121 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
122   DCHECK(port == 0 || port == 1);
123 
124   base::AutoLock locker(const_cast<base::Lock&>(lock_));
125   DCHECK(endpoints_[port]);
126 
127   return endpoints_[port]->GetHandleSignalsState();
128 }
129 
AddWaiter(unsigned port,Waiter * waiter,MojoHandleSignals signals,uint32_t context,HandleSignalsState * signals_state)130 MojoResult MessagePipe::AddWaiter(unsigned port,
131                                   Waiter* waiter,
132                                   MojoHandleSignals signals,
133                                   uint32_t context,
134                                   HandleSignalsState* signals_state) {
135   DCHECK(port == 0 || port == 1);
136 
137   base::AutoLock locker(lock_);
138   DCHECK(endpoints_[port]);
139 
140   return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
141 }
142 
RemoveWaiter(unsigned port,Waiter * waiter,HandleSignalsState * signals_state)143 void MessagePipe::RemoveWaiter(unsigned port,
144                                Waiter* waiter,
145                                HandleSignalsState* signals_state) {
146   DCHECK(port == 0 || port == 1);
147 
148   base::AutoLock locker(lock_);
149   DCHECK(endpoints_[port]);
150 
151   endpoints_[port]->RemoveWaiter(waiter, signals_state);
152 }
153 
ConvertLocalToProxy(unsigned port)154 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
155   DCHECK(port == 0 || port == 1);
156 
157   base::AutoLock locker(lock_);
158   DCHECK(endpoints_[port]);
159   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
160 
161   // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
162   // |MessagePipe| with two proxy endpoints, which will then act as a proxy
163   // (rather than trying to connect the two ends directly).
164   DLOG_IF(WARNING,
165           !!endpoints_[GetPeerPort(port)] &&
166               endpoints_[GetPeerPort(port)]->GetType() !=
167                   MessagePipeEndpoint::kTypeLocal)
168       << "Direct message pipe passing across multiple channels not yet "
169          "implemented; will proxy";
170 
171   scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
172   scoped_refptr<ChannelEndpoint> channel_endpoint(
173       new ChannelEndpoint(this, port));
174   endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
175   channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
176                                      old_endpoint.get())->message_queue());
177   old_endpoint->Close();
178 
179   return channel_endpoint;
180 }
181 
EnqueueMessage(unsigned port,scoped_ptr<MessageInTransit> message)182 MojoResult MessagePipe::EnqueueMessage(unsigned port,
183                                        scoped_ptr<MessageInTransit> message) {
184   return EnqueueMessageInternal(port, message.Pass(), nullptr);
185 }
186 
OnRemove(unsigned port)187 void MessagePipe::OnRemove(unsigned port) {
188   unsigned destination_port = GetPeerPort(port);
189 
190   base::AutoLock locker(lock_);
191   // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
192   if (!endpoints_[port])
193     return;
194 
195   if (endpoints_[destination_port]) {
196     if (!endpoints_[destination_port]->OnPeerClose())
197       endpoints_[destination_port].reset();
198   }
199   endpoints_[port].reset();
200 }
201 
MessagePipe()202 MessagePipe::MessagePipe() {
203 }
204 
~MessagePipe()205 MessagePipe::~MessagePipe() {
206   // Owned by the dispatchers. The owning dispatchers should only release us via
207   // their |Close()| method, which should inform us of being closed via our
208   // |Close()|. Thus these should already be null.
209   DCHECK(!endpoints_[0]);
210   DCHECK(!endpoints_[1]);
211 }
212 
EnqueueMessageInternal(unsigned port,scoped_ptr<MessageInTransit> message,std::vector<DispatcherTransport> * transports)213 MojoResult MessagePipe::EnqueueMessageInternal(
214     unsigned port,
215     scoped_ptr<MessageInTransit> message,
216     std::vector<DispatcherTransport>* transports) {
217   DCHECK(port == 0 || port == 1);
218   DCHECK(message);
219 
220   if (message->type() == MessageInTransit::kTypeMessagePipe) {
221     DCHECK(!transports);
222     return HandleControlMessage(port, message.Pass());
223   }
224 
225   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
226 
227   base::AutoLock locker(lock_);
228   DCHECK(endpoints_[GetPeerPort(port)]);
229 
230   // The destination port need not be open, unlike the source port.
231   if (!endpoints_[port])
232     return MOJO_RESULT_FAILED_PRECONDITION;
233 
234   if (transports) {
235     MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
236     if (result != MOJO_RESULT_OK)
237       return result;
238   }
239 
240   // The endpoint's |EnqueueMessage()| may not report failure.
241   endpoints_[port]->EnqueueMessage(message.Pass());
242   return MOJO_RESULT_OK;
243 }
244 
AttachTransportsNoLock(unsigned port,MessageInTransit * message,std::vector<DispatcherTransport> * transports)245 MojoResult MessagePipe::AttachTransportsNoLock(
246     unsigned port,
247     MessageInTransit* message,
248     std::vector<DispatcherTransport>* transports) {
249   DCHECK(!message->has_dispatchers());
250 
251   // You're not allowed to send either handle to a message pipe over the message
252   // pipe, so check for this. (The case of trying to write a handle to itself is
253   // taken care of by |Core|. That case kind of makes sense, but leads to
254   // complications if, e.g., both sides try to do the same thing with their
255   // respective handles simultaneously. The other case, of trying to write the
256   // peer handle to a handle, doesn't make sense -- since no handle will be
257   // available to read the message from.)
258   for (size_t i = 0; i < transports->size(); i++) {
259     if (!(*transports)[i].is_valid())
260       continue;
261     if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
262       MessagePipeDispatcherTransport mp_transport((*transports)[i]);
263       if (mp_transport.GetMessagePipe() == this) {
264         // The other case should have been disallowed by |Core|. (Note: |port|
265         // is the peer port of the handle given to |WriteMessage()|.)
266         DCHECK_EQ(mp_transport.GetPort(), port);
267         return MOJO_RESULT_INVALID_ARGUMENT;
268       }
269     }
270   }
271 
272   // Clone the dispatchers and attach them to the message. (This must be done as
273   // a separate loop, since we want to leave the dispatchers alone on failure.)
274   scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
275   dispatchers->reserve(transports->size());
276   for (size_t i = 0; i < transports->size(); i++) {
277     if ((*transports)[i].is_valid()) {
278       dispatchers->push_back(
279           (*transports)[i].CreateEquivalentDispatcherAndClose());
280     } else {
281       LOG(WARNING) << "Enqueueing null dispatcher";
282       dispatchers->push_back(scoped_refptr<Dispatcher>());
283     }
284   }
285   message->SetDispatchers(dispatchers.Pass());
286   return MOJO_RESULT_OK;
287 }
288 
HandleControlMessage(unsigned,scoped_ptr<MessageInTransit> message)289 MojoResult MessagePipe::HandleControlMessage(
290     unsigned /*port*/,
291     scoped_ptr<MessageInTransit> message) {
292   LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
293                << message->subtype();
294   return MOJO_RESULT_UNKNOWN;
295 }
296 
297 }  // namespace system
298 }  // namespace mojo
299