// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/core/data_pipe_consumer_dispatcher.h" #include #include #include #include #include #include "base/bind.h" #include "base/logging.h" #include "base/memory/ref_counted.h" #include "mojo/core/core.h" #include "mojo/core/data_pipe_control_message.h" #include "mojo/core/node_controller.h" #include "mojo/core/platform_handle_utils.h" #include "mojo/core/request_context.h" #include "mojo/core/user_message_impl.h" #include "mojo/public/c/system/data_pipe.h" namespace mojo { namespace core { namespace { const uint8_t kFlagPeerClosed = 0x01; #pragma pack(push, 1) struct SerializedState { MojoCreateDataPipeOptions options; uint64_t pipe_id; uint32_t read_offset; uint32_t bytes_available; uint8_t flags; uint64_t buffer_guid_high; uint64_t buffer_guid_low; char padding[7]; }; static_assert(sizeof(SerializedState) % 8 == 0, "Invalid SerializedState size."); #pragma pack(pop) } // namespace // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a // reference to the dispatcher to ensure it lives as long as the observed port. class DataPipeConsumerDispatcher::PortObserverThunk : public NodeController::PortObserver { public: explicit PortObserverThunk( scoped_refptr dispatcher) : dispatcher_(dispatcher) {} private: ~PortObserverThunk() override {} // NodeController::PortObserver: void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } scoped_refptr dispatcher_; DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); }; // static scoped_refptr DataPipeConsumerDispatcher::Create( NodeController* node_controller, const ports::PortRef& control_port, base::UnsafeSharedMemoryRegion shared_ring_buffer, const MojoCreateDataPipeOptions& options, uint64_t pipe_id) { scoped_refptr consumer = new DataPipeConsumerDispatcher(node_controller, control_port, std::move(shared_ring_buffer), options, pipe_id); base::AutoLock lock(consumer->lock_); if (!consumer->InitializeNoLock()) return nullptr; return consumer; } Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { return Type::DATA_PIPE_CONSUMER; } MojoResult DataPipeConsumerDispatcher::Close() { base::AutoLock lock(lock_); DVLOG(1) << "Closing data pipe consumer " << pipe_id_; return CloseNoLock(); } MojoResult DataPipeConsumerDispatcher::ReadData( const MojoReadDataOptions& options, void* elements, uint32_t* num_bytes) { base::AutoLock lock(lock_); if (!shared_ring_buffer_.IsValid() || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; if (in_two_phase_read_) return MOJO_RESULT_BUSY; const bool had_new_data = new_data_available_; new_data_available_ = false; if ((options.flags & MOJO_READ_DATA_FLAG_QUERY)) { if ((options.flags & MOJO_READ_DATA_FLAG_PEEK) || (options.flags & MOJO_READ_DATA_FLAG_DISCARD)) return MOJO_RESULT_INVALID_ARGUMENT; DCHECK(!(options.flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|"; *num_bytes = static_cast(bytes_available_); if (had_new_data) watchers_.NotifyState(GetHandleSignalsStateNoLock()); return MOJO_RESULT_OK; } bool discard = false; if ((options.flags & MOJO_READ_DATA_FLAG_DISCARD)) { // These flags are mutally exclusive. if (options.flags & MOJO_READ_DATA_FLAG_PEEK) return MOJO_RESULT_INVALID_ARGUMENT; DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|"; discard = true; } uint32_t max_num_bytes_to_read = *num_bytes; if (max_num_bytes_to_read % options_.element_num_bytes != 0) return MOJO_RESULT_INVALID_ARGUMENT; bool all_or_none = options.flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; if (min_num_bytes_to_read > bytes_available_) { if (had_new_data) watchers_.NotifyState(GetHandleSignalsStateNoLock()); return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; } uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); if (bytes_to_read == 0) { if (had_new_data) watchers_.NotifyState(GetHandleSignalsStateNoLock()); return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; } if (!discard) { const uint8_t* data = static_cast(ring_buffer_mapping_.memory()); CHECK(data); uint8_t* destination = static_cast(elements); CHECK(destination); DCHECK_LE(read_offset_, options_.capacity_num_bytes); uint32_t tail_bytes_to_copy = std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; if (tail_bytes_to_copy > 0) memcpy(destination, data + read_offset_, tail_bytes_to_copy); if (head_bytes_to_copy > 0) memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); } *num_bytes = bytes_to_read; bool peek = !!(options.flags & MOJO_READ_DATA_FLAG_PEEK); if (discard || !peek) { read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; bytes_available_ -= bytes_to_read; base::AutoUnlock unlock(lock_); NotifyRead(bytes_to_read); } // We may have just read the last available data and thus changed the signals // state. watchers_.NotifyState(GetHandleSignalsStateNoLock()); return MOJO_RESULT_OK; } MojoResult DataPipeConsumerDispatcher::BeginReadData( const void** buffer, uint32_t* buffer_num_bytes) { base::AutoLock lock(lock_); if (!shared_ring_buffer_.IsValid() || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; if (in_two_phase_read_) return MOJO_RESULT_BUSY; const bool had_new_data = new_data_available_; new_data_available_ = false; if (bytes_available_ == 0) { if (had_new_data) watchers_.NotifyState(GetHandleSignalsStateNoLock()); return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; } DCHECK_LT(read_offset_, options_.capacity_num_bytes); uint32_t bytes_to_read = std::min(bytes_available_, options_.capacity_num_bytes - read_offset_); CHECK(ring_buffer_mapping_.IsValid()); uint8_t* data = static_cast(ring_buffer_mapping_.memory()); CHECK(data); in_two_phase_read_ = true; *buffer = data + read_offset_; *buffer_num_bytes = bytes_to_read; two_phase_max_bytes_read_ = bytes_to_read; if (had_new_data) watchers_.NotifyState(GetHandleSignalsStateNoLock()); return MOJO_RESULT_OK; } MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { base::AutoLock lock(lock_); if (!in_two_phase_read_) return MOJO_RESULT_FAILED_PRECONDITION; if (in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; CHECK(shared_ring_buffer_.IsValid()); MojoResult rv; if (num_bytes_read > two_phase_max_bytes_read_ || num_bytes_read % options_.element_num_bytes != 0) { rv = MOJO_RESULT_INVALID_ARGUMENT; } else { rv = MOJO_RESULT_OK; read_offset_ = (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; DCHECK_GE(bytes_available_, num_bytes_read); bytes_available_ -= num_bytes_read; base::AutoUnlock unlock(lock_); NotifyRead(num_bytes_read); } in_two_phase_read_ = false; two_phase_max_bytes_read_ = 0; watchers_.NotifyState(GetHandleSignalsStateNoLock()); return rv; } HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { base::AutoLock lock(lock_); return GetHandleSignalsStateNoLock(); } MojoResult DataPipeConsumerDispatcher::AddWatcherRef( const scoped_refptr& watcher, uintptr_t context) { base::AutoLock lock(lock_); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); } MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( WatcherDispatcher* watcher, uintptr_t context) { base::AutoLock lock(lock_); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; return watchers_.Remove(watcher, context); } void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, uint32_t* num_ports, uint32_t* num_handles) { base::AutoLock lock(lock_); DCHECK(in_transit_); *num_bytes = static_cast(sizeof(SerializedState)); *num_ports = 1; *num_handles = 1; } bool DataPipeConsumerDispatcher::EndSerialize( void* destination, ports::PortName* ports, PlatformHandle* platform_handles) { SerializedState* state = static_cast(destination); memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); memset(state->padding, 0, sizeof(state->padding)); base::AutoLock lock(lock_); DCHECK(in_transit_); state->pipe_id = pipe_id_; state->read_offset = read_offset_; state->bytes_available = bytes_available_; state->flags = peer_closed_ ? kFlagPeerClosed : 0; auto region_handle = base::UnsafeSharedMemoryRegion::TakeHandleForSerialization( std::move(shared_ring_buffer_)); const base::UnguessableToken& guid = region_handle.GetGUID(); state->buffer_guid_high = guid.GetHighForSerialization(); state->buffer_guid_low = guid.GetLowForSerialization(); ports[0] = control_port_.name(); PlatformHandle handle; PlatformHandle ignored_handle; ExtractPlatformHandlesFromSharedMemoryRegionHandle( region_handle.PassPlatformHandle(), &handle, &ignored_handle); if (!handle.is_valid() || ignored_handle.is_valid()) return false; platform_handles[0] = std::move(handle); return true; } bool DataPipeConsumerDispatcher::BeginTransit() { base::AutoLock lock(lock_); if (in_transit_) return false; in_transit_ = !in_two_phase_read_; return in_transit_; } void DataPipeConsumerDispatcher::CompleteTransitAndClose() { node_controller_->SetPortObserver(control_port_, nullptr); base::AutoLock lock(lock_); DCHECK(in_transit_); in_transit_ = false; transferred_ = true; CloseNoLock(); } void DataPipeConsumerDispatcher::CancelTransit() { base::AutoLock lock(lock_); DCHECK(in_transit_); in_transit_ = false; UpdateSignalsStateNoLock(); } // static scoped_refptr DataPipeConsumerDispatcher::Deserialize(const void* data, size_t num_bytes, const ports::PortName* ports, size_t num_ports, PlatformHandle* handles, size_t num_handles) { if (num_ports != 1 || num_handles != 1 || num_bytes != sizeof(SerializedState)) { return nullptr; } const SerializedState* state = static_cast(data); if (!state->options.capacity_num_bytes || !state->options.element_num_bytes || state->options.capacity_num_bytes < state->options.element_num_bytes) { return nullptr; } NodeController* node_controller = Core::Get()->GetNodeController(); ports::PortRef port; if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) return nullptr; auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles( std::move(handles[0]), PlatformHandle()); auto region = base::subtle::PlatformSharedMemoryRegion::Take( std::move(region_handle), base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe, state->options.capacity_num_bytes, base::UnguessableToken::Deserialize(state->buffer_guid_high, state->buffer_guid_low)); auto ring_buffer = base::UnsafeSharedMemoryRegion::Deserialize(std::move(region)); if (!ring_buffer.IsValid()) { DLOG(ERROR) << "Failed to deserialize shared buffer handle."; return nullptr; } scoped_refptr dispatcher = new DataPipeConsumerDispatcher(node_controller, port, std::move(ring_buffer), state->options, state->pipe_id); { base::AutoLock lock(dispatcher->lock_); dispatcher->read_offset_ = state->read_offset; dispatcher->bytes_available_ = state->bytes_available; dispatcher->new_data_available_ = state->bytes_available > 0; dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; if (!dispatcher->InitializeNoLock()) return nullptr; dispatcher->UpdateSignalsStateNoLock(); } return dispatcher; } DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( NodeController* node_controller, const ports::PortRef& control_port, base::UnsafeSharedMemoryRegion shared_ring_buffer, const MojoCreateDataPipeOptions& options, uint64_t pipe_id) : options_(options), node_controller_(node_controller), control_port_(control_port), pipe_id_(pipe_id), watchers_(this), shared_ring_buffer_(std::move(shared_ring_buffer)) {} DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { DCHECK(is_closed_ && !shared_ring_buffer_.IsValid() && !ring_buffer_mapping_.IsValid() && !in_transit_); } bool DataPipeConsumerDispatcher::InitializeNoLock() { lock_.AssertAcquired(); if (!shared_ring_buffer_.IsValid()) return false; DCHECK(!ring_buffer_mapping_.IsValid()); ring_buffer_mapping_ = shared_ring_buffer_.Map(); if (!ring_buffer_mapping_.IsValid()) { DLOG(ERROR) << "Failed to map shared buffer."; shared_ring_buffer_ = base::UnsafeSharedMemoryRegion(); return false; } base::AutoUnlock unlock(lock_); node_controller_->SetPortObserver( control_port_, base::MakeRefCounted(this)); return true; } MojoResult DataPipeConsumerDispatcher::CloseNoLock() { lock_.AssertAcquired(); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; is_closed_ = true; ring_buffer_mapping_ = base::WritableSharedMemoryMapping(); shared_ring_buffer_ = base::UnsafeSharedMemoryRegion(); watchers_.NotifyClosed(); if (!transferred_) { base::AutoUnlock unlock(lock_); node_controller_->ClosePort(control_port_); } return MOJO_RESULT_OK; } HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { lock_.AssertAcquired(); HandleSignalsState rv; if (shared_ring_buffer_.IsValid() && bytes_available_) { if (!in_two_phase_read_) { rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; if (new_data_available_) rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; } rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; } else if (!peer_closed_ && shared_ring_buffer_.IsValid()) { rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; } if (shared_ring_buffer_.IsValid()) { if (new_data_available_ || !peer_closed_) rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; } if (peer_closed_) { rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; } else { rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE; if (peer_remote_) rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE; } rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; return rv; } void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " << num_bytes << " bytes read. [control_port=" << control_port_.name() << "]"; SendDataPipeControlMessage(node_controller_, control_port_, DataPipeCommand::DATA_WAS_READ, num_bytes); } void DataPipeConsumerDispatcher::OnPortStatusChanged() { DCHECK(RequestContext::current()); base::AutoLock lock(lock_); // We stop observing the control port as soon it's transferred, but this can // race with events which are raised right before that happens. This is fine // to ignore. if (transferred_) return; DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; UpdateSignalsStateNoLock(); } void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { lock_.AssertAcquired(); const bool was_peer_closed = peer_closed_; const bool was_peer_remote = peer_remote_; size_t previous_bytes_available = bytes_available_; ports::PortStatus port_status; int rv = node_controller_->node()->GetStatus(control_port_, &port_status); peer_remote_ = rv == ports::OK && port_status.peer_remote; if (rv != ports::OK || !port_status.receiving_messages) { DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" << " [control_port=" << control_port_.name() << "]"; peer_closed_ = true; } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { std::unique_ptr message_event; do { int rv = node_controller_->node()->GetMessage(control_port_, &message_event, nullptr); if (rv != ports::OK) peer_closed_ = true; if (message_event) { auto* message = message_event->GetMessage(); if (message->user_payload_size() < sizeof(DataPipeControlMessage)) { peer_closed_ = true; break; } const DataPipeControlMessage* m = static_cast(message->user_payload()); if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { DLOG(ERROR) << "Unexpected control message from producer."; peer_closed_ = true; break; } if (static_cast(bytes_available_) + m->num_bytes > options_.capacity_num_bytes) { DLOG(ERROR) << "Producer claims to have written too many bytes."; peer_closed_ = true; break; } DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " << m->num_bytes << " bytes were written. [control_port=" << control_port_.name() << "]"; bytes_available_ += m->num_bytes; } } while (message_event); } bool has_new_data = bytes_available_ != previous_bytes_available; if (has_new_data) new_data_available_ = true; if (peer_closed_ != was_peer_closed || has_new_data || peer_remote_ != was_peer_remote) { watchers_.NotifyState(GetHandleSignalsStateNoLock()); } } } // namespace core } // namespace mojo