1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel.h"
6
7 #include <algorithm>
8
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
17
18 namespace mojo {
19 namespace system {
20
// Compile-time check that the bootstrap endpoint ID is distinct from the
// reserved "invalid" endpoint ID. (|next_local_id_| is initialized to
// |kBootstrapEndpointId| in the |Channel| constructor.)
COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
                   MessageInTransit::kInvalidEndpointId,
               kBootstrapEndpointId_is_invalid);

// Out-of-line definition of the static constant member.
// NOTE(review): |STATIC_CONST_MEMBER_DEFINITION| presumably papers over
// toolchain differences for such definitions -- confirm against its
// declaration in base/.
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
    Channel::kBootstrapEndpointId;
27
EndpointInfo()28 Channel::EndpointInfo::EndpointInfo()
29 : state(STATE_NORMAL),
30 port() {
31 }
32
EndpointInfo(scoped_refptr<MessagePipe> message_pipe,unsigned port)33 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
34 unsigned port)
35 : state(STATE_NORMAL),
36 message_pipe(message_pipe),
37 port(port) {
38 }
39
~EndpointInfo()40 Channel::EndpointInfo::~EndpointInfo() {
41 }
42
Channel()43 Channel::Channel()
44 : is_running_(false),
45 next_local_id_(kBootstrapEndpointId) {
46 }
47
Init(scoped_ptr<RawChannel> raw_channel)48 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
49 DCHECK(creation_thread_checker_.CalledOnValidThread());
50 DCHECK(raw_channel);
51
52 // No need to take |lock_|, since this must be called before this object
53 // becomes thread-safe.
54 DCHECK(!is_running_no_lock());
55 raw_channel_ = raw_channel.Pass();
56
57 if (!raw_channel_->Init(this)) {
58 raw_channel_.reset();
59 return false;
60 }
61
62 is_running_ = true;
63 return true;
64 }
65
Shutdown()66 void Channel::Shutdown() {
67 DCHECK(creation_thread_checker_.CalledOnValidThread());
68
69 IdToEndpointInfoMap to_destroy;
70 {
71 base::AutoLock locker(lock_);
72 if (!is_running_no_lock())
73 return;
74
75 // Note: Don't reset |raw_channel_|, in case we're being called from within
76 // |OnReadMessage()| or |OnFatalError()|.
77 raw_channel_->Shutdown();
78 is_running_ = false;
79
80 // We need to deal with it outside the lock.
81 std::swap(to_destroy, local_id_to_endpoint_info_map_);
82 }
83
84 size_t num_live = 0;
85 size_t num_zombies = 0;
86 for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
87 it != to_destroy.end();
88 ++it) {
89 if (it->second.state == EndpointInfo::STATE_NORMAL) {
90 it->second.message_pipe->OnRemove(it->second.port);
91 num_live++;
92 } else {
93 DCHECK(!it->second.message_pipe);
94 num_zombies++;
95 }
96 }
97 DVLOG_IF(2, num_live || num_zombies)
98 << "Shut down Channel with " << num_live << " live endpoints and "
99 << num_zombies << " zombies";
100 }
101
AttachMessagePipeEndpoint(scoped_refptr<MessagePipe> message_pipe,unsigned port)102 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
103 scoped_refptr<MessagePipe> message_pipe,
104 unsigned port) {
105 DCHECK(message_pipe);
106 DCHECK(port == 0 || port == 1);
107
108 MessageInTransit::EndpointId local_id;
109 {
110 base::AutoLock locker(lock_);
111
112 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
113 local_id_to_endpoint_info_map_.find(next_local_id_) !=
114 local_id_to_endpoint_info_map_.end())
115 next_local_id_++;
116
117 local_id = next_local_id_;
118 next_local_id_++;
119
120 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
121 // some expensive reference count increment/decrements.) Once this is done,
122 // we should be able to delete |EndpointInfo|'s default constructor.
123 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
124 }
125
126 // This might fail if that port got an |OnPeerClose()| before attaching.
127 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
128 return local_id;
129
130 // Note: If it failed, quite possibly the endpoint info was removed from that
131 // map (there's a race between us adding it to the map above and calling
132 // |Attach()|). And even if an entry exists for |local_id|, we need to check
133 // that it's the one we added (and not some other one that was added since).
134 {
135 base::AutoLock locker(lock_);
136 IdToEndpointInfoMap::iterator it =
137 local_id_to_endpoint_info_map_.find(local_id);
138 if (it != local_id_to_endpoint_info_map_.end() &&
139 it->second.message_pipe.get() == message_pipe.get() &&
140 it->second.port == port) {
141 DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
142 // TODO(vtl): FIXME -- This is wrong. We need to specify (to
143 // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
144 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
145 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
146 // run, then we'll get messages to an "invalid" local ID (for running, for
147 // removal).
148 local_id_to_endpoint_info_map_.erase(it);
149 }
150 }
151 return MessageInTransit::kInvalidEndpointId;
152 }
153
RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)154 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
155 MessageInTransit::EndpointId remote_id) {
156 EndpointInfo endpoint_info;
157 {
158 base::AutoLock locker(lock_);
159
160 IdToEndpointInfoMap::const_iterator it =
161 local_id_to_endpoint_info_map_.find(local_id);
162 if (it == local_id_to_endpoint_info_map_.end())
163 return false;
164 endpoint_info = it->second;
165 }
166
167 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
168 // and ignore it.
169 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
170 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
171 "(local ID " << local_id << ", remote ID " << remote_id << ")";
172 return true;
173 }
174
175 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
176 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
177 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
178 return true;
179 }
180
RunRemoteMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)181 void Channel::RunRemoteMessagePipeEndpoint(
182 MessageInTransit::EndpointId local_id,
183 MessageInTransit::EndpointId remote_id) {
184 #if DCHECK_IS_ON
185 {
186 base::AutoLock locker(lock_);
187 DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
188 local_id_to_endpoint_info_map_.end());
189 }
190 #endif
191
192 if (!SendControlMessage(
193 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
194 local_id, remote_id)) {
195 HandleLocalError(base::StringPrintf(
196 "Failed to send message to run remote message pipe endpoint (local ID "
197 "%u, remote ID %u)",
198 static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
199 }
200 }
201
WriteMessage(scoped_ptr<MessageInTransit> message)202 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
203 base::AutoLock locker(lock_);
204 if (!is_running_no_lock()) {
205 // TODO(vtl): I think this is probably not an error condition, but I should
206 // think about it (and the shutdown sequence) more carefully.
207 LOG(WARNING) << "WriteMessage() after shutdown";
208 return false;
209 }
210
211 return raw_channel_->WriteMessage(message.Pass());
212 }
213
IsWriteBufferEmpty()214 bool Channel::IsWriteBufferEmpty() {
215 base::AutoLock locker(lock_);
216 if (!is_running_no_lock())
217 return true;
218 return raw_channel_->IsWriteBufferEmpty();
219 }
220
DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)221 void Channel::DetachMessagePipeEndpoint(
222 MessageInTransit::EndpointId local_id,
223 MessageInTransit::EndpointId remote_id) {
224 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
225
226 bool should_send_remove_message = false;
227 {
228 base::AutoLock locker_(lock_);
229 if (!is_running_no_lock())
230 return;
231
232 IdToEndpointInfoMap::iterator it =
233 local_id_to_endpoint_info_map_.find(local_id);
234 DCHECK(it != local_id_to_endpoint_info_map_.end());
235
236 switch (it->second.state) {
237 case EndpointInfo::STATE_NORMAL:
238 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
239 it->second.message_pipe = NULL;
240 should_send_remove_message =
241 (remote_id != MessageInTransit::kInvalidEndpointId);
242 break;
243 case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
244 local_id_to_endpoint_info_map_.erase(it);
245 break;
246 case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
247 NOTREACHED();
248 break;
249 case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
250 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
251 break;
252 }
253 }
254 if (!should_send_remove_message)
255 return;
256
257 if (!SendControlMessage(
258 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
259 local_id, remote_id)) {
260 HandleLocalError(base::StringPrintf(
261 "Failed to send message to remove remote message pipe endpoint (local "
262 "ID %u, remote ID %u)",
263 static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
264 }
265 }
266
GetSerializedPlatformHandleSize() const267 size_t Channel::GetSerializedPlatformHandleSize() const {
268 return raw_channel_->GetSerializedPlatformHandleSize();
269 }
270
~Channel()271 Channel::~Channel() {
272 // The channel should have been shut down first.
273 DCHECK(!is_running_no_lock());
274 }
275
OnReadMessage(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)276 void Channel::OnReadMessage(
277 const MessageInTransit::View& message_view,
278 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
279 switch (message_view.type()) {
280 case MessageInTransit::kTypeMessagePipeEndpoint:
281 case MessageInTransit::kTypeMessagePipe:
282 OnReadMessageForDownstream(message_view, platform_handles.Pass());
283 break;
284 case MessageInTransit::kTypeChannel:
285 OnReadMessageForChannel(message_view, platform_handles.Pass());
286 break;
287 default:
288 HandleRemoteError(base::StringPrintf(
289 "Received message of invalid type %u",
290 static_cast<unsigned>(message_view.type())));
291 break;
292 }
293 }
294
OnFatalError(FatalError fatal_error)295 void Channel::OnFatalError(FatalError fatal_error) {
296 switch (fatal_error) {
297 case FATAL_ERROR_READ:
298 // Most read errors aren't notable: they just reflect that the other side
299 // tore down the channel.
300 DVLOG(1) << "RawChannel fatal error (read)";
301 break;
302 case FATAL_ERROR_WRITE:
303 // Write errors are slightly notable: they probably shouldn't happen under
304 // normal operation (but maybe the other side crashed).
305 LOG(WARNING) << "RawChannel fatal error (write)";
306 break;
307 }
308 Shutdown();
309 }
310
OnReadMessageForDownstream(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)311 void Channel::OnReadMessageForDownstream(
312 const MessageInTransit::View& message_view,
313 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
314 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
315 message_view.type() == MessageInTransit::kTypeMessagePipe);
316
317 MessageInTransit::EndpointId local_id = message_view.destination_id();
318 if (local_id == MessageInTransit::kInvalidEndpointId) {
319 HandleRemoteError("Received message with no destination ID");
320 return;
321 }
322
323 EndpointInfo endpoint_info;
324 {
325 base::AutoLock locker(lock_);
326
327 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
328 // be called from the creation thread, |raw_channel_| should never be null
329 // here.
330 DCHECK(is_running_no_lock());
331
332 IdToEndpointInfoMap::const_iterator it =
333 local_id_to_endpoint_info_map_.find(local_id);
334 if (it == local_id_to_endpoint_info_map_.end()) {
335 HandleRemoteError(base::StringPrintf(
336 "Received a message for nonexistent local destination ID %u",
337 static_cast<unsigned>(local_id)));
338 // This is strongly indicative of some problem. However, it's not a fatal
339 // error, since it may indicate a bug (or hostile) remote process. Don't
340 // die even for Debug builds, since handling this properly needs to be
341 // tested (TODO(vtl)).
342 DLOG(ERROR) << "This should not happen under normal operation.";
343 return;
344 }
345 endpoint_info = it->second;
346 }
347
348 // Ignore messages for zombie endpoints (not an error).
349 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
350 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
351 << local_id << ", remote ID = " << message_view.source_id() << ")";
352 return;
353 }
354
355 // We need to duplicate the message (data), because |EnqueueMessage()| will
356 // take ownership of it.
357 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
358 if (message_view.transport_data_buffer_size() > 0) {
359 DCHECK(message_view.transport_data_buffer());
360 message->SetDispatchers(
361 TransportData::DeserializeDispatchers(
362 message_view.transport_data_buffer(),
363 message_view.transport_data_buffer_size(),
364 platform_handles.Pass(),
365 this));
366 }
367 MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
368 MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
369 if (result != MOJO_RESULT_OK) {
370 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
371 // has been closed (in an unavoidable race). This might also be a "remote"
372 // error, e.g., if the remote side is sending invalid control messages (to
373 // the message pipe).
374 HandleLocalError(base::StringPrintf(
375 "Failed to enqueue message to local ID %u (result %d)",
376 static_cast<unsigned>(local_id), static_cast<int>(result)));
377 return;
378 }
379 }
380
OnReadMessageForChannel(const MessageInTransit::View & message_view,embedder::ScopedPlatformHandleVectorPtr platform_handles)381 void Channel::OnReadMessageForChannel(
382 const MessageInTransit::View& message_view,
383 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
384 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
385
386 // Currently, no channel messages take platform handles.
387 if (platform_handles) {
388 HandleRemoteError(
389 "Received invalid channel message (has platform handles)");
390 NOTREACHED();
391 return;
392 }
393
394 switch (message_view.subtype()) {
395 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
396 DVLOG(2) << "Handling channel message to run message pipe (local ID "
397 << message_view.destination_id() << ", remote ID "
398 << message_view.source_id() << ")";
399 if (!RunMessagePipeEndpoint(message_view.destination_id(),
400 message_view.source_id())) {
401 HandleRemoteError(
402 "Received invalid channel message to run message pipe");
403 }
404 break;
405 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
406 DVLOG(2) << "Handling channel message to remove message pipe (local ID "
407 << message_view.destination_id() << ", remote ID "
408 << message_view.source_id() << ")";
409 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
410 message_view.source_id())) {
411 HandleRemoteError(
412 "Received invalid channel message to remove message pipe");
413 }
414 break;
415 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
416 DVLOG(2) << "Handling channel message to ack remove message pipe (local "
417 "ID "
418 << message_view.destination_id() << ", remote ID "
419 << message_view.source_id() << ")";
420 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
421 message_view.source_id())) {
422 HandleRemoteError(
423 "Received invalid channel message to ack remove message pipe");
424 }
425 break;
426 default:
427 HandleRemoteError("Received invalid channel message");
428 NOTREACHED();
429 break;
430 }
431 }
432
RemoveMessagePipeEndpoint(MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)433 bool Channel::RemoveMessagePipeEndpoint(
434 MessageInTransit::EndpointId local_id,
435 MessageInTransit::EndpointId remote_id) {
436 EndpointInfo endpoint_info;
437 {
438 base::AutoLock locker(lock_);
439
440 IdToEndpointInfoMap::iterator it =
441 local_id_to_endpoint_info_map_.find(local_id);
442 if (it == local_id_to_endpoint_info_map_.end()) {
443 DVLOG(2) << "Remove message pipe error: not found";
444 return false;
445 }
446
447 // If it's waiting for the remove ack, just do it and return.
448 if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
449 local_id_to_endpoint_info_map_.erase(it);
450 return true;
451 }
452
453 if (it->second.state != EndpointInfo::STATE_NORMAL) {
454 DVLOG(2) << "Remove message pipe error: wrong state";
455 return false;
456 }
457
458 it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
459 endpoint_info = it->second;
460 it->second.message_pipe = NULL;
461 }
462
463 if (!SendControlMessage(
464 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
465 local_id, remote_id)) {
466 HandleLocalError(base::StringPrintf(
467 "Failed to send message to remove remote message pipe endpoint ack "
468 "(local ID %u, remote ID %u)",
469 static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
470 }
471
472 endpoint_info.message_pipe->OnRemove(endpoint_info.port);
473
474 return true;
475 }
476
SendControlMessage(MessageInTransit::Subtype subtype,MessageInTransit::EndpointId local_id,MessageInTransit::EndpointId remote_id)477 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
478 MessageInTransit::EndpointId local_id,
479 MessageInTransit::EndpointId remote_id) {
480 DVLOG(2) << "Sending channel control message: subtype " << subtype
481 << ", local ID " << local_id << ", remote ID " << remote_id;
482 scoped_ptr<MessageInTransit> message(new MessageInTransit(
483 MessageInTransit::kTypeChannel, subtype, 0, NULL));
484 message->set_source_id(local_id);
485 message->set_destination_id(remote_id);
486 return WriteMessage(message.Pass());
487 }
488
HandleRemoteError(const base::StringPiece & error_message)489 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
490 // TODO(vtl): Is this how we really want to handle this? Probably we want to
491 // terminate the connection, since it's spewing invalid stuff.
492 LOG(WARNING) << error_message;
493 }
494
HandleLocalError(const base::StringPiece & error_message)495 void Channel::HandleLocalError(const base::StringPiece& error_message) {
496 // TODO(vtl): Is this how we really want to handle this?
497 // Sometimes we'll want to propagate the error back to the message pipe
498 // (endpoint), and notify it that the remote is (effectively) closed.
499 // Sometimes we'll want to kill the channel (and notify all the endpoints that
500 // their remotes are dead.
501 LOG(WARNING) << error_message;
502 }
503
504 } // namespace system
505 } // namespace mojo
506