1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include <zlib.h>
17
18 #include "tensorflow/core/lib/io/zlib_inputstream.h"
19
20 #include "tensorflow/core/lib/strings/strcat.h"
21 #include "tensorflow/core/platform/logging.h"
22
23 namespace tensorflow {
24 namespace io {
25
26 struct ZStreamDef {
ZStreamDeftensorflow::io::ZStreamDef27 ZStreamDef(size_t input_buffer_capacity, size_t output_buffer_capacity)
28 : input(new Bytef[input_buffer_capacity]),
29 output(new Bytef[output_buffer_capacity]),
30 stream(new z_stream) {}
31
32 // Buffer for storing contents read from compressed stream.
33 // TODO(srbs): Consider using circular buffers. That would greatly simplify
34 // the implementation.
35 std::unique_ptr<Bytef[]> input;
36
37 // Buffer for storing inflated contents of `input_stream_`.
38 std::unique_ptr<Bytef[]> output;
39
40 // Configuration passed to `inflate`.
41 //
42 // z_stream_def_->stream->next_in:
43 // Next byte to de-compress. Points to some byte in
44 // z_stream_def_->streamdef_.input buffer.
45 // z_stream_def_->stream->avail_in:
46 // Number of bytes available to be decompressed at this time.
47 // z_stream_def_->stream->next_out:
48 // Next byte to write de-compressed data to. Points to some byte in
49 // z_stream_def_->streamdef_.output buffer.
50 // z_stream_def_->stream->avail_out:
51 // Number of free bytes available at write location.
52 std::unique_ptr<z_stream> stream;
53 };
54
ZlibInputStream(InputStreamInterface * input_stream,size_t input_buffer_bytes,size_t output_buffer_bytes,const ZlibCompressionOptions & zlib_options,bool owns_input_stream)55 ZlibInputStream::ZlibInputStream(
56 InputStreamInterface* input_stream,
57 size_t input_buffer_bytes, // size of z_stream.next_in buffer
58 size_t output_buffer_bytes, // size of z_stream.next_out buffer
59 const ZlibCompressionOptions& zlib_options, bool owns_input_stream)
60 : owns_input_stream_(owns_input_stream),
61 input_stream_(input_stream),
62 input_buffer_capacity_(input_buffer_bytes),
63 output_buffer_capacity_(output_buffer_bytes),
64 zlib_options_(zlib_options),
65 z_stream_def_(
66 new ZStreamDef(input_buffer_capacity_, output_buffer_capacity_)),
67 bytes_read_(0) {
68 InitZlibBuffer();
69 }
70
ZlibInputStream(InputStreamInterface * input_stream,size_t input_buffer_bytes,size_t output_buffer_bytes,const ZlibCompressionOptions & zlib_options)71 ZlibInputStream::ZlibInputStream(InputStreamInterface* input_stream,
72 size_t input_buffer_bytes,
73 size_t output_buffer_bytes,
74 const ZlibCompressionOptions& zlib_options)
75 : ZlibInputStream(input_stream, input_buffer_bytes, output_buffer_bytes,
76 zlib_options, false) {}
77
~ZlibInputStream()78 ZlibInputStream::~ZlibInputStream() {
79 if (z_stream_def_->stream) {
80 inflateEnd(z_stream_def_->stream.get());
81 }
82 if (owns_input_stream_) {
83 delete input_stream_;
84 }
85 }
86
Reset()87 Status ZlibInputStream::Reset() {
88 TF_RETURN_IF_ERROR(input_stream_->Reset());
89 inflateEnd(z_stream_def_->stream.get());
90 InitZlibBuffer();
91 bytes_read_ = 0;
92 return Status::OK();
93 }
94
InitZlibBuffer()95 void ZlibInputStream::InitZlibBuffer() {
96 memset(z_stream_def_->stream.get(), 0, sizeof(z_stream));
97
98 z_stream_def_->stream->zalloc = Z_NULL;
99 z_stream_def_->stream->zfree = Z_NULL;
100 z_stream_def_->stream->opaque = Z_NULL;
101 z_stream_def_->stream->next_in = Z_NULL;
102 z_stream_def_->stream->avail_in = 0;
103
104 int status =
105 inflateInit2(z_stream_def_->stream.get(), zlib_options_.window_bits);
106
107 CHECK_EQ(status, Z_OK) << "inflateInit failed with status " << status;
108
109 z_stream_def_->stream->next_in = z_stream_def_->input.get();
110 z_stream_def_->stream->next_out = z_stream_def_->output.get();
111 next_unread_byte_ = reinterpret_cast<char*>(z_stream_def_->output.get());
112 z_stream_def_->stream->avail_in = 0;
113 z_stream_def_->stream->avail_out = output_buffer_capacity_;
114 }
115
ReadFromStream()116 Status ZlibInputStream::ReadFromStream() {
117 int bytes_to_read = input_buffer_capacity_;
118 char* read_location = reinterpret_cast<char*>(z_stream_def_->input.get());
119
120 // If there are unread bytes in the input stream we move them to the head
121 // of the stream to maximize the space available to read new data into.
122 if (z_stream_def_->stream->avail_in > 0) {
123 uLong read_bytes =
124 z_stream_def_->stream->next_in - z_stream_def_->input.get();
125 // Remove `read_bytes` from the head of the input stream.
126 // Move unread bytes to the head of the input stream.
127 if (read_bytes > 0) {
128 memmove(z_stream_def_->input.get(), z_stream_def_->stream->next_in,
129 z_stream_def_->stream->avail_in);
130 }
131
132 bytes_to_read -= z_stream_def_->stream->avail_in;
133 read_location += z_stream_def_->stream->avail_in;
134 }
135 string data;
136 // Try to read enough data to fill up z_stream_def_->input.
137 // TODO(rohanj): Add a char* version of ReadNBytes to InputStreamInterface
138 // and use that instead to make this more efficient.
139 Status s = input_stream_->ReadNBytes(bytes_to_read, &data);
140 memcpy(read_location, data.data(), data.size());
141
142 // Since we moved unread data to the head of the input stream we can point
143 // next_in to the head of the input stream.
144 z_stream_def_->stream->next_in = z_stream_def_->input.get();
145
146 // Note: data.size() could be different from bytes_to_read.
147 z_stream_def_->stream->avail_in += data.size();
148
149 if (!s.ok() && !errors::IsOutOfRange(s)) {
150 return s;
151 }
152
153 // We throw OutOfRange error iff no new data has been read from stream.
154 // Since we never check how much data is remaining in the stream, it is
155 // possible that on the last read there isn't enough data in the stream to
156 // fill up the buffer in which case input_stream_->ReadNBytes would return an
157 // OutOfRange error.
158 if (data.empty()) {
159 return errors::OutOfRange("EOF reached");
160 }
161 if (errors::IsOutOfRange(s)) {
162 return Status::OK();
163 }
164
165 return s;
166 }
167
ReadBytesFromCache(size_t bytes_to_read,string * result)168 size_t ZlibInputStream::ReadBytesFromCache(size_t bytes_to_read,
169 string* result) {
170 size_t unread_bytes =
171 reinterpret_cast<char*>(z_stream_def_->stream->next_out) -
172 next_unread_byte_;
173 size_t can_read_bytes = std::min(bytes_to_read, unread_bytes);
174 if (can_read_bytes > 0) {
175 result->append(next_unread_byte_, can_read_bytes);
176 next_unread_byte_ += can_read_bytes;
177 }
178 bytes_read_ += can_read_bytes;
179 return can_read_bytes;
180 }
181
NumUnreadBytes() const182 size_t ZlibInputStream::NumUnreadBytes() const {
183 size_t read_bytes =
184 next_unread_byte_ - reinterpret_cast<char*>(z_stream_def_->output.get());
185 return output_buffer_capacity_ - z_stream_def_->stream->avail_out -
186 read_bytes;
187 }
188
ReadNBytes(int64 bytes_to_read,string * result)189 Status ZlibInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
190 result->clear();
191 // Read as many bytes as possible from cache.
192 bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
193
194 while (bytes_to_read > 0) {
195 // At this point we can be sure that cache has been emptied.
196 DCHECK_EQ(NumUnreadBytes(), 0);
197
198 // Now that the cache is empty we need to inflate more data.
199
200 // Step 1. Fill up input buffer.
201 // We read from stream only after the previously read contents have been
202 // completely consumed. This is an optimization and can be removed if
203 // it causes problems. `ReadFromStream` is capable of handling partially
204 // filled up buffers.
205 if (z_stream_def_->stream->avail_in == 0) {
206 TF_RETURN_IF_ERROR(ReadFromStream());
207 }
208
209 // Step 2. Setup output stream.
210 z_stream_def_->stream->next_out = z_stream_def_->output.get();
211 next_unread_byte_ = reinterpret_cast<char*>(z_stream_def_->output.get());
212 z_stream_def_->stream->avail_out = output_buffer_capacity_;
213
214 // Step 3. Inflate Inflate Inflate!
215 TF_RETURN_IF_ERROR(Inflate());
216
217 bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
218 }
219
220 return Status::OK();
221 }
222
Tell() const223 int64 ZlibInputStream::Tell() const { return bytes_read_; }
224
Inflate()225 Status ZlibInputStream::Inflate() {
226 int error = inflate(z_stream_def_->stream.get(), zlib_options_.flush_mode);
227 if (error != Z_OK && error != Z_STREAM_END) {
228 string error_string =
229 strings::StrCat("inflate() failed with error ", error);
230 if (z_stream_def_->stream->msg != nullptr) {
231 strings::StrAppend(&error_string, ": ", z_stream_def_->stream->msg);
232 }
233 return errors::DataLoss(error_string);
234 }
235 return Status::OK();
236 }
237
238 } // namespace io
239 } // namespace tensorflow
240