1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/file_data_pipe_producer.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <memory>
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted_delete_on_sequence.h"
16 #include "base/numerics/safe_conversions.h"
17 #include "base/sequenced_task_runner.h"
18 #include "base/synchronization/lock.h"
19 #include "base/task_scheduler/post_task.h"
20 #include "base/threading/sequenced_task_runner_handle.h"
21 #include "mojo/public/cpp/system/simple_watcher.h"
22
23 namespace mojo {
24
25 namespace {
26
// Upper bound, in bytes, requested for a single write transaction on the data
// pipe. There is no good reason to avoid very large transactions when the pipe
// in use happens to have that much capacity available, so every attempt asks
// for up to 64 MB whenever the producer is writable.
constexpr uint32_t kDefaultMaxReadSize = 64u << 20;
31
FileErrorToMojoResult(base::File::Error error)32 MojoResult FileErrorToMojoResult(base::File::Error error) {
33 switch (error) {
34 case base::File::FILE_OK:
35 return MOJO_RESULT_OK;
36 case base::File::FILE_ERROR_NOT_FOUND:
37 return MOJO_RESULT_NOT_FOUND;
38 case base::File::FILE_ERROR_SECURITY:
39 case base::File::FILE_ERROR_ACCESS_DENIED:
40 return MOJO_RESULT_PERMISSION_DENIED;
41 case base::File::FILE_ERROR_TOO_MANY_OPENED:
42 case base::File::FILE_ERROR_NO_MEMORY:
43 return MOJO_RESULT_RESOURCE_EXHAUSTED;
44 case base::File::FILE_ERROR_ABORT:
45 return MOJO_RESULT_ABORTED;
46 default:
47 return MOJO_RESULT_UNKNOWN;
48 }
49 }
50
51 } // namespace
52
53 class FileDataPipeProducer::FileSequenceState
54 : public base::RefCountedDeleteOnSequence<FileSequenceState> {
55 public:
56 using CompletionCallback =
57 base::OnceCallback<void(ScopedDataPipeProducerHandle producer,
58 MojoResult result)>;
59
FileSequenceState(ScopedDataPipeProducerHandle producer_handle,scoped_refptr<base::SequencedTaskRunner> file_task_runner,CompletionCallback callback,scoped_refptr<base::SequencedTaskRunner> callback_task_runner,std::unique_ptr<Observer> observer)60 FileSequenceState(
61 ScopedDataPipeProducerHandle producer_handle,
62 scoped_refptr<base::SequencedTaskRunner> file_task_runner,
63 CompletionCallback callback,
64 scoped_refptr<base::SequencedTaskRunner> callback_task_runner,
65 std::unique_ptr<Observer> observer)
66 : base::RefCountedDeleteOnSequence<FileSequenceState>(
67 std::move(file_task_runner)),
68 callback_task_runner_(std::move(callback_task_runner)),
69 producer_handle_(std::move(producer_handle)),
70 callback_(std::move(callback)),
71 observer_(std::move(observer)) {}
72
Cancel()73 void Cancel() {
74 base::AutoLock lock(lock_);
75 is_cancelled_ = true;
76 }
77
StartFromFile(base::File file,size_t max_bytes)78 void StartFromFile(base::File file, size_t max_bytes) {
79 owning_task_runner()->PostTask(
80 FROM_HERE,
81 base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this,
82 std::move(file), max_bytes));
83 }
84
StartFromPath(const base::FilePath & path)85 void StartFromPath(const base::FilePath& path) {
86 owning_task_runner()->PostTask(
87 FROM_HERE,
88 base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this,
89 path));
90 }
91
92 private:
93 friend class base::DeleteHelper<FileSequenceState>;
94 friend class base::RefCountedDeleteOnSequence<FileSequenceState>;
95
96 ~FileSequenceState() = default;
97
StartFromFileOnFileSequence(base::File file,size_t max_bytes)98 void StartFromFileOnFileSequence(base::File file, size_t max_bytes) {
99 if (file.error_details() != base::File::FILE_OK) {
100 Finish(FileErrorToMojoResult(file.error_details()));
101 return;
102 }
103 file_ = std::move(file);
104 max_bytes_ = max_bytes;
105 TransferSomeBytes();
106 if (producer_handle_.is_valid()) {
107 // If we didn't nail it all on the first transaction attempt, setup a
108 // watcher and complete the read asynchronously.
109 watcher_ = std::make_unique<SimpleWatcher>(
110 FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
111 base::SequencedTaskRunnerHandle::Get());
112 watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
113 MOJO_WATCH_CONDITION_SATISFIED,
114 base::Bind(&FileSequenceState::OnHandleReady, this));
115 }
116 }
117
StartFromPathOnFileSequence(const base::FilePath & path)118 void StartFromPathOnFileSequence(const base::FilePath& path) {
119 StartFromFileOnFileSequence(
120 base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ),
121 std::numeric_limits<size_t>::max());
122 }
123
OnHandleReady(MojoResult result,const HandleSignalsState & state)124 void OnHandleReady(MojoResult result, const HandleSignalsState& state) {
125 {
126 // Stop ourselves from doing redundant work if we've been cancelled from
127 // another thread. Note that we do not rely on this for any kind of thread
128 // safety concerns.
129 base::AutoLock lock(lock_);
130 if (is_cancelled_)
131 return;
132 }
133
134 if (result != MOJO_RESULT_OK) {
135 // Either the consumer pipe has been closed or something terrible
136 // happened. In any case, we'll never be able to write more data.
137 Finish(result);
138 return;
139 }
140
141 TransferSomeBytes();
142 }
143
TransferSomeBytes()144 void TransferSomeBytes() {
145 while (true) {
146 // Lock as much of the pipe as we can.
147 void* pipe_buffer;
148 uint32_t size = kDefaultMaxReadSize;
149 MojoResult result = producer_handle_->BeginWriteData(
150 &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE);
151 if (result == MOJO_RESULT_SHOULD_WAIT)
152 return;
153 if (result != MOJO_RESULT_OK) {
154 Finish(result);
155 return;
156 }
157
158 // Attempt to read that many bytes from the file, directly into the data
159 // pipe. Note that while |max_bytes_remaining| may be very large, the
160 // length we attempt read is bounded by the much smaller
161 // |kDefaultMaxReadSize| via |size|.
162 DCHECK(base::IsValueInRangeForNumericType<int>(size));
163 const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_;
164 int attempted_read_size = static_cast<int>(
165 std::min(static_cast<size_t>(size), max_bytes_remaining));
166 int read_size = file_.ReadAtCurrentPos(static_cast<char*>(pipe_buffer),
167 attempted_read_size);
168 base::File::Error read_error;
169 if (read_size < 0) {
170 read_error = base::File::GetLastFileError();
171 DCHECK_NE(base::File::FILE_OK, read_error);
172 if (observer_)
173 observer_->OnBytesRead(pipe_buffer, 0u, read_error);
174 } else {
175 read_error = base::File::FILE_OK;
176 if (observer_) {
177 observer_->OnBytesRead(pipe_buffer, static_cast<size_t>(read_size),
178 base::File::FILE_OK);
179 }
180 }
181 producer_handle_->EndWriteData(
182 read_size >= 0 ? static_cast<uint32_t>(read_size) : 0);
183
184 if (read_size < 0) {
185 Finish(FileErrorToMojoResult(read_error));
186 return;
187 }
188
189 bytes_transferred_ += read_size;
190 DCHECK_LE(bytes_transferred_, max_bytes_);
191
192 if (read_size < attempted_read_size) {
193 // ReadAtCurrentPos makes a best effort to read all requested bytes. We
194 // reasonably assume if it fails to read what we ask for, we've hit EOF.
195 Finish(MOJO_RESULT_OK);
196 return;
197 }
198
199 if (bytes_transferred_ == max_bytes_) {
200 // We've read as much as we were asked to read.
201 Finish(MOJO_RESULT_OK);
202 return;
203 }
204 }
205 }
206
Finish(MojoResult result)207 void Finish(MojoResult result) {
208 if (observer_) {
209 observer_->OnDoneReading();
210 observer_ = nullptr;
211 }
212 watcher_.reset();
213 callback_task_runner_->PostTask(
214 FROM_HERE, base::BindOnce(std::move(callback_),
215 std::move(producer_handle_), result));
216 }
217
218 const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
219
220 // State which is effectively owned and used only on the file sequence.
221 ScopedDataPipeProducerHandle producer_handle_;
222 base::File file_;
223 size_t max_bytes_ = 0;
224 size_t bytes_transferred_ = 0;
225 CompletionCallback callback_;
226 std::unique_ptr<SimpleWatcher> watcher_;
227
228 // Guards |is_cancelled_|.
229 base::Lock lock_;
230 bool is_cancelled_ = false;
231 std::unique_ptr<Observer> observer_;
232
233 DISALLOW_COPY_AND_ASSIGN(FileSequenceState);
234 };
235
FileDataPipeProducer(ScopedDataPipeProducerHandle producer,std::unique_ptr<Observer> observer)236 FileDataPipeProducer::FileDataPipeProducer(
237 ScopedDataPipeProducerHandle producer,
238 std::unique_ptr<Observer> observer)
239 : producer_(std::move(producer)),
240 observer_(std::move(observer)),
241 weak_factory_(this) {}
242
~FileDataPipeProducer()243 FileDataPipeProducer::~FileDataPipeProducer() {
244 if (file_sequence_state_)
245 file_sequence_state_->Cancel();
246 }
247
WriteFromFile(base::File file,CompletionCallback callback)248 void FileDataPipeProducer::WriteFromFile(base::File file,
249 CompletionCallback callback) {
250 WriteFromFile(std::move(file), std::numeric_limits<size_t>::max(),
251 std::move(callback));
252 }
253
WriteFromFile(base::File file,size_t max_bytes,CompletionCallback callback)254 void FileDataPipeProducer::WriteFromFile(base::File file,
255 size_t max_bytes,
256 CompletionCallback callback) {
257 InitializeNewRequest(std::move(callback));
258 file_sequence_state_->StartFromFile(std::move(file), max_bytes);
259 }
260
WriteFromPath(const base::FilePath & path,CompletionCallback callback)261 void FileDataPipeProducer::WriteFromPath(const base::FilePath& path,
262 CompletionCallback callback) {
263 InitializeNewRequest(std::move(callback));
264 file_sequence_state_->StartFromPath(path);
265 }
266
InitializeNewRequest(CompletionCallback callback)267 void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) {
268 DCHECK(!file_sequence_state_);
269
270 LOG(FATAL) << "unsupported in libchrome";
271 // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits(
272 // {base::MayBlock(), base::TaskPriority::BACKGROUND});
273 // file_sequence_state_ = new FileSequenceState(
274 // std::move(producer_), file_task_runner,
275 // base::BindOnce(&FileDataPipeProducer::OnWriteComplete,
276 // weak_factory_.GetWeakPtr(), std::move(callback)),
277 // base::SequencedTaskRunnerHandle::Get(), std::move(observer_));
278 }
279
OnWriteComplete(CompletionCallback callback,ScopedDataPipeProducerHandle producer,MojoResult ready_result)280 void FileDataPipeProducer::OnWriteComplete(
281 CompletionCallback callback,
282 ScopedDataPipeProducerHandle producer,
283 MojoResult ready_result) {
284 producer_ = std::move(producer);
285 file_sequence_state_ = nullptr;
286 std::move(callback).Run(ready_result);
287 }
288
289 } // namespace mojo
290