• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/channel.h"
6 
7 #include <algorithm>
8 
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
17 
18 namespace mojo {
19 namespace system {
20 
21 static_assert(Channel::kBootstrapEndpointId !=
22                   MessageInTransit::kInvalidEndpointId,
23               "kBootstrapEndpointId is invalid");
24 
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26     Channel::kBootstrapEndpointId;
27 
Channel(embedder::PlatformSupport * platform_support)28 Channel::Channel(embedder::PlatformSupport* platform_support)
29     : platform_support_(platform_support),
30       is_running_(false),
31       is_shutting_down_(false),
32       next_local_id_(kBootstrapEndpointId) {
33 }
34 
Init(scoped_ptr<RawChannel> raw_channel)35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
36   DCHECK(creation_thread_checker_.CalledOnValidThread());
37   DCHECK(raw_channel);
38 
39   // No need to take |lock_|, since this must be called before this object
40   // becomes thread-safe.
41   DCHECK(!is_running_);
42   raw_channel_ = raw_channel.Pass();
43 
44   if (!raw_channel_->Init(this)) {
45     raw_channel_.reset();
46     return false;
47   }
48 
49   is_running_ = true;
50   return true;
51 }
52 
Shutdown()53 void Channel::Shutdown() {
54   DCHECK(creation_thread_checker_.CalledOnValidThread());
55 
56   IdToEndpointMap to_destroy;
57   {
58     base::AutoLock locker(lock_);
59     if (!is_running_)
60       return;
61 
62     // Note: Don't reset |raw_channel_|, in case we're being called from within
63     // |OnReadMessage()| or |OnError()|.
64     raw_channel_->Shutdown();
65     is_running_ = false;
66 
67     // We need to deal with it outside the lock.
68     std::swap(to_destroy, local_id_to_endpoint_map_);
69   }
70 
71   size_t num_live = 0;
72   size_t num_zombies = 0;
73   for (IdToEndpointMap::iterator it = to_destroy.begin();
74        it != to_destroy.end();
75        ++it) {
76     if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
77       it->second->message_pipe_->OnRemove(it->second->port_);
78       num_live++;
79     } else {
80       DCHECK(!it->second->message_pipe_.get());
81       num_zombies++;
82     }
83     it->second->DetachFromChannel();
84   }
85   DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
86                                        << " live endpoints and " << num_zombies
87                                        << " zombies";
88 }
89 
WillShutdownSoon()90 void Channel::WillShutdownSoon() {
91   base::AutoLock locker(lock_);
92   is_shutting_down_ = true;
93 }
94 
95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
96 // keeps the endpoint alive even after the lock is released. Otherwise, there's
97 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
98 // directly to this function, which wouldn't be sufficient for safety.
AttachEndpoint(scoped_refptr<ChannelEndpoint> endpoint)99 MessageInTransit::EndpointId Channel::AttachEndpoint(
100     scoped_refptr<ChannelEndpoint> endpoint) {
101   DCHECK(endpoint.get());
102 
103   MessageInTransit::EndpointId local_id;
104   {
105     base::AutoLock locker(lock_);
106 
107     DLOG_IF(WARNING, is_shutting_down_)
108         << "AttachEndpoint() while shutting down";
109 
110     while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
111            local_id_to_endpoint_map_.find(next_local_id_) !=
112                local_id_to_endpoint_map_.end())
113       next_local_id_++;
114 
115     local_id = next_local_id_;
116     next_local_id_++;
117     local_id_to_endpoint_map_[local_id] = endpoint;
118   }
119 
120   endpoint->AttachToChannel(this, local_id);
121   return local_id;
122 }
123 
RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
125                                      MessageInTransit::EndpointId remote_id) {
126   scoped_refptr<ChannelEndpoint> endpoint;
127   ChannelEndpoint::State state;
128   {
129     base::AutoLock locker(lock_);
130 
131     DLOG_IF(WARNING, is_shutting_down_)
132         << "RunMessagePipeEndpoint() while shutting down";
133 
134     IdToEndpointMap::const_iterator it =
135         local_id_to_endpoint_map_.find(local_id);
136     if (it == local_id_to_endpoint_map_.end())
137       return false;
138     endpoint = it->second;
139     state = it->second->state_;
140   }
141 
142   // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
143   // and ignore it.
144   if (state != ChannelEndpoint::STATE_NORMAL) {
145     DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
146                 "(local ID " << local_id << ", remote ID " << remote_id << ")";
147     return true;
148   }
149 
150   // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
151   // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
152   endpoint->Run(remote_id);
153   return true;
154 }
155 
RunRemoteMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)156 void Channel::RunRemoteMessagePipeEndpoint(
157     MessageInTransit::EndpointId local_id,
158     MessageInTransit::EndpointId remote_id) {
159 #if DCHECK_IS_ON
160   {
161     base::AutoLock locker(lock_);
162     DCHECK(local_id_to_endpoint_map_.find(local_id) !=
163            local_id_to_endpoint_map_.end());
164   }
165 #endif
166 
167   if (!SendControlMessage(
168           MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
169           local_id,
170           remote_id)) {
171     HandleLocalError(base::StringPrintf(
172         "Failed to send message to run remote message pipe endpoint (local ID "
173         "%u, remote ID %u)",
174         static_cast<unsigned>(local_id),
175         static_cast<unsigned>(remote_id)));
176   }
177 }
178 
WriteMessage(scoped_ptr<MessageInTransit> message)179 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
180   base::AutoLock locker(lock_);
181   if (!is_running_) {
182     // TODO(vtl): I think this is probably not an error condition, but I should
183     // think about it (and the shutdown sequence) more carefully.
184     LOG(WARNING) << "WriteMessage() after shutdown";
185     return false;
186   }
187 
188   DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
189   return raw_channel_->WriteMessage(message.Pass());
190 }
191 
IsWriteBufferEmpty()192 bool Channel::IsWriteBufferEmpty() {
193   base::AutoLock locker(lock_);
194   if (!is_running_)
195     return true;
196   return raw_channel_->IsWriteBufferEmpty();
197 }
198 
DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)199 void Channel::DetachMessagePipeEndpoint(
200     MessageInTransit::EndpointId local_id,
201     MessageInTransit::EndpointId remote_id) {
202   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
203 
204   // If this is non-null after the locked block, the endpoint should be detached
205   // (and no remove message sent).
206   scoped_refptr<ChannelEndpoint> endpoint_to_detach;
207   {
208     base::AutoLock locker_(lock_);
209     if (!is_running_)
210       return;
211 
212     IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
213     DCHECK(it != local_id_to_endpoint_map_.end());
214 
215     switch (it->second->state_) {
216       case ChannelEndpoint::STATE_NORMAL:
217         it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
218         it->second->message_pipe_ = nullptr;
219         if (remote_id == MessageInTransit::kInvalidEndpointId)
220           return;
221         // We have to send a remove message (outside the lock).
222         break;
223       case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
224         endpoint_to_detach = it->second;
225         local_id_to_endpoint_map_.erase(it);
226         // We have to detach (outside the lock).
227         break;
228       case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
229         NOTREACHED();
230         return;
231     }
232   }
233   if (endpoint_to_detach.get()) {
234     endpoint_to_detach->DetachFromChannel();
235     return;
236   }
237 
238   if (!SendControlMessage(
239           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
240           local_id,
241           remote_id)) {
242     HandleLocalError(base::StringPrintf(
243         "Failed to send message to remove remote message pipe endpoint (local "
244         "ID %u, remote ID %u)",
245         static_cast<unsigned>(local_id),
246         static_cast<unsigned>(remote_id)));
247   }
248 }
249 
GetSerializedPlatformHandleSize() const250 size_t Channel::GetSerializedPlatformHandleSize() const {
251   return raw_channel_->GetSerializedPlatformHandleSize();
252 }
253 
~Channel()254 Channel::~Channel() {
255   // The channel should have been shut down first.
256   DCHECK(!is_running_);
257 }
258 
OnReadMessage(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)259 void Channel::OnReadMessage(
260     const MessageInTransit::View& message_view,
261     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
262   DCHECK(creation_thread_checker_.CalledOnValidThread());
263 
264   switch (message_view.type()) {
265     case MessageInTransit::kTypeMessagePipeEndpoint:
266     case MessageInTransit::kTypeMessagePipe:
267       OnReadMessageForDownstream(message_view, platform_handles.Pass());
268       break;
269     case MessageInTransit::kTypeChannel:
270       OnReadMessageForChannel(message_view, platform_handles.Pass());
271       break;
272     default:
273       HandleRemoteError(
274           base::StringPrintf("Received message of invalid type %u",
275                              static_cast<unsigned>(message_view.type())));
276       break;
277   }
278 }
279 
OnError(Error error)280 void Channel::OnError(Error error) {
281   DCHECK(creation_thread_checker_.CalledOnValidThread());
282 
283   switch (error) {
284     case ERROR_READ_SHUTDOWN:
285       // The other side was cleanly closed, so this isn't actually an error.
286       DVLOG(1) << "RawChannel read error (shutdown)";
287       break;
288     case ERROR_READ_BROKEN: {
289       base::AutoLock locker(lock_);
290       LOG_IF(ERROR, !is_shutting_down_)
291           << "RawChannel read error (connection broken)";
292       break;
293     }
294     case ERROR_READ_BAD_MESSAGE:
295       // Receiving a bad message means either a bug, data corruption, or
296       // malicious attack (probably due to some other bug).
297       LOG(ERROR) << "RawChannel read error (received bad message)";
298       break;
299     case ERROR_READ_UNKNOWN:
300       LOG(ERROR) << "RawChannel read error (unknown)";
301       break;
302     case ERROR_WRITE:
303       // Write errors are slightly notable: they probably shouldn't happen under
304       // normal operation (but maybe the other side crashed).
305       LOG(WARNING) << "RawChannel write error";
306       break;
307   }
308   Shutdown();
309 }
310 
OnReadMessageForDownstream(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)311 void Channel::OnReadMessageForDownstream(
312     const MessageInTransit::View& message_view,
313     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
314   DCHECK(creation_thread_checker_.CalledOnValidThread());
315   DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
316          message_view.type() == MessageInTransit::kTypeMessagePipe);
317 
318   MessageInTransit::EndpointId local_id = message_view.destination_id();
319   if (local_id == MessageInTransit::kInvalidEndpointId) {
320     HandleRemoteError("Received message with no destination ID");
321     return;
322   }
323 
324   ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
325   scoped_refptr<MessagePipe> message_pipe;
326   unsigned port = ~0u;
327   bool nonexistent_local_id_error = false;
328   {
329     base::AutoLock locker(lock_);
330 
331     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
332     // be called from the creation thread, |raw_channel_| should never be null
333     // here.
334     DCHECK(is_running_);
335 
336     IdToEndpointMap::const_iterator it =
337         local_id_to_endpoint_map_.find(local_id);
338     if (it == local_id_to_endpoint_map_.end()) {
339       nonexistent_local_id_error = true;
340     } else {
341       state = it->second->state_;
342       message_pipe = it->second->message_pipe_;
343       port = it->second->port_;
344     }
345   }
346   if (nonexistent_local_id_error) {
347     HandleRemoteError(base::StringPrintf(
348         "Received a message for nonexistent local destination ID %u",
349         static_cast<unsigned>(local_id)));
350     // This is strongly indicative of some problem. However, it's not a fatal
351     // error, since it may indicate a buggy (or hostile) remote process. Don't
352     // die even for Debug builds, since handling this properly needs to be
353     // tested (TODO(vtl)).
354     DLOG(ERROR) << "This should not happen under normal operation.";
355     return;
356   }
357 
358   // Ignore messages for zombie endpoints (not an error).
359   if (state != ChannelEndpoint::STATE_NORMAL) {
360     DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
361              << local_id << ", remote ID = " << message_view.source_id() << ")";
362     return;
363   }
364 
365   // We need to duplicate the message (data), because |EnqueueMessage()| will
366   // take ownership of it.
367   scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
368   if (message_view.transport_data_buffer_size() > 0) {
369     DCHECK(message_view.transport_data_buffer());
370     message->SetDispatchers(TransportData::DeserializeDispatchers(
371         message_view.transport_data_buffer(),
372         message_view.transport_data_buffer_size(),
373         platform_handles.Pass(),
374         this));
375   }
376   MojoResult result = message_pipe->EnqueueMessage(
377       MessagePipe::GetPeerPort(port), message.Pass());
378   if (result != MOJO_RESULT_OK) {
379     // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
380     // has been closed (in an unavoidable race). This might also be a "remote"
381     // error, e.g., if the remote side is sending invalid control messages (to
382     // the message pipe).
383     HandleLocalError(base::StringPrintf(
384         "Failed to enqueue message to local ID %u (result %d)",
385         static_cast<unsigned>(local_id),
386         static_cast<int>(result)));
387     return;
388   }
389 }
390 
OnReadMessageForChannel(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)391 void Channel::OnReadMessageForChannel(
392     const MessageInTransit::View& message_view,
393     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
394   DCHECK(creation_thread_checker_.CalledOnValidThread());
395   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
396 
397   // Currently, no channel messages take platform handles.
398   if (platform_handles) {
399     HandleRemoteError(
400         "Received invalid channel message (has platform handles)");
401     NOTREACHED();
402     return;
403   }
404 
405   switch (message_view.subtype()) {
406     case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
407       DVLOG(2) << "Handling channel message to run message pipe (local ID "
408                << message_view.destination_id() << ", remote ID "
409                << message_view.source_id() << ")";
410       if (!RunMessagePipeEndpoint(message_view.destination_id(),
411                                   message_view.source_id())) {
412         HandleRemoteError(
413             "Received invalid channel message to run message pipe");
414       }
415       break;
416     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
417       DVLOG(2) << "Handling channel message to remove message pipe (local ID "
418                << message_view.destination_id() << ", remote ID "
419                << message_view.source_id() << ")";
420       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
421                                      message_view.source_id())) {
422         HandleRemoteError(
423             "Received invalid channel message to remove message pipe");
424       }
425       break;
426     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
427       DVLOG(2) << "Handling channel message to ack remove message pipe (local "
428                   "ID " << message_view.destination_id() << ", remote ID "
429                << message_view.source_id() << ")";
430       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
431                                      message_view.source_id())) {
432         HandleRemoteError(
433             "Received invalid channel message to ack remove message pipe");
434       }
435       break;
436     default:
437       HandleRemoteError("Received invalid channel message");
438       NOTREACHED();
439       break;
440   }
441 }
442 
RemoveMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)443 bool Channel::RemoveMessagePipeEndpoint(
444     MessageInTransit::EndpointId local_id,
445     MessageInTransit::EndpointId remote_id) {
446   DCHECK(creation_thread_checker_.CalledOnValidThread());
447 
448   // If this is non-null after the locked block, the endpoint should be detached
449   // (and no remove ack message sent).
450   scoped_refptr<ChannelEndpoint> endpoint_to_detach;
451   scoped_refptr<MessagePipe> message_pipe;
452   unsigned port = ~0u;
453   {
454     base::AutoLock locker(lock_);
455 
456     IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
457     if (it == local_id_to_endpoint_map_.end()) {
458       DVLOG(2) << "Remove message pipe error: not found";
459       return false;
460     }
461 
462     switch (it->second->state_) {
463       case ChannelEndpoint::STATE_NORMAL:
464         it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
465         message_pipe = it->second->message_pipe_;
466         port = it->second->port_;
467         it->second->message_pipe_ = nullptr;
468         // We have to send a remove ack message (outside the lock).
469         break;
470       case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
471         DVLOG(2) << "Remove message pipe error: wrong state";
472         return false;
473       case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
474         endpoint_to_detach = it->second;
475         local_id_to_endpoint_map_.erase(it);
476         // We have to detach (outside the lock).
477         break;
478     }
479   }
480   if (endpoint_to_detach.get()) {
481     endpoint_to_detach->DetachFromChannel();
482     return true;
483   }
484 
485   if (!SendControlMessage(
486           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
487           local_id,
488           remote_id)) {
489     HandleLocalError(base::StringPrintf(
490         "Failed to send message to remove remote message pipe endpoint ack "
491         "(local ID %u, remote ID %u)",
492         static_cast<unsigned>(local_id),
493         static_cast<unsigned>(remote_id)));
494   }
495 
496   message_pipe->OnRemove(port);
497 
498   return true;
499 }
500 
SendControlMessage(MessageInTransit::Subtype subtype,MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)501 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
502                                  MessageInTransit::EndpointId local_id,
503                                  MessageInTransit::EndpointId remote_id) {
504   DVLOG(2) << "Sending channel control message: subtype " << subtype
505            << ", local ID " << local_id << ", remote ID " << remote_id;
506   scoped_ptr<MessageInTransit> message(new MessageInTransit(
507       MessageInTransit::kTypeChannel, subtype, 0, nullptr));
508   message->set_source_id(local_id);
509   message->set_destination_id(remote_id);
510   return WriteMessage(message.Pass());
511 }
512 
HandleRemoteError(const base::StringPiece & error_message)513 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
514   // TODO(vtl): Is this how we really want to handle this? Probably we want to
515   // terminate the connection, since it's spewing invalid stuff.
516   LOG(WARNING) << error_message;
517 }
518 
HandleLocalError(const base::StringPiece & error_message)519 void Channel::HandleLocalError(const base::StringPiece& error_message) {
520   // TODO(vtl): Is this how we really want to handle this?
521   // Sometimes we'll want to propagate the error back to the message pipe
522   // (endpoint), and notify it that the remote is (effectively) closed.
523   // Sometimes we'll want to kill the channel (and notify all the endpoints that
524   // their remotes are dead.
525   LOG(WARNING) << error_message;
526 }
527 
528 }  // namespace system
529 }  // namespace mojo
530