// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/core/node_channel.h" #include #include #include #include "base/bind.h" #include "base/location.h" #include "base/logging.h" #include "base/memory/ptr_util.h" #include "mojo/core/channel.h" #include "mojo/core/configuration.h" #include "mojo/core/core.h" #include "mojo/core/request_context.h" namespace mojo { namespace core { namespace { // NOTE: Please ONLY append messages to the end of this enum. enum class MessageType : uint32_t { ACCEPT_INVITEE, ACCEPT_INVITATION, ADD_BROKER_CLIENT, BROKER_CLIENT_ADDED, ACCEPT_BROKER_CLIENT, EVENT_MESSAGE, REQUEST_PORT_MERGE, REQUEST_INTRODUCTION, INTRODUCE, #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) RELAY_EVENT_MESSAGE, #endif BROADCAST_EVENT, #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) EVENT_MESSAGE_FROM_RELAY, #endif ACCEPT_PEER, }; struct Header { MessageType type; uint32_t padding; }; static_assert(IsAlignedForChannelMessage(sizeof(Header)), "Invalid header size."); struct AcceptInviteeData { ports::NodeName inviter_name; ports::NodeName token; }; struct AcceptInvitationData { ports::NodeName token; ports::NodeName invitee_name; }; struct AcceptPeerData { ports::NodeName token; ports::NodeName peer_name; ports::PortName port_name; }; // This message may include a process handle on plaforms that require it. struct AddBrokerClientData { ports::NodeName client_name; #if !defined(OS_WIN) uint32_t process_handle; uint32_t padding; #endif }; #if !defined(OS_WIN) static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t), "Unexpected pid size"); static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0, "Invalid AddBrokerClientData size."); #endif // This data is followed by a platform channel handle to the broker. struct BrokerClientAddedData { ports::NodeName client_name; }; // This data may be followed by a platform channel handle to the broker. If not, // then the inviter is the broker and its channel should be used as such. struct AcceptBrokerClientData { ports::NodeName broker_name; }; // This is followed by arbitrary payload data which is interpreted as a token // string for port location. struct RequestPortMergeData { ports::PortName connector_port_name; }; // Used for both REQUEST_INTRODUCTION and INTRODUCE. // // For INTRODUCE the message also includes a valid platform handle for a channel // the receiver may use to communicate with the named node directly, or an // invalid platform handle if the node is unknown to the sender or otherwise // cannot be introduced. struct IntroductionData { ports::NodeName name; }; #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) // This struct is followed by the full payload of a message to be relayed. struct RelayEventMessageData { ports::NodeName destination; }; // This struct is followed by the full payload of a relayed message. struct EventMessageFromRelayData { ports::NodeName source; }; #endif template Channel::MessagePtr CreateMessage(MessageType type, size_t payload_size, size_t num_handles, DataType** out_data, size_t capacity = 0) { const size_t total_size = payload_size + sizeof(Header); if (capacity == 0) capacity = total_size; else capacity = std::max(total_size, capacity); auto message = std::make_unique(capacity, total_size, num_handles); Header* header = reinterpret_cast(message->mutable_payload()); header->type = type; header->padding = 0; *out_data = reinterpret_cast(&header[1]); return message; } template bool GetMessagePayload(const void* bytes, size_t num_bytes, DataType** out_data) { static_assert(sizeof(DataType) > 0, "DataType must have non-zero size."); if (num_bytes < sizeof(Header) + sizeof(DataType)) return false; *out_data = reinterpret_cast( static_cast(bytes) + sizeof(Header)); return true; } } // namespace // static scoped_refptr NodeChannel::Create( Delegate* delegate, ConnectionParams connection_params, scoped_refptr io_task_runner, const ProcessErrorCallback& process_error_callback) { #if defined(OS_NACL_SFI) LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI"; return nullptr; #else return new NodeChannel(delegate, std::move(connection_params), io_task_runner, process_error_callback); #endif } // static Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity, size_t payload_size, void** payload, size_t num_handles) { return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles, payload, capacity); } // static void NodeChannel::GetEventMessageData(Channel::Message* message, void** data, size_t* num_data_bytes) { // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message // with a payload of fewer than |sizeof(Header)| bytes. *data = reinterpret_cast(message->mutable_payload()) + 1; *num_data_bytes = message->payload_size() - sizeof(Header); } void NodeChannel::Start() { base::AutoLock lock(channel_lock_); // ShutDown() may have already been called, in which case |channel_| is null. if (channel_) channel_->Start(); } void NodeChannel::ShutDown() { base::AutoLock lock(channel_lock_); if (channel_) { channel_->ShutDown(); channel_ = nullptr; } } void NodeChannel::LeakHandleOnShutdown() { base::AutoLock lock(channel_lock_); if (channel_) { channel_->LeakHandle(); } } void NodeChannel::NotifyBadMessage(const std::string& error) { if (!process_error_callback_.is_null()) process_error_callback_.Run("Received bad user message: " + error); } void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); { base::AutoLock lock(channel_lock_); if (channel_) channel_->set_remote_process(process_handle.Clone()); } base::AutoLock lock(remote_process_handle_lock_); DCHECK(!remote_process_handle_.is_valid()); CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle()); remote_process_handle_ = std::move(process_handle); } bool NodeChannel::HasRemoteProcessHandle() { base::AutoLock lock(remote_process_handle_lock_); return remote_process_handle_.is_valid(); } ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() { base::AutoLock lock(remote_process_handle_lock_); if (!remote_process_handle_.is_valid()) return ScopedProcessHandle(); return remote_process_handle_.Clone(); } void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); remote_node_name_ = name; } void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name, const ports::NodeName& token) { AcceptInviteeData* data; Channel::MessagePtr message = CreateMessage( MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data); data->inviter_name = inviter_name; data->token = token; WriteChannelMessage(std::move(message)); } void NodeChannel::AcceptInvitation(const ports::NodeName& token, const ports::NodeName& invitee_name) { AcceptInvitationData* data; Channel::MessagePtr message = CreateMessage( MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data); data->token = token; data->invitee_name = invitee_name; WriteChannelMessage(std::move(message)); } void NodeChannel::AcceptPeer(const ports::NodeName& sender_name, const ports::NodeName& token, const ports::PortName& port_name) { AcceptPeerData* data; Channel::MessagePtr message = CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data); data->token = token; data->peer_name = sender_name; data->port_name = port_name; WriteChannelMessage(std::move(message)); } void NodeChannel::AddBrokerClient(const ports::NodeName& client_name, ScopedProcessHandle process_handle) { AddBrokerClientData* data; std::vector handles; #if defined(OS_WIN) handles.emplace_back(base::win::ScopedHandle(process_handle.release())); #endif Channel::MessagePtr message = CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData), handles.size(), &data); message->SetHandles(std::move(handles)); data->client_name = client_name; #if !defined(OS_WIN) data->process_handle = process_handle.get(); data->padding = 0; #endif WriteChannelMessage(std::move(message)); } void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name, PlatformHandle broker_channel) { BrokerClientAddedData* data; std::vector handles; if (broker_channel.is_valid()) handles.emplace_back(std::move(broker_channel)); Channel::MessagePtr message = CreateMessage(MessageType::BROKER_CLIENT_ADDED, sizeof(BrokerClientAddedData), handles.size(), &data); message->SetHandles(std::move(handles)); data->client_name = client_name; WriteChannelMessage(std::move(message)); } void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name, PlatformHandle broker_channel) { AcceptBrokerClientData* data; std::vector handles; if (broker_channel.is_valid()) handles.emplace_back(std::move(broker_channel)); Channel::MessagePtr message = CreateMessage(MessageType::ACCEPT_BROKER_CLIENT, sizeof(AcceptBrokerClientData), handles.size(), &data); message->SetHandles(std::move(handles)); data->broker_name = broker_name; WriteChannelMessage(std::move(message)); } void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name, const std::string& token) { RequestPortMergeData* data; Channel::MessagePtr message = CreateMessage(MessageType::REQUEST_PORT_MERGE, sizeof(RequestPortMergeData) + token.size(), 0, &data); data->connector_port_name = connector_port_name; memcpy(data + 1, token.data(), token.size()); WriteChannelMessage(std::move(message)); } void NodeChannel::RequestIntroduction(const ports::NodeName& name) { IntroductionData* data; Channel::MessagePtr message = CreateMessage( MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data); data->name = name; WriteChannelMessage(std::move(message)); } void NodeChannel::Introduce(const ports::NodeName& name, PlatformHandle channel_handle) { IntroductionData* data; std::vector handles; if (channel_handle.is_valid()) handles.emplace_back(std::move(channel_handle)); Channel::MessagePtr message = CreateMessage( MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data); message->SetHandles(std::move(handles)); data->name = name; WriteChannelMessage(std::move(message)); } void NodeChannel::SendChannelMessage(Channel::MessagePtr message) { WriteChannelMessage(std::move(message)); } void NodeChannel::Broadcast(Channel::MessagePtr message) { DCHECK(!message->has_handles()); void* data; Channel::MessagePtr broadcast_message = CreateMessage( MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data); memcpy(data, message->data(), message->data_num_bytes()); WriteChannelMessage(std::move(broadcast_message)); } #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) void NodeChannel::RelayEventMessage(const ports::NodeName& destination, Channel::MessagePtr message) { #if defined(OS_WIN) DCHECK(message->has_handles()); // Note that this is only used on Windows, and on Windows all platform // handles are included in the message data. We blindly copy all the data // here and the relay node (the broker) will duplicate handles as needed. size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes(); RelayEventMessageData* data; Channel::MessagePtr relay_message = CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data); data->destination = destination; memcpy(data + 1, message->data(), message->data_num_bytes()); // When the handles are duplicated in the broker, the source handles will // be closed. If the broker never receives this message then these handles // will leak, but that means something else has probably broken and the // sending process won't likely be around much longer. // // TODO(https://crbug.com/813112): We would like to be able to violate the // above stated assumption. We should not leak handles in cases where we // outlive the broker, as we may continue existing and eventually accept a new // broker invitation. std::vector handles = message->TakeHandles(); for (auto& handle : handles) handle.TakeHandle().release(); #else DCHECK(message->has_mach_ports()); // On OSX, the handles are extracted from the relayed message and attached to // the wrapper. The broker then takes the handles attached to the wrapper and // moves them back to the relayed message. This is necessary because the // message may contain fds which need to be attached to the outer message so // that they can be transferred to the broker. std::vector handles = message->TakeHandles(); size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes(); RelayEventMessageData* data; Channel::MessagePtr relay_message = CreateMessage( MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data); data->destination = destination; memcpy(data + 1, message->data(), message->data_num_bytes()); relay_message->SetHandles(std::move(handles)); #endif // defined(OS_WIN) WriteChannelMessage(std::move(relay_message)); } void NodeChannel::EventMessageFromRelay(const ports::NodeName& source, Channel::MessagePtr message) { size_t num_bytes = sizeof(EventMessageFromRelayData) + message->payload_size(); EventMessageFromRelayData* data; Channel::MessagePtr relayed_message = CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes, message->num_handles(), &data); data->source = source; if (message->payload_size()) memcpy(data + 1, message->payload(), message->payload_size()); relayed_message->SetHandles(message->TakeHandles()); WriteChannelMessage(std::move(relayed_message)); } #endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) NodeChannel::NodeChannel(Delegate* delegate, ConnectionParams connection_params, scoped_refptr io_task_runner, const ProcessErrorCallback& process_error_callback) : delegate_(delegate), io_task_runner_(io_task_runner), process_error_callback_(process_error_callback) #if !defined(OS_NACL_SFI) , channel_( Channel::Create(this, std::move(connection_params), io_task_runner_)) #endif { } NodeChannel::~NodeChannel() { ShutDown(); } void NodeChannel::OnChannelMessage(const void* payload, size_t payload_size, std::vector handles) { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); RequestContext request_context(RequestContext::Source::SYSTEM); // Ensure this NodeChannel stays alive through the extent of this method. The // delegate may have the only other reference to this object and it may choose // to drop it here in response to, e.g., a malformed message. scoped_refptr keepalive = this; if (payload_size <= sizeof(Header)) { delegate_->OnChannelError(remote_node_name_, this); return; } const Header* header = static_cast(payload); switch (header->type) { case MessageType::ACCEPT_INVITEE: { const AcceptInviteeData* data; if (GetMessagePayload(payload, payload_size, &data)) { delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name, data->token); return; } break; } case MessageType::ACCEPT_INVITATION: { const AcceptInvitationData* data; if (GetMessagePayload(payload, payload_size, &data)) { delegate_->OnAcceptInvitation(remote_node_name_, data->token, data->invitee_name); return; } break; } case MessageType::ADD_BROKER_CLIENT: { const AddBrokerClientData* data; if (GetMessagePayload(payload, payload_size, &data)) { #if defined(OS_WIN) if (handles.size() != 1) { DLOG(ERROR) << "Dropping invalid AddBrokerClient message."; break; } delegate_->OnAddBrokerClient(remote_node_name_, data->client_name, handles[0].ReleaseHandle()); #else if (!handles.empty()) { DLOG(ERROR) << "Dropping invalid AddBrokerClient message."; break; } delegate_->OnAddBrokerClient(remote_node_name_, data->client_name, data->process_handle); #endif return; } break; } case MessageType::BROKER_CLIENT_ADDED: { const BrokerClientAddedData* data; if (GetMessagePayload(payload, payload_size, &data)) { if (handles.size() != 1) { DLOG(ERROR) << "Dropping invalid BrokerClientAdded message."; break; } delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name, std::move(handles[0])); return; } break; } case MessageType::ACCEPT_BROKER_CLIENT: { const AcceptBrokerClientData* data; if (GetMessagePayload(payload, payload_size, &data)) { PlatformHandle broker_channel; if (handles.size() > 1) { DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message."; break; } if (handles.size() == 1) broker_channel = std::move(handles[0]); delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name, std::move(broker_channel)); return; } break; } case MessageType::EVENT_MESSAGE: { Channel::MessagePtr message( new Channel::Message(payload_size, handles.size())); message->SetHandles(std::move(handles)); memcpy(message->mutable_payload(), payload, payload_size); delegate_->OnEventMessage(remote_node_name_, std::move(message)); return; } case MessageType::REQUEST_PORT_MERGE: { const RequestPortMergeData* data; if (GetMessagePayload(payload, payload_size, &data)) { // Don't accept an empty token. size_t token_size = payload_size - sizeof(*data) - sizeof(Header); if (token_size == 0) break; std::string token(reinterpret_cast(data + 1), token_size); delegate_->OnRequestPortMerge(remote_node_name_, data->connector_port_name, token); return; } break; } case MessageType::REQUEST_INTRODUCTION: { const IntroductionData* data; if (GetMessagePayload(payload, payload_size, &data)) { delegate_->OnRequestIntroduction(remote_node_name_, data->name); return; } break; } case MessageType::INTRODUCE: { const IntroductionData* data; if (GetMessagePayload(payload, payload_size, &data)) { if (handles.size() > 1) { DLOG(ERROR) << "Dropping invalid introduction message."; break; } PlatformHandle channel_handle; if (handles.size() == 1) channel_handle = std::move(handles[0]); delegate_->OnIntroduce(remote_node_name_, data->name, std::move(channel_handle)); return; } break; } #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) case MessageType::RELAY_EVENT_MESSAGE: { base::ProcessHandle from_process; { base::AutoLock lock(remote_process_handle_lock_); // NOTE: It's safe to retain a weak reference to this process handle // through the extent of this call because |this| is kept alive and // |remote_process_handle_| is never reset once set. from_process = remote_process_handle_.get(); } const RelayEventMessageData* data; if (GetMessagePayload(payload, payload_size, &data)) { // Don't try to relay an empty message. if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData)) break; const void* message_start = data + 1; Channel::MessagePtr message = Channel::Message::Deserialize( message_start, payload_size - sizeof(Header) - sizeof(*data), from_process); if (!message) { DLOG(ERROR) << "Dropping invalid relay message."; break; } #if defined(OS_MACOSX) && !defined(OS_IOS) message->SetHandles(std::move(handles)); #endif delegate_->OnRelayEventMessage(remote_node_name_, from_process, data->destination, std::move(message)); return; } break; } #endif case MessageType::BROADCAST_EVENT: { if (payload_size <= sizeof(Header)) break; const void* data = static_cast( reinterpret_cast(payload) + 1); Channel::MessagePtr message = Channel::Message::Deserialize(data, payload_size - sizeof(Header)); if (!message || message->has_handles()) { DLOG(ERROR) << "Dropping invalid broadcast message."; break; } delegate_->OnBroadcast(remote_node_name_, std::move(message)); return; } #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) case MessageType::EVENT_MESSAGE_FROM_RELAY: const EventMessageFromRelayData* data; if (GetMessagePayload(payload, payload_size, &data)) { size_t num_bytes = payload_size - sizeof(*data); if (num_bytes < sizeof(Header)) break; num_bytes -= sizeof(Header); Channel::MessagePtr message( new Channel::Message(num_bytes, handles.size())); message->SetHandles(std::move(handles)); if (num_bytes) memcpy(message->mutable_payload(), data + 1, num_bytes); delegate_->OnEventMessageFromRelay(remote_node_name_, data->source, std::move(message)); return; } break; #endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) case MessageType::ACCEPT_PEER: { const AcceptPeerData* data; if (GetMessagePayload(payload, payload_size, &data)) { delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name, data->port_name); return; } break; } default: // Ignore unrecognized message types, allowing for future extensibility. return; } DLOG(ERROR) << "Received invalid message. Closing channel."; if (process_error_callback_) process_error_callback_.Run("NodeChannel received a malformed message"); delegate_->OnChannelError(remote_node_name_, this); } void NodeChannel::OnChannelError(Channel::Error error) { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); RequestContext request_context(RequestContext::Source::SYSTEM); ShutDown(); if (process_error_callback_ && error == Channel::Error::kReceivedMalformedData) { process_error_callback_.Run("Channel received a malformed message"); } // |OnChannelError()| may cause |this| to be destroyed, but still need access // to the name name after that destruction. So may a copy of // |remote_node_name_| so it can be used if |this| becomes destroyed. ports::NodeName node_name = remote_node_name_; delegate_->OnChannelError(node_name, this); } void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) { // Force a crash if this process attempts to send a message larger than the // maximum allowed size. This is more useful than killing a Channel when we // *receive* an oversized message, as we should consider oversized message // transmission to be a bug and this helps easily identify offending code. CHECK(message->data_num_bytes() < GetConfiguration().max_message_num_bytes); base::AutoLock lock(channel_lock_); if (!channel_) DLOG(ERROR) << "Dropping message on closed channel."; else channel_->Write(std::move(message)); } } // namespace core } // namespace mojo