• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/system/file_data_pipe_producer.h"
6 
7 #include <algorithm>
8 #include <limits>
9 #include <memory>
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted_delete_on_sequence.h"
16 #include "base/numerics/safe_conversions.h"
17 #include "base/sequenced_task_runner.h"
18 #include "base/synchronization/lock.h"
19 #include "base/task_scheduler/post_task.h"
20 #include "base/threading/sequenced_task_runner_handle.h"
21 #include "mojo/public/cpp/system/simple_watcher.h"
22 
23 namespace mojo {
24 
25 namespace {
26 
27 // No good reason not to attempt very large pipe transactions in case the data
28 // pipe in use has a very large capacity available, so we default to trying
29 // 64 MB chunks whenever a producer is writable.
30 constexpr uint32_t kDefaultMaxReadSize = 64 * 1024 * 1024;
31 
FileErrorToMojoResult(base::File::Error error)32 MojoResult FileErrorToMojoResult(base::File::Error error) {
33   switch (error) {
34     case base::File::FILE_OK:
35       return MOJO_RESULT_OK;
36     case base::File::FILE_ERROR_NOT_FOUND:
37       return MOJO_RESULT_NOT_FOUND;
38     case base::File::FILE_ERROR_SECURITY:
39     case base::File::FILE_ERROR_ACCESS_DENIED:
40       return MOJO_RESULT_PERMISSION_DENIED;
41     case base::File::FILE_ERROR_TOO_MANY_OPENED:
42     case base::File::FILE_ERROR_NO_MEMORY:
43       return MOJO_RESULT_RESOURCE_EXHAUSTED;
44     case base::File::FILE_ERROR_ABORT:
45       return MOJO_RESULT_ABORTED;
46     default:
47       return MOJO_RESULT_UNKNOWN;
48   }
49 }
50 
51 }  // namespace
52 
53 class FileDataPipeProducer::FileSequenceState
54     : public base::RefCountedDeleteOnSequence<FileSequenceState> {
55  public:
56   using CompletionCallback =
57       base::OnceCallback<void(ScopedDataPipeProducerHandle producer,
58                               MojoResult result)>;
59 
FileSequenceState(ScopedDataPipeProducerHandle producer_handle,scoped_refptr<base::SequencedTaskRunner> file_task_runner,CompletionCallback callback,scoped_refptr<base::SequencedTaskRunner> callback_task_runner,std::unique_ptr<Observer> observer)60   FileSequenceState(
61       ScopedDataPipeProducerHandle producer_handle,
62       scoped_refptr<base::SequencedTaskRunner> file_task_runner,
63       CompletionCallback callback,
64       scoped_refptr<base::SequencedTaskRunner> callback_task_runner,
65       std::unique_ptr<Observer> observer)
66       : base::RefCountedDeleteOnSequence<FileSequenceState>(
67             std::move(file_task_runner)),
68         callback_task_runner_(std::move(callback_task_runner)),
69         producer_handle_(std::move(producer_handle)),
70         callback_(std::move(callback)),
71         observer_(std::move(observer)) {}
72 
Cancel()73   void Cancel() {
74     base::AutoLock lock(lock_);
75     is_cancelled_ = true;
76   }
77 
StartFromFile(base::File file,size_t max_bytes)78   void StartFromFile(base::File file, size_t max_bytes) {
79     owning_task_runner()->PostTask(
80         FROM_HERE,
81         base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this,
82                        std::move(file), max_bytes));
83   }
84 
StartFromPath(const base::FilePath & path)85   void StartFromPath(const base::FilePath& path) {
86     owning_task_runner()->PostTask(
87         FROM_HERE,
88         base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this,
89                        path));
90   }
91 
92  private:
93   friend class base::DeleteHelper<FileSequenceState>;
94   friend class base::RefCountedDeleteOnSequence<FileSequenceState>;
95 
96   ~FileSequenceState() = default;
97 
StartFromFileOnFileSequence(base::File file,size_t max_bytes)98   void StartFromFileOnFileSequence(base::File file, size_t max_bytes) {
99     if (file.error_details() != base::File::FILE_OK) {
100       Finish(FileErrorToMojoResult(file.error_details()));
101       return;
102     }
103     file_ = std::move(file);
104     max_bytes_ = max_bytes;
105     TransferSomeBytes();
106     if (producer_handle_.is_valid()) {
107       // If we didn't nail it all on the first transaction attempt, setup a
108       // watcher and complete the read asynchronously.
109       watcher_ = std::make_unique<SimpleWatcher>(
110           FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
111           base::SequencedTaskRunnerHandle::Get());
112       watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
113                       MOJO_WATCH_CONDITION_SATISFIED,
114                       base::Bind(&FileSequenceState::OnHandleReady, this));
115     }
116   }
117 
StartFromPathOnFileSequence(const base::FilePath & path)118   void StartFromPathOnFileSequence(const base::FilePath& path) {
119     StartFromFileOnFileSequence(
120         base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ),
121         std::numeric_limits<size_t>::max());
122   }
123 
OnHandleReady(MojoResult result,const HandleSignalsState & state)124   void OnHandleReady(MojoResult result, const HandleSignalsState& state) {
125     {
126       // Stop ourselves from doing redundant work if we've been cancelled from
127       // another thread. Note that we do not rely on this for any kind of thread
128       // safety concerns.
129       base::AutoLock lock(lock_);
130       if (is_cancelled_)
131         return;
132     }
133 
134     if (result != MOJO_RESULT_OK) {
135       // Either the consumer pipe has been closed or something terrible
136       // happened. In any case, we'll never be able to write more data.
137       Finish(result);
138       return;
139     }
140 
141     TransferSomeBytes();
142   }
143 
TransferSomeBytes()144   void TransferSomeBytes() {
145     while (true) {
146       // Lock as much of the pipe as we can.
147       void* pipe_buffer;
148       uint32_t size = kDefaultMaxReadSize;
149       MojoResult result = producer_handle_->BeginWriteData(
150           &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE);
151       if (result == MOJO_RESULT_SHOULD_WAIT)
152         return;
153       if (result != MOJO_RESULT_OK) {
154         Finish(result);
155         return;
156       }
157 
158       // Attempt to read that many bytes from the file, directly into the data
159       // pipe. Note that while |max_bytes_remaining| may be very large, the
160       // length we attempt read is bounded by the much smaller
161       // |kDefaultMaxReadSize| via |size|.
162       DCHECK(base::IsValueInRangeForNumericType<int>(size));
163       const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_;
164       int attempted_read_size = static_cast<int>(
165           std::min(static_cast<size_t>(size), max_bytes_remaining));
166       int read_size = file_.ReadAtCurrentPos(static_cast<char*>(pipe_buffer),
167                                              attempted_read_size);
168       base::File::Error read_error;
169       if (read_size < 0) {
170         read_error = base::File::GetLastFileError();
171         DCHECK_NE(base::File::FILE_OK, read_error);
172         if (observer_)
173           observer_->OnBytesRead(pipe_buffer, 0u, read_error);
174       } else {
175         read_error = base::File::FILE_OK;
176         if (observer_) {
177           observer_->OnBytesRead(pipe_buffer, static_cast<size_t>(read_size),
178                                  base::File::FILE_OK);
179         }
180       }
181       producer_handle_->EndWriteData(
182           read_size >= 0 ? static_cast<uint32_t>(read_size) : 0);
183 
184       if (read_size < 0) {
185         Finish(FileErrorToMojoResult(read_error));
186         return;
187       }
188 
189       bytes_transferred_ += read_size;
190       DCHECK_LE(bytes_transferred_, max_bytes_);
191 
192       if (read_size < attempted_read_size) {
193         // ReadAtCurrentPos makes a best effort to read all requested bytes. We
194         // reasonably assume if it fails to read what we ask for, we've hit EOF.
195         Finish(MOJO_RESULT_OK);
196         return;
197       }
198 
199       if (bytes_transferred_ == max_bytes_) {
200         // We've read as much as we were asked to read.
201         Finish(MOJO_RESULT_OK);
202         return;
203       }
204     }
205   }
206 
Finish(MojoResult result)207   void Finish(MojoResult result) {
208     if (observer_) {
209       observer_->OnDoneReading();
210       observer_ = nullptr;
211     }
212     watcher_.reset();
213     callback_task_runner_->PostTask(
214         FROM_HERE, base::BindOnce(std::move(callback_),
215                                   std::move(producer_handle_), result));
216   }
217 
218   const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
219 
220   // State which is effectively owned and used only on the file sequence.
221   ScopedDataPipeProducerHandle producer_handle_;
222   base::File file_;
223   size_t max_bytes_ = 0;
224   size_t bytes_transferred_ = 0;
225   CompletionCallback callback_;
226   std::unique_ptr<SimpleWatcher> watcher_;
227 
228   // Guards |is_cancelled_|.
229   base::Lock lock_;
230   bool is_cancelled_ = false;
231   std::unique_ptr<Observer> observer_;
232 
233   DISALLOW_COPY_AND_ASSIGN(FileSequenceState);
234 };
235 
FileDataPipeProducer(ScopedDataPipeProducerHandle producer,std::unique_ptr<Observer> observer)236 FileDataPipeProducer::FileDataPipeProducer(
237     ScopedDataPipeProducerHandle producer,
238     std::unique_ptr<Observer> observer)
239     : producer_(std::move(producer)),
240       observer_(std::move(observer)),
241       weak_factory_(this) {}
242 
~FileDataPipeProducer()243 FileDataPipeProducer::~FileDataPipeProducer() {
244   if (file_sequence_state_)
245     file_sequence_state_->Cancel();
246 }
247 
WriteFromFile(base::File file,CompletionCallback callback)248 void FileDataPipeProducer::WriteFromFile(base::File file,
249                                          CompletionCallback callback) {
250   WriteFromFile(std::move(file), std::numeric_limits<size_t>::max(),
251                 std::move(callback));
252 }
253 
WriteFromFile(base::File file,size_t max_bytes,CompletionCallback callback)254 void FileDataPipeProducer::WriteFromFile(base::File file,
255                                          size_t max_bytes,
256                                          CompletionCallback callback) {
257   InitializeNewRequest(std::move(callback));
258   file_sequence_state_->StartFromFile(std::move(file), max_bytes);
259 }
260 
WriteFromPath(const base::FilePath & path,CompletionCallback callback)261 void FileDataPipeProducer::WriteFromPath(const base::FilePath& path,
262                                          CompletionCallback callback) {
263   InitializeNewRequest(std::move(callback));
264   file_sequence_state_->StartFromPath(path);
265 }
266 
InitializeNewRequest(CompletionCallback callback)267 void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) {
268   DCHECK(!file_sequence_state_);
269 
270   LOG(FATAL) << "unsupported in libchrome";
271   // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits(
272   //     {base::MayBlock(), base::TaskPriority::BACKGROUND});
273   // file_sequence_state_ = new FileSequenceState(
274   //     std::move(producer_), file_task_runner,
275   //     base::BindOnce(&FileDataPipeProducer::OnWriteComplete,
276   //                    weak_factory_.GetWeakPtr(), std::move(callback)),
277   //     base::SequencedTaskRunnerHandle::Get(), std::move(observer_));
278 }
279 
OnWriteComplete(CompletionCallback callback,ScopedDataPipeProducerHandle producer,MojoResult ready_result)280 void FileDataPipeProducer::OnWriteComplete(
281     CompletionCallback callback,
282     ScopedDataPipeProducerHandle producer,
283     MojoResult ready_result) {
284   producer_ = std::move(producer);
285   file_sequence_state_ = nullptr;
286   std::move(callback).Run(ready_result);
287 }
288 
289 }  // namespace mojo
290