1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
17
18 namespace mojo {
19 namespace system {
20
21 static_assert(Channel::kBootstrapEndpointId !=
22 MessageInTransit::kInvalidEndpointId,
23 "kBootstrapEndpointId is invalid");
24
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26 Channel::kBootstrapEndpointId;
27
Channel(embedder::PlatformSupport * platform_support)28 Channel::Channel(embedder::PlatformSupport* platform_support)
29 : platform_support_(platform_support),
30 is_running_(false),
31 is_shutting_down_(false),
32 next_local_id_(kBootstrapEndpointId) {
33 }
34
Init(scoped_ptr<RawChannel> raw_channel)35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
36 DCHECK(creation_thread_checker_.CalledOnValidThread());
37 DCHECK(raw_channel);
38
39 // No need to take |lock_|, since this must be called before this object
40 // becomes thread-safe.
41 DCHECK(!is_running_);
42 raw_channel_ = raw_channel.Pass();
43
44 if (!raw_channel_->Init(this)) {
45 raw_channel_.reset();
46 return false;
47 }
48
49 is_running_ = true;
50 return true;
51 }
52
Shutdown()53 void Channel::Shutdown() {
54 DCHECK(creation_thread_checker_.CalledOnValidThread());
55
56 IdToEndpointMap to_destroy;
57 {
58 base::AutoLock locker(lock_);
59 if (!is_running_)
60 return;
61
62 // Note: Don't reset |raw_channel_|, in case we're being called from within
63 // |OnReadMessage()| or |OnError()|.
64 raw_channel_->Shutdown();
65 is_running_ = false;
66
67 // We need to deal with it outside the lock.
68 std::swap(to_destroy, local_id_to_endpoint_map_);
69 }
70
71 size_t num_live = 0;
72 size_t num_zombies = 0;
73 for (IdToEndpointMap::iterator it = to_destroy.begin();
74 it != to_destroy.end();
75 ++it) {
76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
77 it->second->message_pipe_->OnRemove(it->second->port_);
78 num_live++;
79 } else {
80 DCHECK(!it->second->message_pipe_.get());
81 num_zombies++;
82 }
83 it->second->DetachFromChannel();
84 }
85 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
86 << " live endpoints and " << num_zombies
87 << " zombies";
88 }
89
WillShutdownSoon()90 void Channel::WillShutdownSoon() {
91 base::AutoLock locker(lock_);
92 is_shutting_down_ = true;
93 }
94
95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
96 // keeps the endpoint alive even after the lock is released. Otherwise, there's
97 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
98 // directly to this function, which wouldn't be sufficient for safety.
AttachEndpoint(scoped_refptr<ChannelEndpoint> endpoint)99 MessageInTransit::EndpointId Channel::AttachEndpoint(
100 scoped_refptr<ChannelEndpoint> endpoint) {
101 DCHECK(endpoint.get());
102
103 MessageInTransit::EndpointId local_id;
104 {
105 base::AutoLock locker(lock_);
106
107 DLOG_IF(WARNING, is_shutting_down_)
108 << "AttachEndpoint() while shutting down";
109
110 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
111 local_id_to_endpoint_map_.find(next_local_id_) !=
112 local_id_to_endpoint_map_.end())
113 next_local_id_++;
114
115 local_id = next_local_id_;
116 next_local_id_++;
117 local_id_to_endpoint_map_[local_id] = endpoint;
118 }
119
120 endpoint->AttachToChannel(this, local_id);
121 return local_id;
122 }
123
RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
125 MessageInTransit::EndpointId remote_id) {
126 scoped_refptr<ChannelEndpoint> endpoint;
127 ChannelEndpoint::State state;
128 {
129 base::AutoLock locker(lock_);
130
131 DLOG_IF(WARNING, is_shutting_down_)
132 << "RunMessagePipeEndpoint() while shutting down";
133
134 IdToEndpointMap::const_iterator it =
135 local_id_to_endpoint_map_.find(local_id);
136 if (it == local_id_to_endpoint_map_.end())
137 return false;
138 endpoint = it->second;
139 state = it->second->state_;
140 }
141
142 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
143 // and ignore it.
144 if (state != ChannelEndpoint::STATE_NORMAL) {
145 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
146 "(local ID " << local_id << ", remote ID " << remote_id << ")";
147 return true;
148 }
149
150 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
151 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
152 endpoint->Run(remote_id);
153 return true;
154 }
155
RunRemoteMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)156 void Channel::RunRemoteMessagePipeEndpoint(
157 MessageInTransit::EndpointId local_id,
158 MessageInTransit::EndpointId remote_id) {
159 #if DCHECK_IS_ON
160 {
161 base::AutoLock locker(lock_);
162 DCHECK(local_id_to_endpoint_map_.find(local_id) !=
163 local_id_to_endpoint_map_.end());
164 }
165 #endif
166
167 if (!SendControlMessage(
168 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
169 local_id,
170 remote_id)) {
171 HandleLocalError(base::StringPrintf(
172 "Failed to send message to run remote message pipe endpoint (local ID "
173 "%u, remote ID %u)",
174 static_cast<unsigned>(local_id),
175 static_cast<unsigned>(remote_id)));
176 }
177 }
178
WriteMessage(scoped_ptr<MessageInTransit> message)179 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
180 base::AutoLock locker(lock_);
181 if (!is_running_) {
182 // TODO(vtl): I think this is probably not an error condition, but I should
183 // think about it (and the shutdown sequence) more carefully.
184 LOG(WARNING) << "WriteMessage() after shutdown";
185 return false;
186 }
187
188 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
189 return raw_channel_->WriteMessage(message.Pass());
190 }
191
IsWriteBufferEmpty()192 bool Channel::IsWriteBufferEmpty() {
193 base::AutoLock locker(lock_);
194 if (!is_running_)
195 return true;
196 return raw_channel_->IsWriteBufferEmpty();
197 }
198
DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)199 void Channel::DetachMessagePipeEndpoint(
200 MessageInTransit::EndpointId local_id,
201 MessageInTransit::EndpointId remote_id) {
202 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
203
204 // If this is non-null after the locked block, the endpoint should be detached
205 // (and no remove message sent).
206 scoped_refptr<ChannelEndpoint> endpoint_to_detach;
207 {
208 base::AutoLock locker_(lock_);
209 if (!is_running_)
210 return;
211
212 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
213 DCHECK(it != local_id_to_endpoint_map_.end());
214
215 switch (it->second->state_) {
216 case ChannelEndpoint::STATE_NORMAL:
217 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
218 it->second->message_pipe_ = nullptr;
219 if (remote_id == MessageInTransit::kInvalidEndpointId)
220 return;
221 // We have to send a remove message (outside the lock).
222 break;
223 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
224 endpoint_to_detach = it->second;
225 local_id_to_endpoint_map_.erase(it);
226 // We have to detach (outside the lock).
227 break;
228 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
229 NOTREACHED();
230 return;
231 }
232 }
233 if (endpoint_to_detach.get()) {
234 endpoint_to_detach->DetachFromChannel();
235 return;
236 }
237
238 if (!SendControlMessage(
239 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
240 local_id,
241 remote_id)) {
242 HandleLocalError(base::StringPrintf(
243 "Failed to send message to remove remote message pipe endpoint (local "
244 "ID %u, remote ID %u)",
245 static_cast<unsigned>(local_id),
246 static_cast<unsigned>(remote_id)));
247 }
248 }
249
GetSerializedPlatformHandleSize() const250 size_t Channel::GetSerializedPlatformHandleSize() const {
251 return raw_channel_->GetSerializedPlatformHandleSize();
252 }
253
~Channel()254 Channel::~Channel() {
255 // The channel should have been shut down first.
256 DCHECK(!is_running_);
257 }
258
OnReadMessage(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)259 void Channel::OnReadMessage(
260 const MessageInTransit::View& message_view,
261 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
262 DCHECK(creation_thread_checker_.CalledOnValidThread());
263
264 switch (message_view.type()) {
265 case MessageInTransit::kTypeMessagePipeEndpoint:
266 case MessageInTransit::kTypeMessagePipe:
267 OnReadMessageForDownstream(message_view, platform_handles.Pass());
268 break;
269 case MessageInTransit::kTypeChannel:
270 OnReadMessageForChannel(message_view, platform_handles.Pass());
271 break;
272 default:
273 HandleRemoteError(
274 base::StringPrintf("Received message of invalid type %u",
275 static_cast<unsigned>(message_view.type())));
276 break;
277 }
278 }
279
OnError(Error error)280 void Channel::OnError(Error error) {
281 DCHECK(creation_thread_checker_.CalledOnValidThread());
282
283 switch (error) {
284 case ERROR_READ_SHUTDOWN:
285 // The other side was cleanly closed, so this isn't actually an error.
286 DVLOG(1) << "RawChannel read error (shutdown)";
287 break;
288 case ERROR_READ_BROKEN: {
289 base::AutoLock locker(lock_);
290 LOG_IF(ERROR, !is_shutting_down_)
291 << "RawChannel read error (connection broken)";
292 break;
293 }
294 case ERROR_READ_BAD_MESSAGE:
295 // Receiving a bad message means either a bug, data corruption, or
296 // malicious attack (probably due to some other bug).
297 LOG(ERROR) << "RawChannel read error (received bad message)";
298 break;
299 case ERROR_READ_UNKNOWN:
300 LOG(ERROR) << "RawChannel read error (unknown)";
301 break;
302 case ERROR_WRITE:
303 // Write errors are slightly notable: they probably shouldn't happen under
304 // normal operation (but maybe the other side crashed).
305 LOG(WARNING) << "RawChannel write error";
306 break;
307 }
308 Shutdown();
309 }
310
OnReadMessageForDownstream(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)311 void Channel::OnReadMessageForDownstream(
312 const MessageInTransit::View& message_view,
313 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
314 DCHECK(creation_thread_checker_.CalledOnValidThread());
315 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
316 message_view.type() == MessageInTransit::kTypeMessagePipe);
317
318 MessageInTransit::EndpointId local_id = message_view.destination_id();
319 if (local_id == MessageInTransit::kInvalidEndpointId) {
320 HandleRemoteError("Received message with no destination ID");
321 return;
322 }
323
324 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
325 scoped_refptr<MessagePipe> message_pipe;
326 unsigned port = ~0u;
327 bool nonexistent_local_id_error = false;
328 {
329 base::AutoLock locker(lock_);
330
331 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
332 // be called from the creation thread, |raw_channel_| should never be null
333 // here.
334 DCHECK(is_running_);
335
336 IdToEndpointMap::const_iterator it =
337 local_id_to_endpoint_map_.find(local_id);
338 if (it == local_id_to_endpoint_map_.end()) {
339 nonexistent_local_id_error = true;
340 } else {
341 state = it->second->state_;
342 message_pipe = it->second->message_pipe_;
343 port = it->second->port_;
344 }
345 }
346 if (nonexistent_local_id_error) {
347 HandleRemoteError(base::StringPrintf(
348 "Received a message for nonexistent local destination ID %u",
349 static_cast<unsigned>(local_id)));
350 // This is strongly indicative of some problem. However, it's not a fatal
351 // error, since it may indicate a buggy (or hostile) remote process. Don't
352 // die even for Debug builds, since handling this properly needs to be
353 // tested (TODO(vtl)).
354 DLOG(ERROR) << "This should not happen under normal operation.";
355 return;
356 }
357
358 // Ignore messages for zombie endpoints (not an error).
359 if (state != ChannelEndpoint::STATE_NORMAL) {
360 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
361 << local_id << ", remote ID = " << message_view.source_id() << ")";
362 return;
363 }
364
365 // We need to duplicate the message (data), because |EnqueueMessage()| will
366 // take ownership of it.
367 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
368 if (message_view.transport_data_buffer_size() > 0) {
369 DCHECK(message_view.transport_data_buffer());
370 message->SetDispatchers(TransportData::DeserializeDispatchers(
371 message_view.transport_data_buffer(),
372 message_view.transport_data_buffer_size(),
373 platform_handles.Pass(),
374 this));
375 }
376 MojoResult result = message_pipe->EnqueueMessage(
377 MessagePipe::GetPeerPort(port), message.Pass());
378 if (result != MOJO_RESULT_OK) {
379 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
380 // has been closed (in an unavoidable race). This might also be a "remote"
381 // error, e.g., if the remote side is sending invalid control messages (to
382 // the message pipe).
383 HandleLocalError(base::StringPrintf(
384 "Failed to enqueue message to local ID %u (result %d)",
385 static_cast<unsigned>(local_id),
386 static_cast<int>(result)));
387 return;
388 }
389 }
390
OnReadMessageForChannel(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)391 void Channel::OnReadMessageForChannel(
392 const MessageInTransit::View& message_view,
393 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
394 DCHECK(creation_thread_checker_.CalledOnValidThread());
395 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
396
397 // Currently, no channel messages take platform handles.
398 if (platform_handles) {
399 HandleRemoteError(
400 "Received invalid channel message (has platform handles)");
401 NOTREACHED();
402 return;
403 }
404
405 switch (message_view.subtype()) {
406 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
407 DVLOG(2) << "Handling channel message to run message pipe (local ID "
408 << message_view.destination_id() << ", remote ID "
409 << message_view.source_id() << ")";
410 if (!RunMessagePipeEndpoint(message_view.destination_id(),
411 message_view.source_id())) {
412 HandleRemoteError(
413 "Received invalid channel message to run message pipe");
414 }
415 break;
416 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
417 DVLOG(2) << "Handling channel message to remove message pipe (local ID "
418 << message_view.destination_id() << ", remote ID "
419 << message_view.source_id() << ")";
420 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
421 message_view.source_id())) {
422 HandleRemoteError(
423 "Received invalid channel message to remove message pipe");
424 }
425 break;
426 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
427 DVLOG(2) << "Handling channel message to ack remove message pipe (local "
428 "ID " << message_view.destination_id() << ", remote ID "
429 << message_view.source_id() << ")";
430 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
431 message_view.source_id())) {
432 HandleRemoteError(
433 "Received invalid channel message to ack remove message pipe");
434 }
435 break;
436 default:
437 HandleRemoteError("Received invalid channel message");
438 NOTREACHED();
439 break;
440 }
441 }
442
RemoveMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)443 bool Channel::RemoveMessagePipeEndpoint(
444 MessageInTransit::EndpointId local_id,
445 MessageInTransit::EndpointId remote_id) {
446 DCHECK(creation_thread_checker_.CalledOnValidThread());
447
448 // If this is non-null after the locked block, the endpoint should be detached
449 // (and no remove ack message sent).
450 scoped_refptr<ChannelEndpoint> endpoint_to_detach;
451 scoped_refptr<MessagePipe> message_pipe;
452 unsigned port = ~0u;
453 {
454 base::AutoLock locker(lock_);
455
456 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
457 if (it == local_id_to_endpoint_map_.end()) {
458 DVLOG(2) << "Remove message pipe error: not found";
459 return false;
460 }
461
462 switch (it->second->state_) {
463 case ChannelEndpoint::STATE_NORMAL:
464 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
465 message_pipe = it->second->message_pipe_;
466 port = it->second->port_;
467 it->second->message_pipe_ = nullptr;
468 // We have to send a remove ack message (outside the lock).
469 break;
470 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
471 DVLOG(2) << "Remove message pipe error: wrong state";
472 return false;
473 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
474 endpoint_to_detach = it->second;
475 local_id_to_endpoint_map_.erase(it);
476 // We have to detach (outside the lock).
477 break;
478 }
479 }
480 if (endpoint_to_detach.get()) {
481 endpoint_to_detach->DetachFromChannel();
482 return true;
483 }
484
485 if (!SendControlMessage(
486 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
487 local_id,
488 remote_id)) {
489 HandleLocalError(base::StringPrintf(
490 "Failed to send message to remove remote message pipe endpoint ack "
491 "(local ID %u, remote ID %u)",
492 static_cast<unsigned>(local_id),
493 static_cast<unsigned>(remote_id)));
494 }
495
496 message_pipe->OnRemove(port);
497
498 return true;
499 }
500
SendControlMessage(MessageInTransit::Subtype subtype,MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)501 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
502 MessageInTransit::EndpointId local_id,
503 MessageInTransit::EndpointId remote_id) {
504 DVLOG(2) << "Sending channel control message: subtype " << subtype
505 << ", local ID " << local_id << ", remote ID " << remote_id;
506 scoped_ptr<MessageInTransit> message(new MessageInTransit(
507 MessageInTransit::kTypeChannel, subtype, 0, nullptr));
508 message->set_source_id(local_id);
509 message->set_destination_id(remote_id);
510 return WriteMessage(message.Pass());
511 }
512
HandleRemoteError(const base::StringPiece & error_message)513 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
514 // TODO(vtl): Is this how we really want to handle this? Probably we want to
515 // terminate the connection, since it's spewing invalid stuff.
516 LOG(WARNING) << error_message;
517 }
518
HandleLocalError(const base::StringPiece & error_message)519 void Channel::HandleLocalError(const base::StringPiece& error_message) {
520 // TODO(vtl): Is this how we really want to handle this?
521 // Sometimes we'll want to propagate the error back to the message pipe
522 // (endpoint), and notify it that the remote is (effectively) closed.
523 // Sometimes we'll want to kill the channel (and notify all the endpoints that
524 // their remotes are dead.
525 LOG(WARNING) << error_message;
526 }
527
528 } // namespace system
529 } // namespace mojo
530