1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/lib/io/snappy/snappy_inputbuffer.h"
17
18 namespace tensorflow {
19 namespace io {
SnappyInputBuffer(RandomAccessFile * file,size_t input_buffer_bytes,size_t output_buffer_bytes)20 SnappyInputBuffer::SnappyInputBuffer(
21 RandomAccessFile* file,
22 size_t input_buffer_bytes, // size of input_buffer_
23 size_t output_buffer_bytes // size of output_buffer_
24 )
25 : file_(file),
26 input_buffer_capacity_(input_buffer_bytes),
27 output_buffer_capacity_(output_buffer_bytes),
28 input_buffer_(new char[input_buffer_capacity_]),
29 output_buffer_(new char[output_buffer_capacity_]),
30 next_in_(input_buffer_.get()),
31 bytes_read_(0) {}
32
ReadNBytes(int64 bytes_to_read,tstring * result)33 Status SnappyInputBuffer::ReadNBytes(int64 bytes_to_read, tstring* result) {
34 result->clear();
35 result->resize_uninitialized(bytes_to_read);
36
37 char* result_ptr = result->mdata();
38
39 // Read as many bytes as possible from cache.
40 size_t bytes_read = ReadBytesFromCache(bytes_to_read, result_ptr);
41 bytes_to_read -= bytes_read;
42 result_ptr += bytes_read;
43
44 while (bytes_to_read > 0) {
45 // At this point we can be sure that cache has been emptied.
46 DCHECK_EQ(avail_out_, 0);
47
48 // Now that the cache is empty we need to inflate more data.
49 TF_RETURN_IF_ERROR(Inflate());
50
51 bytes_read = ReadBytesFromCache(bytes_to_read, result_ptr);
52 bytes_to_read -= bytes_read;
53 result_ptr += bytes_read;
54 }
55
56 return Status::OK();
57 }
58
Tell() const59 int64 SnappyInputBuffer::Tell() const { return bytes_read_; }
60
Reset()61 Status SnappyInputBuffer::Reset() {
62 file_pos_ = 0;
63 avail_in_ = 0;
64 avail_out_ = 0;
65 next_in_ = input_buffer_.get();
66 bytes_read_ = 0;
67 return Status::OK();
68 }
69
ReadBytesFromCache(size_t bytes_to_read,char * result_ptr)70 size_t SnappyInputBuffer::ReadBytesFromCache(size_t bytes_to_read,
71 char* result_ptr) {
72 size_t can_read_bytes = std::min(bytes_to_read, avail_out_);
73 if (can_read_bytes > 0) {
74 memcpy(result_ptr, next_out_, can_read_bytes);
75 next_out_ += can_read_bytes;
76 avail_out_ -= can_read_bytes;
77 }
78 bytes_read_ += can_read_bytes;
79 return can_read_bytes;
80 }
81
Inflate()82 Status SnappyInputBuffer::Inflate() {
83 // Read length of compressed block.
84 uint32 compressed_block_length;
85 TF_RETURN_IF_ERROR(ReadCompressedBlockLength(&compressed_block_length));
86
87 // If the entire block is not in cache do a read from file.
88 if (avail_in_ < compressed_block_length) {
89 TF_RETURN_IF_ERROR(ReadFromFile());
90 if (avail_in_ < compressed_block_length) {
91 if (compressed_block_length > input_buffer_capacity_) {
92 return errors::ResourceExhausted(
93 "Input buffer(size: ", input_buffer_capacity_,
94 " bytes) too small. Should be larger ", "than ",
95 compressed_block_length, " bytes.");
96 } else {
97 return errors::DataLoss(
98 strings::StrCat("Failed to read ", compressed_block_length,
99 " bytes from file. Possible data corruption."));
100 }
101 }
102 }
103
104 size_t uncompressed_length;
105 if (!port::Snappy_GetUncompressedLength(next_in_, compressed_block_length,
106 &uncompressed_length)) {
107 return errors::DataLoss("Parsing error in Snappy_GetUncompressedLength");
108 }
109
110 // Output buffer must have been cleared before uncompressing more input.
111 DCHECK_EQ(avail_out_, 0);
112
113 // Output buffer must be large enough to fit the uncompressed block.
114 DCHECK_GE(output_buffer_capacity_, uncompressed_length);
115 next_out_ = output_buffer_.get();
116
117 bool status = port::Snappy_Uncompress(next_in_, compressed_block_length,
118 output_buffer_.get());
119 if (!status) {
120 return errors::DataLoss("Snappy_Uncompress failed");
121 }
122 next_in_ += compressed_block_length;
123 avail_in_ -= compressed_block_length;
124 avail_out_ += uncompressed_length;
125 return Status::OK();
126 }
127
ReadCompressedBlockLength(uint32 * length)128 Status SnappyInputBuffer::ReadCompressedBlockLength(uint32* length) {
129 *length = 0;
130 size_t bytes_to_read = 4;
131 while (bytes_to_read > 0) {
132 if (avail_in_ == 0) {
133 TF_RETURN_IF_ERROR(ReadFromFile());
134 }
135 size_t readable = std::min(bytes_to_read, avail_in_);
136
137 for (size_t i = 0; i < readable; i++) {
138 // The "unsigned char" type cast is intentional to avoid implicit type
139 // casting of the signed char to unsigned int during bitwise OR which
140 // causes weird overflow errors.
141 *length = (*length << 8) | static_cast<unsigned char>(next_in_[0]);
142 bytes_to_read--;
143 next_in_++;
144 avail_in_--;
145 }
146 }
147 return Status::OK();
148 }
149
ReadFromFile()150 Status SnappyInputBuffer::ReadFromFile() {
151 int bytes_to_read = input_buffer_capacity_;
152 char* read_location = reinterpret_cast<char*>(input_buffer_.get());
153
154 // If there are unread bytes in the input stream we move them to the head
155 // of the stream to maximize the space available to read new data into.
156 // TODO(srbs): A circular buffer would be useful here.
157 if (avail_in_ > 0) {
158 size_t read_bytes = next_in_ - input_buffer_.get();
159 // Remove `read_bytes` from the head of the input stream.
160 // Move unread bytes to the head of the input stream.
161 if (read_bytes > 0) {
162 memmove(input_buffer_.get(), next_in_, avail_in_);
163 }
164
165 bytes_to_read -= avail_in_;
166 read_location += avail_in_;
167 }
168 StringPiece data;
169 // Try to read enough data to fill up input_buffer_.
170 Status s = file_->Read(file_pos_, bytes_to_read, &data, read_location);
171 if (data.data() != read_location) {
172 memmove(read_location, data.data(), data.size());
173 }
174
175 // Since we moved unread data to the head of the input stream we can point
176 // next_in to the head of the input stream.
177 next_in_ = input_buffer_.get();
178
179 // Note: data.size() could be different from bytes_to_read.
180 avail_in_ += data.size();
181 file_pos_ += data.size();
182
183 if (!s.ok() && !errors::IsOutOfRange(s)) {
184 return s;
185 }
186
187 // We throw OutOfRange error iff no new data has been read from file.
188 // Since we never check how much data is remaining in the file, it is
189 // possible that on the last read there isn't enough data in the file to
190 // fill up the buffer in which case file_->ReadNBytes would return an
191 // OutOfRange error.
192 if (data.empty()) {
193 return errors::OutOfRange("EOF reached");
194 }
195 if (errors::IsOutOfRange(s)) {
196 return Status::OK();
197 }
198
199 return s;
200 }
201
202 } // namespace io
203 } // namespace tensorflow
204