1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <brillo/streams/stream.h>
6
7 #include <algorithm>
8
9 #include <base/bind.h>
10 #include <brillo/message_loops/message_loop.h>
11 #include <brillo/pointer_utils.h>
12 #include <brillo/streams/stream_errors.h>
13 #include <brillo/streams/stream_utils.h>
14
15 namespace brillo {
16
TruncateBlocking(ErrorPtr * error)17 bool Stream::TruncateBlocking(ErrorPtr* error) {
18 return SetSizeBlocking(GetPosition(), error);
19 }
20
SetPosition(uint64_t position,ErrorPtr * error)21 bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
22 if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
23 return false;
24 return Seek(position, Whence::FROM_BEGIN, nullptr, error);
25 }
26
ReadAsync(void * buffer,size_t size_to_read,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)27 bool Stream::ReadAsync(void* buffer,
28 size_t size_to_read,
29 const base::Callback<void(size_t)>& success_callback,
30 const ErrorCallback& error_callback,
31 ErrorPtr* error) {
32 if (is_async_read_pending_) {
33 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
34 errors::stream::kOperationNotSupported,
35 "Another asynchronous operation is still pending");
36 return false;
37 }
38
39 auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
40 // If we can read some data right away non-blocking we should still run the
41 // callback from the main loop, so we pass true here for force_async_callback.
42 return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
43 true);
44 }
45
ReadAllAsync(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)46 bool Stream::ReadAllAsync(void* buffer,
47 size_t size_to_read,
48 const base::Closure& success_callback,
49 const ErrorCallback& error_callback,
50 ErrorPtr* error) {
51 if (is_async_read_pending_) {
52 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
53 errors::stream::kOperationNotSupported,
54 "Another asynchronous operation is still pending");
55 return false;
56 }
57
58 auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
59 weak_ptr_factory_.GetWeakPtr(), buffer,
60 size_to_read, success_callback, error_callback);
61 return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
62 true);
63 }
64
ReadBlocking(void * buffer,size_t size_to_read,size_t * size_read,ErrorPtr * error)65 bool Stream::ReadBlocking(void* buffer,
66 size_t size_to_read,
67 size_t* size_read,
68 ErrorPtr* error) {
69 for (;;) {
70 bool eos = false;
71 if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
72 return false;
73
74 if (*size_read > 0 || eos)
75 break;
76
77 if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
78 error)) {
79 return false;
80 }
81 }
82 return true;
83 }
84
ReadAllBlocking(void * buffer,size_t size_to_read,ErrorPtr * error)85 bool Stream::ReadAllBlocking(void* buffer,
86 size_t size_to_read,
87 ErrorPtr* error) {
88 while (size_to_read > 0) {
89 size_t size_read = 0;
90 if (!ReadBlocking(buffer, size_to_read, &size_read, error))
91 return false;
92
93 if (size_read == 0)
94 return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);
95
96 size_to_read -= size_read;
97 buffer = AdvancePointer(buffer, size_read);
98 }
99 return true;
100 }
101
WriteAsync(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)102 bool Stream::WriteAsync(const void* buffer,
103 size_t size_to_write,
104 const base::Callback<void(size_t)>& success_callback,
105 const ErrorCallback& error_callback,
106 ErrorPtr* error) {
107 if (is_async_write_pending_) {
108 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
109 errors::stream::kOperationNotSupported,
110 "Another asynchronous operation is still pending");
111 return false;
112 }
113 // If we can read some data right away non-blocking we should still run the
114 // callback from the main loop, so we pass true here for force_async_callback.
115 return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
116 error, true);
117 }
118
WriteAllAsync(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)119 bool Stream::WriteAllAsync(const void* buffer,
120 size_t size_to_write,
121 const base::Closure& success_callback,
122 const ErrorCallback& error_callback,
123 ErrorPtr* error) {
124 if (is_async_write_pending_) {
125 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
126 errors::stream::kOperationNotSupported,
127 "Another asynchronous operation is still pending");
128 return false;
129 }
130
131 auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
132 weak_ptr_factory_.GetWeakPtr(), buffer,
133 size_to_write, success_callback, error_callback);
134 return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
135 true);
136 }
137
WriteBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)138 bool Stream::WriteBlocking(const void* buffer,
139 size_t size_to_write,
140 size_t* size_written,
141 ErrorPtr* error) {
142 for (;;) {
143 if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
144 return false;
145
146 if (*size_written > 0 || size_to_write == 0)
147 break;
148
149 if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
150 error)) {
151 return false;
152 }
153 }
154 return true;
155 }
156
WriteAllBlocking(const void * buffer,size_t size_to_write,ErrorPtr * error)157 bool Stream::WriteAllBlocking(const void* buffer,
158 size_t size_to_write,
159 ErrorPtr* error) {
160 while (size_to_write > 0) {
161 size_t size_written = 0;
162 if (!WriteBlocking(buffer, size_to_write, &size_written, error))
163 return false;
164
165 if (size_written == 0) {
166 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
167 errors::stream::kPartialData,
168 "Failed to write all the data");
169 return false;
170 }
171 size_to_write -= size_written;
172 buffer = AdvancePointer(buffer, size_written);
173 }
174 return true;
175 }
176
FlushAsync(const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr *)177 bool Stream::FlushAsync(const base::Closure& success_callback,
178 const ErrorCallback& error_callback,
179 ErrorPtr* /* error */) {
180 auto callback = base::Bind(&Stream::FlushAsyncCallback,
181 weak_ptr_factory_.GetWeakPtr(),
182 success_callback, error_callback);
183 MessageLoop::current()->PostTask(FROM_HERE, callback);
184 return true;
185 }
186
IgnoreEOSCallback(const base::Callback<void (size_t)> & success_callback,size_t bytes,bool)187 void Stream::IgnoreEOSCallback(
188 const base::Callback<void(size_t)>& success_callback,
189 size_t bytes,
190 bool /* eos */) {
191 success_callback.Run(bytes);
192 }
193
ReadAsyncImpl(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)194 bool Stream::ReadAsyncImpl(
195 void* buffer,
196 size_t size_to_read,
197 const base::Callback<void(size_t, bool)>& success_callback,
198 const ErrorCallback& error_callback,
199 ErrorPtr* error,
200 bool force_async_callback) {
201 CHECK(!is_async_read_pending_);
202 // We set this value to true early in the function so calling others will
203 // prevent us from calling WaitForData() to make calls to
204 // ReadAsync() fail while we run WaitForData().
205 is_async_read_pending_ = true;
206
207 size_t read = 0;
208 bool eos = false;
209 if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
210 return false;
211
212 if (read > 0 || eos) {
213 if (force_async_callback) {
214 MessageLoop::current()->PostTask(
215 FROM_HERE,
216 base::BindOnce(&Stream::OnReadAsyncDone,
217 weak_ptr_factory_.GetWeakPtr(),
218 success_callback, read, eos));
219 } else {
220 is_async_read_pending_ = false;
221 success_callback.Run(read, eos);
222 }
223 return true;
224 }
225
226 is_async_read_pending_ = WaitForData(
227 AccessMode::READ,
228 base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
229 buffer, size_to_read, success_callback, error_callback),
230 error);
231 return is_async_read_pending_;
232 }
233
OnReadAsyncDone(const base::Callback<void (size_t,bool)> & success_callback,size_t bytes_read,bool eos)234 void Stream::OnReadAsyncDone(
235 const base::Callback<void(size_t, bool)>& success_callback,
236 size_t bytes_read,
237 bool eos) {
238 is_async_read_pending_ = false;
239 success_callback.Run(bytes_read, eos);
240 }
241
OnReadAvailable(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)242 void Stream::OnReadAvailable(
243 void* buffer,
244 size_t size_to_read,
245 const base::Callback<void(size_t, bool)>& success_callback,
246 const ErrorCallback& error_callback,
247 AccessMode mode) {
248 CHECK(stream_utils::IsReadAccessMode(mode));
249 CHECK(is_async_read_pending_);
250 is_async_read_pending_ = false;
251 ErrorPtr error;
252 // Just reschedule the read operation but don't need to run the callback from
253 // the main loop since we are already running on a callback.
254 if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
255 &error, false)) {
256 error_callback.Run(error.get());
257 }
258 }
259
WriteAsyncImpl(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)260 bool Stream::WriteAsyncImpl(
261 const void* buffer,
262 size_t size_to_write,
263 const base::Callback<void(size_t)>& success_callback,
264 const ErrorCallback& error_callback,
265 ErrorPtr* error,
266 bool force_async_callback) {
267 CHECK(!is_async_write_pending_);
268 // We set this value to true early in the function so calling others will
269 // prevent us from calling WaitForData() to make calls to
270 // ReadAsync() fail while we run WaitForData().
271 is_async_write_pending_ = true;
272
273 size_t written = 0;
274 if (!WriteNonBlocking(buffer, size_to_write, &written, error))
275 return false;
276
277 if (written > 0) {
278 if (force_async_callback) {
279 MessageLoop::current()->PostTask(
280 FROM_HERE,
281 base::BindOnce(&Stream::OnWriteAsyncDone,
282 weak_ptr_factory_.GetWeakPtr(),
283 success_callback, written));
284 } else {
285 is_async_write_pending_ = false;
286 success_callback.Run(written);
287 }
288 return true;
289 }
290 is_async_write_pending_ = WaitForData(
291 AccessMode::WRITE,
292 base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
293 buffer, size_to_write, success_callback, error_callback),
294 error);
295 return is_async_write_pending_;
296 }
297
OnWriteAsyncDone(const base::Callback<void (size_t)> & success_callback,size_t size_written)298 void Stream::OnWriteAsyncDone(
299 const base::Callback<void(size_t)>& success_callback,
300 size_t size_written) {
301 is_async_write_pending_ = false;
302 success_callback.Run(size_written);
303 }
304
OnWriteAvailable(const void * buffer,size_t size,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)305 void Stream::OnWriteAvailable(
306 const void* buffer,
307 size_t size,
308 const base::Callback<void(size_t)>& success_callback,
309 const ErrorCallback& error_callback,
310 AccessMode mode) {
311 CHECK(stream_utils::IsWriteAccessMode(mode));
312 CHECK(is_async_write_pending_);
313 is_async_write_pending_ = false;
314 ErrorPtr error;
315 // Just reschedule the read operation but don't need to run the callback from
316 // the main loop since we are already running on a callback.
317 if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
318 false)) {
319 error_callback.Run(error.get());
320 }
321 }
322
ReadAllAsyncCallback(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_read,bool eos)323 void Stream::ReadAllAsyncCallback(void* buffer,
324 size_t size_to_read,
325 const base::Closure& success_callback,
326 const ErrorCallback& error_callback,
327 size_t size_read,
328 bool eos) {
329 ErrorPtr error;
330 size_to_read -= size_read;
331 if (size_to_read != 0 && eos) {
332 stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
333 error_callback.Run(error.get());
334 return;
335 }
336
337 if (size_to_read) {
338 buffer = AdvancePointer(buffer, size_read);
339 auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
340 weak_ptr_factory_.GetWeakPtr(), buffer,
341 size_to_read, success_callback, error_callback);
342 if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
343 false)) {
344 error_callback.Run(error.get());
345 }
346 } else {
347 success_callback.Run();
348 }
349 }
350
WriteAllAsyncCallback(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_written)351 void Stream::WriteAllAsyncCallback(const void* buffer,
352 size_t size_to_write,
353 const base::Closure& success_callback,
354 const ErrorCallback& error_callback,
355 size_t size_written) {
356 ErrorPtr error;
357 if (size_to_write != 0 && size_written == 0) {
358 Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
359 errors::stream::kPartialData, "Failed to write all the data");
360 error_callback.Run(error.get());
361 return;
362 }
363 size_to_write -= size_written;
364 if (size_to_write) {
365 buffer = AdvancePointer(buffer, size_written);
366 auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
367 weak_ptr_factory_.GetWeakPtr(), buffer,
368 size_to_write, success_callback, error_callback);
369 if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
370 false)) {
371 error_callback.Run(error.get());
372 }
373 } else {
374 success_callback.Run();
375 }
376 }
377
FlushAsyncCallback(const base::Closure & success_callback,const ErrorCallback & error_callback)378 void Stream::FlushAsyncCallback(const base::Closure& success_callback,
379 const ErrorCallback& error_callback) {
380 ErrorPtr error;
381 if (FlushBlocking(&error)) {
382 success_callback.Run();
383 } else {
384 error_callback.Run(error.get());
385 }
386 }
387
CancelPendingAsyncOperations()388 void Stream::CancelPendingAsyncOperations() {
389 weak_ptr_factory_.InvalidateWeakPtrs();
390 is_async_read_pending_ = false;
391 is_async_write_pending_ = false;
392 }
393
394 } // namespace brillo
395