1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h"
17
18 namespace tensorflow {
19 namespace io {
20
// Constructs a buffer that stages uncompressed input in an
// `input_buffer_bytes`-sized array and compressed output in an
// `output_buffer_bytes`-sized array before forwarding it to `file`.
//
// Only the `file` pointer is stored; Close() in this file deliberately does
// not close it. Both read/write cursors start at the beginning of their
// freshly allocated buffers and the whole output buffer is marked available.
//
// NOTE(review): avail_in_ is not set in this initializer list; presumably it
// is zero-initialized at its declaration in the header -- confirm.
// NOTE(review): the byte counts are not validated here (zero or negative
// values are passed straight to new char[]) -- confirm callers guarantee
// positive sizes.
SnappyOutputBuffer::SnappyOutputBuffer(WritableFile* file,
                                       int32 input_buffer_bytes,
                                       int32 output_buffer_bytes)
    : file_(file),
      input_buffer_(new char[input_buffer_bytes]),
      input_buffer_capacity_(input_buffer_bytes),
      next_in_(input_buffer_.get()),
      output_buffer_(new char[output_buffer_bytes]),
      output_buffer_capacity_(output_buffer_bytes),
      next_out_(output_buffer_.get()),
      avail_out_(output_buffer_bytes) {}
32
~SnappyOutputBuffer()33 SnappyOutputBuffer::~SnappyOutputBuffer() {
34 size_t bytes_to_write = output_buffer_capacity_ - avail_out_;
35 if (bytes_to_write > 0) {
36 LOG(WARNING) << "There is still data in the output buffer. "
37 << "Possible data loss has occurred.";
38 }
39 }
40
Append(StringPiece data)41 Status SnappyOutputBuffer::Append(StringPiece data) { return Write(data); }
42
43 #if defined(TF_CORD_SUPPORT)
Append(const absl::Cord & cord)44 Status SnappyOutputBuffer::Append(const absl::Cord& cord) {
45 for (absl::string_view fragment : cord.Chunks()) {
46 TF_RETURN_IF_ERROR(Append(fragment));
47 }
48 return Status::OK();
49 }
50 #endif
51
Close()52 Status SnappyOutputBuffer::Close() {
53 // Given that we do not own `file`, we don't close it.
54 return Flush();
55 }
56
Name(StringPiece * result) const57 Status SnappyOutputBuffer::Name(StringPiece* result) const {
58 return file_->Name(result);
59 }
60
Sync()61 Status SnappyOutputBuffer::Sync() {
62 TF_RETURN_IF_ERROR(Flush());
63 return file_->Sync();
64 }
65
Tell(int64 * position)66 Status SnappyOutputBuffer::Tell(int64* position) {
67 return file_->Tell(position);
68 }
69
Write(StringPiece data)70 Status SnappyOutputBuffer::Write(StringPiece data) {
71 //
72 // The deflated output is accumulated in output_buffer_ and gets written to
73 // file as and when needed.
74
75 size_t bytes_to_write = data.size();
76
77 // If there is sufficient free space in input_buffer_ to fit data we
78 // add it there and return.
79 if (static_cast<int32>(bytes_to_write) <= AvailableInputSpace()) {
80 AddToInputBuffer(data);
81 return Status::OK();
82 }
83
84 // If there isn't enough available space in the input_buffer_ we empty it
85 // by uncompressing its contents. If data now fits in input_buffer_
86 // we add it there else we directly deflate it.
87 TF_RETURN_IF_ERROR(DeflateBuffered());
88
89 // input_buffer_ should be empty at this point.
90 if (static_cast<int32>(bytes_to_write) <= AvailableInputSpace()) {
91 AddToInputBuffer(data);
92 return Status::OK();
93 }
94
95 // `data` is too large to fit in input buffer so we deflate it directly.
96 // Note that at this point we have already deflated all existing input so
97 // we do not need to backup next_in and avail_in.
98 next_in_ = const_cast<char*>(data.data());
99 avail_in_ = bytes_to_write;
100
101 TF_RETURN_IF_ERROR(Deflate());
102
103 DCHECK_EQ(avail_in_, 0); // All input will be used up.
104
105 next_in_ = input_buffer_.get();
106
107 return Status::OK();
108 }
109
Flush()110 Status SnappyOutputBuffer::Flush() {
111 TF_RETURN_IF_ERROR(DeflateBuffered());
112 TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
113 return Status::OK();
114 }
115
AvailableInputSpace() const116 int32 SnappyOutputBuffer::AvailableInputSpace() const {
117 return input_buffer_capacity_ - avail_in_;
118 }
119
AddToInputBuffer(StringPiece data)120 void SnappyOutputBuffer::AddToInputBuffer(StringPiece data) {
121 size_t bytes_to_write = data.size();
122 DCHECK_LE(bytes_to_write, AvailableInputSpace());
123
124 // Input stream ->
125 // [....................input_buffer_capacity_...............]
126 // [<...read_bytes...><...avail_in...>......empty space......]
127 // ^ ^
128 // | |
129 // input_buffer_ next_in
130 //
131 // Data in the input stream is sharded as shown above. next_in_ could
132 // be pointing to some byte in the buffer with avail_in number of bytes
133 // available to be read.
134 //
135 // In order to avoid shifting the avail_in bytes at next_in to the head of
136 // the buffer we try to fit `data` in the empty space at the tail of the
137 // input stream.
138 // TODO(srbs): This could be avoided if we had a circular buffer.
139 // If it doesn't fit we free the space at the head of the stream and then
140 // append `data` at the end of existing data.
141
142 const int32 read_bytes = next_in_ - input_buffer_.get();
143 const int32 unread_bytes = avail_in_;
144 const int32 free_tail_bytes =
145 input_buffer_capacity_ - (read_bytes + unread_bytes);
146
147 if (static_cast<int32>(bytes_to_write) > free_tail_bytes) {
148 memmove(input_buffer_.get(), next_in_, avail_in_);
149 next_in_ = input_buffer_.get();
150 }
151 memcpy(next_in_ + avail_in_, data.data(), bytes_to_write);
152 avail_in_ += bytes_to_write;
153 }
154
AddToOutputBuffer(const char * data,size_t length)155 Status SnappyOutputBuffer::AddToOutputBuffer(const char* data, size_t length) {
156 while (length > 0) {
157 size_t bytes_to_copy = std::min(length, avail_out_);
158 memcpy(next_out_, data, bytes_to_copy);
159 data += bytes_to_copy;
160 next_out_ += bytes_to_copy;
161 avail_out_ -= bytes_to_copy;
162 length -= bytes_to_copy;
163 if (avail_out_ == 0) {
164 TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
165 }
166 }
167 return Status::OK();
168 }
169
DeflateBuffered()170 Status SnappyOutputBuffer::DeflateBuffered() {
171 TF_RETURN_IF_ERROR(Deflate());
172 DCHECK_EQ(avail_in_, 0);
173 next_in_ = input_buffer_.get();
174 return Status::OK();
175 }
176
FlushOutputBufferToFile()177 Status SnappyOutputBuffer::FlushOutputBufferToFile() {
178 size_t bytes_to_write = output_buffer_capacity_ - avail_out_;
179 if (bytes_to_write > 0) {
180 Status s = file_->Append(StringPiece(
181 reinterpret_cast<char*>(output_buffer_.get()), bytes_to_write));
182 if (s.ok()) {
183 next_out_ = output_buffer_.get();
184 avail_out_ = output_buffer_capacity_;
185 }
186 return s;
187 }
188 return Status::OK();
189 }
190
Deflate()191 Status SnappyOutputBuffer::Deflate() {
192 if (avail_in_ == 0) {
193 return Status::OK();
194 }
195 string output;
196 if (!port::Snappy_Compress(next_in_, avail_in_, &output)) {
197 return errors::DataLoss("Snappy_Compress failed");
198 }
199
200 // Write length of compressed block to output buffer.
201 char compressed_length_array[4];
202 std::fill(compressed_length_array, compressed_length_array + 4, 0);
203 for (int i = 0; i < 4; i++) {
204 // Little endian.
205 compressed_length_array[i] = output.size() >> (8 * (3 - i));
206 }
207 TF_RETURN_IF_ERROR(AddToOutputBuffer(compressed_length_array, 4));
208
209 // Write compressed output to buffer.
210 TF_RETURN_IF_ERROR(AddToOutputBuffer(output.data(), output.size()));
211 next_in_ += avail_in_;
212 avail_in_ = 0;
213
214 return Status::OK();
215 }
216
217 } // namespace io
218 } // namespace tensorflow
219