// Copyright 2017 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/public/cpp/system/file_data_pipe_producer.h" #include #include #include #include #include "base/bind.h" #include "base/callback.h" #include "base/location.h" #include "base/memory/ref_counted_delete_on_sequence.h" #include "base/numerics/safe_conversions.h" #include "base/sequenced_task_runner.h" #include "base/synchronization/lock.h" #include "base/task_scheduler/post_task.h" #include "base/threading/sequenced_task_runner_handle.h" #include "mojo/public/cpp/system/simple_watcher.h" namespace mojo { namespace { // No good reason not to attempt very large pipe transactions in case the data // pipe in use has a very large capacity available, so we default to trying // 64 MB chunks whenever a producer is writable. constexpr uint32_t kDefaultMaxReadSize = 64 * 1024 * 1024; MojoResult FileErrorToMojoResult(base::File::Error error) { switch (error) { case base::File::FILE_OK: return MOJO_RESULT_OK; case base::File::FILE_ERROR_NOT_FOUND: return MOJO_RESULT_NOT_FOUND; case base::File::FILE_ERROR_SECURITY: case base::File::FILE_ERROR_ACCESS_DENIED: return MOJO_RESULT_PERMISSION_DENIED; case base::File::FILE_ERROR_TOO_MANY_OPENED: case base::File::FILE_ERROR_NO_MEMORY: return MOJO_RESULT_RESOURCE_EXHAUSTED; case base::File::FILE_ERROR_ABORT: return MOJO_RESULT_ABORTED; default: return MOJO_RESULT_UNKNOWN; } } } // namespace class FileDataPipeProducer::FileSequenceState : public base::RefCountedDeleteOnSequence { public: using CompletionCallback = base::OnceCallback; FileSequenceState( ScopedDataPipeProducerHandle producer_handle, scoped_refptr file_task_runner, CompletionCallback callback, scoped_refptr callback_task_runner, std::unique_ptr observer) : base::RefCountedDeleteOnSequence( std::move(file_task_runner)), callback_task_runner_(std::move(callback_task_runner)), producer_handle_(std::move(producer_handle)), callback_(std::move(callback)), observer_(std::move(observer)) {} void Cancel() { base::AutoLock lock(lock_); is_cancelled_ = true; } void StartFromFile(base::File file, size_t max_bytes) { owning_task_runner()->PostTask( FROM_HERE, base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this, std::move(file), max_bytes)); } void StartFromPath(const base::FilePath& path) { owning_task_runner()->PostTask( FROM_HERE, base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this, path)); } private: friend class base::DeleteHelper; friend class base::RefCountedDeleteOnSequence; ~FileSequenceState() = default; void StartFromFileOnFileSequence(base::File file, size_t max_bytes) { if (file.error_details() != base::File::FILE_OK) { Finish(FileErrorToMojoResult(file.error_details())); return; } file_ = std::move(file); max_bytes_ = max_bytes; TransferSomeBytes(); if (producer_handle_.is_valid()) { // If we didn't nail it all on the first transaction attempt, setup a // watcher and complete the read asynchronously. watcher_ = std::make_unique( FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC, base::SequencedTaskRunnerHandle::Get()); watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_WATCH_CONDITION_SATISFIED, base::Bind(&FileSequenceState::OnHandleReady, this)); } } void StartFromPathOnFileSequence(const base::FilePath& path) { StartFromFileOnFileSequence( base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ), std::numeric_limits::max()); } void OnHandleReady(MojoResult result, const HandleSignalsState& state) { { // Stop ourselves from doing redundant work if we've been cancelled from // another thread. Note that we do not rely on this for any kind of thread // safety concerns. base::AutoLock lock(lock_); if (is_cancelled_) return; } if (result != MOJO_RESULT_OK) { // Either the consumer pipe has been closed or something terrible // happened. In any case, we'll never be able to write more data. Finish(result); return; } TransferSomeBytes(); } void TransferSomeBytes() { while (true) { // Lock as much of the pipe as we can. void* pipe_buffer; uint32_t size = kDefaultMaxReadSize; MojoResult result = producer_handle_->BeginWriteData( &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE); if (result == MOJO_RESULT_SHOULD_WAIT) return; if (result != MOJO_RESULT_OK) { Finish(result); return; } // Attempt to read that many bytes from the file, directly into the data // pipe. Note that while |max_bytes_remaining| may be very large, the // length we attempt read is bounded by the much smaller // |kDefaultMaxReadSize| via |size|. DCHECK(base::IsValueInRangeForNumericType(size)); const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_; int attempted_read_size = static_cast( std::min(static_cast(size), max_bytes_remaining)); int read_size = file_.ReadAtCurrentPos(static_cast(pipe_buffer), attempted_read_size); base::File::Error read_error; if (read_size < 0) { read_error = base::File::GetLastFileError(); DCHECK_NE(base::File::FILE_OK, read_error); if (observer_) observer_->OnBytesRead(pipe_buffer, 0u, read_error); } else { read_error = base::File::FILE_OK; if (observer_) { observer_->OnBytesRead(pipe_buffer, static_cast(read_size), base::File::FILE_OK); } } producer_handle_->EndWriteData( read_size >= 0 ? static_cast(read_size) : 0); if (read_size < 0) { Finish(FileErrorToMojoResult(read_error)); return; } bytes_transferred_ += read_size; DCHECK_LE(bytes_transferred_, max_bytes_); if (read_size < attempted_read_size) { // ReadAtCurrentPos makes a best effort to read all requested bytes. We // reasonably assume if it fails to read what we ask for, we've hit EOF. Finish(MOJO_RESULT_OK); return; } if (bytes_transferred_ == max_bytes_) { // We've read as much as we were asked to read. Finish(MOJO_RESULT_OK); return; } } } void Finish(MojoResult result) { if (observer_) { observer_->OnDoneReading(); observer_ = nullptr; } watcher_.reset(); callback_task_runner_->PostTask( FROM_HERE, base::BindOnce(std::move(callback_), std::move(producer_handle_), result)); } const scoped_refptr callback_task_runner_; // State which is effectively owned and used only on the file sequence. ScopedDataPipeProducerHandle producer_handle_; base::File file_; size_t max_bytes_ = 0; size_t bytes_transferred_ = 0; CompletionCallback callback_; std::unique_ptr watcher_; // Guards |is_cancelled_|. base::Lock lock_; bool is_cancelled_ = false; std::unique_ptr observer_; DISALLOW_COPY_AND_ASSIGN(FileSequenceState); }; FileDataPipeProducer::FileDataPipeProducer( ScopedDataPipeProducerHandle producer, std::unique_ptr observer) : producer_(std::move(producer)), observer_(std::move(observer)), weak_factory_(this) {} FileDataPipeProducer::~FileDataPipeProducer() { if (file_sequence_state_) file_sequence_state_->Cancel(); } void FileDataPipeProducer::WriteFromFile(base::File file, CompletionCallback callback) { WriteFromFile(std::move(file), std::numeric_limits::max(), std::move(callback)); } void FileDataPipeProducer::WriteFromFile(base::File file, size_t max_bytes, CompletionCallback callback) { InitializeNewRequest(std::move(callback)); file_sequence_state_->StartFromFile(std::move(file), max_bytes); } void FileDataPipeProducer::WriteFromPath(const base::FilePath& path, CompletionCallback callback) { InitializeNewRequest(std::move(callback)); file_sequence_state_->StartFromPath(path); } void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) { DCHECK(!file_sequence_state_); LOG(FATAL) << "unsupported in libchrome"; // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits( // {base::MayBlock(), base::TaskPriority::BACKGROUND}); // file_sequence_state_ = new FileSequenceState( // std::move(producer_), file_task_runner, // base::BindOnce(&FileDataPipeProducer::OnWriteComplete, // weak_factory_.GetWeakPtr(), std::move(callback)), // base::SequencedTaskRunnerHandle::Get(), std::move(observer_)); } void FileDataPipeProducer::OnWriteComplete( CompletionCallback callback, ScopedDataPipeProducerHandle producer, MojoResult ready_result) { producer_ = std::move(producer); file_sequence_state_ = nullptr; std::move(callback).Run(ready_result); } } // namespace mojo