1 // Copyright 2015 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef LIBBRILLO_BRILLO_STREAMS_STREAM_H_ 6 #define LIBBRILLO_BRILLO_STREAMS_STREAM_H_ 7 8 #include <cstdint> 9 #include <memory> 10 11 #include <base/callback.h> 12 #include <base/macros.h> 13 #include <base/memory/weak_ptr.h> 14 #include <base/time/time.h> 15 #include <brillo/brillo_export.h> 16 #include <brillo/errors/error.h> 17 18 namespace brillo { 19 20 // Stream is a base class that specific stream storage implementations must 21 // derive from to provide I/O facilities. 22 // The stream class provides general streaming I/O primitives to read, write and 23 // seek within a stream. It has methods for asynchronous (callback-based) as 24 // well as synchronous (both blocking and non-blocking) operations. 25 // The Stream class is abstract and cannot be created by itself. 26 // In order to construct a stream, you must use one of the derived classes' 27 // factory methods which return a stream smart pointer (StreamPtr): 28 // 29 // StreamPtr input_stream = FileStream::Open(path, AccessMode::READ); 30 // StreamPtr output_stream = MemoryStream::Create(); 31 // uint8_t buf[1000]; 32 // size_t read = 0; 33 // while (input_stream->ReadBlocking(buf, sizeof(buf), &read, nullptr)) { 34 // if (read == 0) break; 35 // output_stream->WriteAllBlocking(buf, read, nullptr); 36 // } 37 // 38 // NOTE ABOUT ASYNCHRONOUS OPERATIONS: Asynchronous I/O relies on a MessageLoop 39 // instance to be present on the current thread. Using Stream::ReadAsync(), 40 // Stream::WriteAsync() and similar will call MessageLoop::current() to access 41 // the current message loop and abort if there isn't one for the current thread. 42 // Also, only one outstanding asynchronous operation of particular kind (reading 43 // or writing) at a time is supported. Trying to call ReadAsync() while another 44 // asynchronous read operation is pending will fail with an error 45 // ("operation_not_supported"). 46 // 47 // NOTE ABOUT READING FROM/WRITING TO STREAMS: In many cases underlying streams 48 // use buffered I/O. Using all read/write methods other than ReadAllAsync(), 49 // ReadAllBlocking(), WriteAllAsync(), WriteAllBlocking() will return 50 // immediately if there is any data available in the underlying buffer. That is, 51 // trying to read 1000 bytes while the internal buffer contains only 100 will 52 // return immediately with just those 100 bytes and no blocking or other I/O 53 // traffic will be incurred. This guarantee is important for efficient and 54 // correct implementation of duplex communication over pipes and sockets. 55 // 56 // NOTE TO IMPLEMENTERS: When creating new stream types, you must derive 57 // from this class and provide the implementation for its pure virtual methods. 58 // For operations that do not apply to your stream, make sure the corresponding 59 // methods return "false" and set the error to "operation_not_supported". 60 // You should use stream_utils::ErrorOperationNotSupported() for this. Also 61 // Make sure the stream capabilities functions like CanRead(), etc return 62 // correct values: 63 // 64 // bool MyReadOnlyStream::CanRead() const { return true; } 65 // bool MyReadOnlyStream::CanWrite() const { return false; } 66 // bool MyReadOnlyStream::WriteBlocking(const void* buffer, 67 // size_t size_to_write, 68 // size_t* size_written, 69 // ErrorPtr* error) { 70 // return stream_utils::ErrorOperationNotSupported(error); 71 // } 72 // 73 // The class should also provide a static factory methods to create/open 74 // a new stream: 75 // 76 // static StreamPtr MyReadOnlyStream::Open(..., ErrorPtr* error) { 77 // auto my_stream = std::make_unique<MyReadOnlyStream>(...); 78 // if (!my_stream->Initialize(..., error)) 79 // my_stream.reset(); 80 // } 81 // return my_stream; 82 // } 83 // 84 class BRILLO_EXPORT Stream { 85 public: 86 // When seeking in streams, whence specifies the origin of the seek operation. 87 enum class Whence { FROM_BEGIN, FROM_CURRENT, FROM_END }; 88 // Stream access mode for open operations (used in derived classes). 89 enum class AccessMode { READ, WRITE, READ_WRITE }; 90 91 // Standard error callback for asynchronous operations. 92 using ErrorCallback = base::Callback<void(const Error*)>; 93 94 virtual ~Stream() = default; 95 96 // == Stream capabilities =================================================== 97 98 // Returns true while stream is open. Closing the last reference to the stream 99 // will make this method return false. 100 virtual bool IsOpen() const = 0; 101 102 // Called to determine if read operations are supported on the stream (stream 103 // is readable). This method does not check if there is actually any data to 104 // read, only the fact that the stream is open in read mode and can be read 105 // from in general. 106 // If CanRead() returns false, it is guaranteed that the stream can't be 107 // read from. However, if it returns true, there is no guarantee that the 108 // subsequent read operation will actually succeed (for example, the stream 109 // position could be at the end of the data stream, or the access mode of 110 // the stream is unknown beforehand). 111 virtual bool CanRead() const = 0; 112 113 // Called to determine if write operations are supported on the stream (stream 114 // is writable). 115 // If CanWrite() returns false, it is guaranteed that the stream can't be 116 // written to. However, if it returns true, the subsequent write operation 117 // is not guaranteed to succeed (e.g. the output media could be out of free 118 // space or a transport error could occur). 119 virtual bool CanWrite() const = 0; 120 121 // Called to determine if random access I/O operations are supported on 122 // the stream. Sequential streams should return false. 123 // If CanSeek() returns false, it is guaranteed that the stream can't use 124 // Seek(). However, if it returns true, it might be possible to seek, but this 125 // is not guaranteed since the actual underlying stream capabilities might 126 // not be known. 127 // Note that non-seekable streams might still maintain the current stream 128 // position and GetPosition method might still be used even if CanSeek() 129 // returns false. However SetPosition() will almost always fail in such 130 // a case. 131 virtual bool CanSeek() const = 0; 132 133 // Called to determine if the size of the stream is known. Size of some 134 // sequential streams (e.g. based on pipes) is unknown beforehand, so this 135 // method can be used to check how reliable a call to GetSize() is. 136 virtual bool CanGetSize() const = 0; 137 138 // == Stream size operations ================================================ 139 140 // Returns the size of stream data. 141 // If the stream size is unavailable/unknown, it returns 0. 142 virtual uint64_t GetSize() const = 0; 143 144 // Resizes the stream storage to |size|. Stream must be writable and support 145 // this operation. 146 virtual bool SetSizeBlocking(uint64_t size, ErrorPtr* error) = 0; 147 148 // Truncates the stream at the current stream pointer. 149 // Calls SetSizeBlocking(GetPosition(), ...). 150 bool TruncateBlocking(ErrorPtr* error); 151 152 // Returns the amount of data remaining in the stream. If the size of the 153 // stream is unknown, or if the stream pointer is at or past the end of the 154 // stream, the function returns 0. 155 virtual uint64_t GetRemainingSize() const = 0; 156 157 // == Seek operations ======================================================= 158 159 // Gets the position of the stream I/O pointer from the beginning of the 160 // stream. If the stream position is unavailable/unknown, it returns 0. 161 virtual uint64_t GetPosition() const = 0; 162 163 // Moves the stream pointer to the specified position, relative to the 164 // beginning of the stream. This calls Seek(position, Whence::FROM_BEGIN), 165 // however it also provides proper |position| validation to ensure that 166 // it doesn't overflow the range of signed int64_t used by Seek. 167 bool SetPosition(uint64_t position, ErrorPtr* error); 168 169 // Moves the stream pointer by |offset| bytes relative to |whence|. 170 // When successful, returns true and sets the new pointer position from the 171 // beginning of the stream to |new_position|. If |new_position| is nullptr, 172 // new stream position is not returned. 173 // On error, returns false and specifies additional details in |error| if it 174 // is not nullptr. 175 virtual bool Seek(int64_t offset, 176 Whence whence, 177 uint64_t* new_position, 178 ErrorPtr* error) = 0; 179 180 // == Read operations ======================================================= 181 182 // -- Asynchronous ---------------------------------------------------------- 183 184 // Reads up to |size_to_read| bytes from the stream asynchronously. It is not 185 // guaranteed that all requested data will be read. It is not an error for 186 // this function to read fewer bytes than requested. If the function reads 187 // zero bytes, it means that the end of stream is reached. 188 // Upon successful read, the |success_callback| will be invoked with the 189 // actual number of bytes read. 190 // If an error occurs during the asynchronous operation, the |error_callback| 191 // is invoked with the error details. The error object pointer passed in as a 192 // parameter to the |error_callback| is valid only for the duration of that 193 // callback. 194 // If this function successfully schedules an asynchronous operation, it 195 // returns true. If it fails immediately, it will return false and set the 196 // error details to |error| object and will not call the success or error 197 // callbacks. 198 // The |buffer| must be at least |size_to_read| in size and must remain 199 // valid for the duration of the asynchronous operation (until either 200 // |success_callback| or |error_callback| is called). 201 // Only one asynchronous operation at a time is allowed on the stream (read 202 // and/or write) 203 // Uses ReadNonBlocking() and MonitorDataAvailable(). 204 virtual bool ReadAsync(void* buffer, 205 size_t size_to_read, 206 const base::Callback<void(size_t)>& success_callback, 207 const ErrorCallback& error_callback, 208 ErrorPtr* error); 209 210 // Similar to ReadAsync() operation above but reads exactly |size_to_read| 211 // bytes from the stream into the |buffer|. Attempt to read past the end of 212 // the stream is considered an error in this case and will trigger the 213 // |error_callback|. The rest of restrictions and conditions of ReadAsync() 214 // method applies to ReadAllAsync() as well. 215 // Uses ReadNonBlocking() and MonitorDataAvailable(). 216 virtual bool ReadAllAsync(void* buffer, 217 size_t size_to_read, 218 const base::Closure& success_callback, 219 const ErrorCallback& error_callback, 220 ErrorPtr* error); 221 222 // -- Synchronous non-blocking ---------------------------------------------- 223 224 // Reads up to |size_to_read| bytes from the stream without blocking. 225 // The |buffer| must be at least |size_to_read| in size. It is not an error 226 // for this function to return without reading all (or any) the data. 227 // The actual amount of data read (which could be 0 bytes) is returned in 228 // |size_read|. 229 // On error, the function returns false and specifies additional error details 230 // in |error|. 231 // If end of stream is reached or if no data is currently available to be read 232 // without blocking, |size_read| will contain 0 and the function will still 233 // return true (success). In case of end-of-stream scenario, |end_of_stream| 234 // will also be set to true to indicate that no more data is available. 235 virtual bool ReadNonBlocking(void* buffer, 236 size_t size_to_read, 237 size_t* size_read, 238 bool* end_of_stream, 239 ErrorPtr* error) = 0; 240 241 // -- Synchronous blocking -------------------------------------------------- 242 243 // Reads up to |size_to_read| bytes from the stream. This function will block 244 // until at least one byte is read or the end of stream is reached or until 245 // the stream is closed. 246 // The |buffer| must be at least |size_to_read| in size. It is not an error 247 // for this function to return without reading all the data. The actual amount 248 // of data read (which could be 0 bytes) is returned in |size_read|. 249 // On error, the function returns false and specifies additional error details 250 // in |error|. In this case, the state of the stream pointer is undefined, 251 // since some bytes might have been read successfully (and the pointer moved) 252 // before the error has occurred and |size_read| is not updated. 253 // If end of stream is reached, |size_read| will contain 0 and the function 254 // will still return true (success). 255 virtual bool ReadBlocking(void* buffer, 256 size_t size_to_read, 257 size_t* size_read, 258 ErrorPtr* error); 259 260 // Reads exactly |size_to_read| bytes to |buffer|. Returns false on error 261 // (reading fewer than requested bytes is treated as an error as well). 262 // Calls ReadAllBlocking() repeatedly until all the data is read. 263 virtual bool ReadAllBlocking(void* buffer, 264 size_t size_to_read, 265 ErrorPtr* error); 266 267 // == Write operations ====================================================== 268 269 // -- Asynchronous ---------------------------------------------------------- 270 271 // Writes up to |size_to_write| bytes from |buffer| to the stream 272 // asynchronously. It is not guaranteed that all requested data will be 273 // written. It is not an error for this function to write fewer bytes than 274 // requested. 275 // Upon successful write, the |success_callback| will be invoked with the 276 // actual number of bytes written. 277 // If an error occurs during the asynchronous operation, the |error_callback| 278 // is invoked with the error details. The error object pointer is valid only 279 // for the duration of the error callback. 280 // If this function successfully schedules an asynchronous operation, it 281 // returns true. If it fails immediately, it will return false and set the 282 // error details to |error| object and will not call the success or error 283 // callbacks. 284 // The |buffer| must be at least |size_to_write| in size and must remain 285 // valid for the duration of the asynchronous operation (until either 286 // |success_callback| or |error_callback| is called). 287 // Only one asynchronous operation at a time is allowed on the stream (read 288 // and/or write). 289 // Uses WriteNonBlocking() and MonitorDataAvailable(). 290 virtual bool WriteAsync(const void* buffer, 291 size_t size_to_write, 292 const base::Callback<void(size_t)>& success_callback, 293 const ErrorCallback& error_callback, 294 ErrorPtr* error); 295 296 // Similar to WriteAsync() operation above but writes exactly |size_to_write| 297 // bytes from |buffet| to the stream. When all the data is written 298 // successfully, the |success_callback| is invoked. 299 // The rest of restrictions and conditions of WriteAsync() method applies to 300 // WriteAllAsync() as well. 301 // Uses WriteNonBlocking() and MonitorDataAvailable(). 302 virtual bool WriteAllAsync(const void* buffer, 303 size_t size_to_write, 304 const base::Closure& success_callback, 305 const ErrorCallback& error_callback, 306 ErrorPtr* error); 307 308 // -- Synchronous non-blocking ---------------------------------------------- 309 310 // Writes up to |size_to_write| bytes to the stream. The |buffer| must be at 311 // least |size_to_write| in size. It is not an error for this function to 312 // return without writing all the data requested (or any data at all). 313 // The actual amount of data written is returned in |size_written|. 314 // On error, the function returns false and specifies additional error details 315 // in |error|. 316 virtual bool WriteNonBlocking(const void* buffer, 317 size_t size_to_write, 318 size_t* size_written, 319 ErrorPtr* error) = 0; 320 321 // -- Synchronous blocking -------------------------------------------------- 322 323 // Writes up to |size_to_write| bytes to the stream. The |buffer| must be at 324 // least |size_to_write| in size. It is not an error for this function to 325 // return without writing all the data requested. The actual amount of data 326 // written is returned in |size_written|. 327 // On error, the function returns false and specifies additional error details 328 // in |error|. 329 virtual bool WriteBlocking(const void* buffer, 330 size_t size_to_write, 331 size_t* size_written, 332 ErrorPtr* error); 333 334 // Writes exactly |size_to_write| bytes to |buffer|. Returns false on error 335 // (writing fewer than requested bytes is treated as an error as well). 336 // Calls WriteBlocking() repeatedly until all the data is written. 337 virtual bool WriteAllBlocking(const void* buffer, 338 size_t size_to_write, 339 ErrorPtr* error); 340 341 // == Finalizing/closing streams =========================================== 342 343 // Flushes all the user-space data from cache output buffers to storage 344 // medium. For read-only streams this is a no-op, however it is still valid 345 // to call this method on read-only streams. 346 // If an error occurs, the function returns false and specifies additional 347 // error details in |error|. 348 virtual bool FlushBlocking(ErrorPtr* error) = 0; 349 350 // Flushes all the user-space data from the cache output buffer 351 // asynchronously. When all the data is successfully flushed, the 352 // |success_callback| is invoked. If an error occurs while flushing, partial 353 // data might be flushed and |error_callback| is invoked. If there's an error 354 // scheduling the flush operation, it returns false and neither callback will 355 // be called. 356 virtual bool FlushAsync(const base::Closure& success_callback, 357 const ErrorCallback& error_callback, 358 ErrorPtr* error); 359 360 // Closes the underlying stream. The stream is also automatically closed 361 // when the stream object is destroyed, but since closing a stream is 362 // an operation that may fail, in situations when it is important to detect 363 // the failure to close the stream, CloseBlocking() should be used explicitly 364 // before destroying the stream object. 365 virtual bool CloseBlocking(ErrorPtr* error) = 0; 366 367 // == Data availability monitoring ========================================== 368 369 // Overloaded by derived classes to provide stream monitoring for read/write 370 // data availability for the stream. Calls |callback| when data can be read 371 // and/or written without blocking. 372 // |mode| specifies the type of operation to monitor for (read, write, both). 373 virtual bool WaitForData(AccessMode mode, 374 const base::Callback<void(AccessMode)>& callback, 375 ErrorPtr* error) = 0; 376 377 // Helper function for implementing blocking I/O. Blocks until the 378 // non-blocking operation specified by |in_mode| can be performed. 379 // If |out_mode| is not nullptr, it receives the actual operation that can be 380 // performed. For example, watching a stream for READ_WRITE while only 381 // READ can be performed, |out_mode| would contain READ even though |in_mode| 382 // was set to READ_WRITE. 383 // |timeout| is the maximum amount of time to wait. Set it to TimeDelta::Max() 384 // to wait indefinitely. 385 virtual bool WaitForDataBlocking(AccessMode in_mode, 386 base::TimeDelta timeout, 387 AccessMode* out_mode, 388 ErrorPtr* error) = 0; 389 390 // Cancels pending asynchronous read/write operations. 391 virtual void CancelPendingAsyncOperations(); 392 393 protected: 394 Stream() = default; 395 396 private: 397 // Simple wrapper to call the externally exposed |success_callback| that only 398 // receives a size_t. 399 BRILLO_PRIVATE static void IgnoreEOSCallback( 400 const base::Callback<void(size_t)>& success_callback, 401 size_t read, 402 bool eos); 403 404 // The internal implementation of ReadAsync() and ReadAllAsync(). 405 // Calls ReadNonBlocking and if there's no data available waits for it calling 406 // WaitForData(). The extra |force_async_callback| tell whether the success 407 // callback should be called from the main loop instead of directly from this 408 // method. This method only calls WaitForData() if ReadNonBlocking() returns a 409 // situation in which it would block (bytes_read = 0 and eos = false), 410 // preventing us from calling WaitForData() on streams that don't support such 411 // feature. 412 BRILLO_PRIVATE bool ReadAsyncImpl( 413 void* buffer, 414 size_t size_to_read, 415 const base::Callback<void(size_t, bool)>& success_callback, 416 const ErrorCallback& error_callback, 417 ErrorPtr* error, 418 bool force_async_callback); 419 420 // Called from the main loop when the ReadAsyncImpl finished right away 421 // without waiting for data. We use this callback to call the 422 // |sucess_callback| but invalidate the callback if the Stream is destroyed 423 // while this call is waiting in the main loop. 424 BRILLO_PRIVATE void OnReadAsyncDone( 425 const base::Callback<void(size_t, bool)>& success_callback, 426 size_t bytes_read, 427 bool eos); 428 429 // Called from WaitForData() when read operations can be performed 430 // without blocking (the type of operation is provided in |mode|). 431 BRILLO_PRIVATE void OnReadAvailable( 432 void* buffer, 433 size_t size_to_read, 434 const base::Callback<void(size_t, bool)>& success_callback, 435 const ErrorCallback& error_callback, 436 AccessMode mode); 437 438 // The internal implementation of WriteAsync() and WriteAllAsync(). 439 // Calls WriteNonBlocking and if the write would block for it to not block 440 // calling WaitForData(). The extra |force_async_callback| tell whether the 441 // success callback should be called from the main loop instead of directly 442 // from this method. This method only calls WaitForData() if 443 // WriteNonBlocking() returns a situation in which it would block 444 // (size_written = 0 and eos = false), preventing us from calling 445 // WaitForData() on streams that don't support such feature. 446 BRILLO_PRIVATE bool WriteAsyncImpl( 447 const void* buffer, 448 size_t size_to_write, 449 const base::Callback<void(size_t)>& success_callback, 450 const ErrorCallback& error_callback, 451 ErrorPtr* error, 452 bool force_async_callback); 453 454 // Called from the main loop when the WriteAsyncImpl finished right away 455 // without waiting for data. We use this callback to call the 456 // |sucess_callback| but invalidate the callback if the Stream is destroyed 457 // while this call is waiting in the main loop. 458 BRILLO_PRIVATE void OnWriteAsyncDone( 459 const base::Callback<void(size_t)>& success_callback, 460 size_t size_written); 461 462 // Called from WaitForData() when write operations can be performed 463 // without blocking (the type of operation is provided in |mode|). 464 BRILLO_PRIVATE void OnWriteAvailable( 465 const void* buffer, 466 size_t size, 467 const base::Callback<void(size_t)>& success_callback, 468 const ErrorCallback& error_callback, 469 AccessMode mode); 470 471 // Helper callbacks to implement ReadAllAsync/WriteAllAsync. 472 BRILLO_PRIVATE void ReadAllAsyncCallback( 473 void* buffer, 474 size_t size_to_read, 475 const base::Closure& success_callback, 476 const ErrorCallback& error_callback, 477 size_t size_read, 478 bool eos); 479 BRILLO_PRIVATE void WriteAllAsyncCallback( 480 const void* buffer, 481 size_t size_to_write, 482 const base::Closure& success_callback, 483 const ErrorCallback& error_callback, 484 size_t size_written); 485 486 // Helper callbacks to implement FlushAsync(). 487 BRILLO_PRIVATE void FlushAsyncCallback( 488 const base::Closure& success_callback, 489 const ErrorCallback& error_callback); 490 491 // Data members for asynchronous read operations. 492 bool is_async_read_pending_{false}; 493 494 // Data members for asynchronous write operations. 495 bool is_async_write_pending_{false}; 496 497 base::WeakPtrFactory<Stream> weak_ptr_factory_{this}; 498 DISALLOW_COPY_AND_ASSIGN(Stream); 499 }; 500 501 // A smart pointer to the stream used to pass the stream object around. 502 using StreamPtr = std::unique_ptr<Stream>; 503 504 } // namespace brillo 505 506 #endif // LIBBRILLO_BRILLO_STREAMS_STREAM_H_ 507