/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_OUTPUTBUFFER_H_
#define TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_OUTPUTBUFFER_H_

#include <memory>
#include <string>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/platform.h"
#include "tensorflow/core/platform/snappy.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {
namespace io {

// Compresses input data using Snappy (https://github.com/google/snappy) and
// writes to `file`.
//
// The input data is cached in a buffer of size `input_buffer_bytes`. When the
// buffer does not have enough available space to fit new data (in the call to
// `Write`), the contents of the buffer are compressed and sent to the output
// buffer.
//
// The compressed output is buffered in a buffer of size `output_buffer_bytes`
// which gets flushed to file when full.
//
// Output file format:
//   The output file consists of a sequence of compressed blocks. Each block
//   starts with a 4 byte header which stores the length (in bytes) of the
//   _compressed_ block _excluding_ this header.
The compressed 46 // block (excluding the 4 byte header) is a valid snappy block and can directly 47 // be uncompressed using Snappy_Uncompress. 48 class SnappyOutputBuffer : public WritableFile { 49 public: 50 // Create an SnappyOutputBuffer for `file` with two buffers that cache the 51 // 1. input data to be deflated 52 // 2. the deflated output 53 // with sizes `input_buffer_bytes` and `output_buffer_bytes` respectively. 54 // Does not take ownership of `file`. 55 SnappyOutputBuffer(WritableFile* file, int32 input_buffer_bytes, 56 int32 output_buffer_bytes); 57 58 // Per convention, the dtor does not call Flush() or Close(). We expect the 59 // caller to call those manually when done. 60 ~SnappyOutputBuffer() override; 61 62 // Adds `data` to the compression pipeline. 63 // 64 // The input data is buffered internally and will be written to disk at a 65 // later time. To immediately write contents to file call `Flush()`. 66 Status Append(StringPiece data) override; 67 68 #if defined(TF_CORD_SUPPORT) 69 Status Append(const absl::Cord& cord) override; 70 #endif 71 72 // Compresses any buffered input and writes all output to file. This must be 73 // called before the destructor to avoid any data loss. 74 // 75 // Contrary to `Flush()` this informs snappy that it should not expect any 76 // further input. 77 // 78 // After calling this, any further calls to `Write()`, `Flush()` or `Close()` 79 // will fail. 80 Status Close() override; 81 82 // Returns the name of the underlying file. 83 Status Name(StringPiece* result) const override; 84 85 // Deflates any cached input, writes all output to file and syncs it. 86 Status Sync() override; 87 88 // Returns the write position in the underlying file. The position does not 89 // reflect buffered, un-flushed data. 90 Status Tell(int64* position) override; 91 92 // Adds `data` to the compression pipeline. 93 // 94 // The input data is buffered in `input_buffer_` and is compressed in bulk 95 // when the buffer gets full. 
The compressed output may not be immediately 96 // written to file but rather buffered in `output_buffer_` and gets written 97 // to file when the buffer is full. 98 // 99 // To immediately write contents to file call `Flush()`. 100 Status Write(StringPiece data); 101 102 // Compresses any cached input and writes all output to file. This must be 103 // called before the destructor to avoid any data loss. 104 Status Flush(); 105 106 private: 107 // Appends `data` to `input_buffer_`. 108 // Throws if `data.size()` > AvailableInputSpace(). 109 void AddToInputBuffer(StringPiece data); 110 111 // Appends `data` to `output_buffer_`. Flushes buffer contents to file when 112 // buffer gets full. 113 Status AddToOutputBuffer(const char* data, size_t length); 114 115 // Returns the total space available in `input_buffer_`. 116 int32 AvailableInputSpace() const; 117 118 // Deflate contents in input_buffer_ and store results in output_buffer_. 119 // The contents of output stream are written to file if more space is needed. 120 // 121 // Note: This method does not flush contents to file. 122 // Returns non-ok status if writing contents to file fails. 123 Status DeflateBuffered(); 124 125 // Appends contents of `output_buffer_` to `file_`. 126 // Returns non-OK status if writing to file fails. 127 Status FlushOutputBufferToFile(); 128 129 // Compresses `avail_in_` bytes at `next_in_` location in `input_buffer_` and 130 // writes the results to output using `AddToOutputBuffer`. 131 // Returns non-OK status if writing to file failed. 132 Status Deflate(); 133 134 WritableFile* file_; // Not owned 135 136 // Buffer for storing contents read from input `file_`. 137 // TODO(srbs): Consider using circular buffers. That would greatly simplify 138 // the implementation. 139 std::unique_ptr<char[]> input_buffer_; 140 size_t input_buffer_capacity_; 141 char* next_in_; 142 size_t avail_in_ = 0; 143 144 // Buffer for storing deflated contents of `file_`. 
145 std::unique_ptr<char[]> output_buffer_; 146 size_t output_buffer_capacity_; 147 char* next_out_; 148 size_t avail_out_; 149 150 TF_DISALLOW_COPY_AND_ASSIGN(SnappyOutputBuffer); 151 }; 152 153 } // namespace io 154 } // namespace tensorflow 155 156 #endif // TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_OUTPUTBUFFER_H_ 157