• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include <zlib.h>
17 
18 #include "tensorflow/core/lib/io/zlib_inputstream.h"
19 
20 #include "tensorflow/core/lib/strings/strcat.h"
21 #include "tensorflow/core/platform/logging.h"
22 
23 namespace tensorflow {
24 namespace io {
25 
26 struct ZStreamDef {
ZStreamDeftensorflow::io::ZStreamDef27   ZStreamDef(size_t input_buffer_capacity, size_t output_buffer_capacity)
28       : input(new Bytef[input_buffer_capacity]),
29         output(new Bytef[output_buffer_capacity]),
30         stream(new z_stream) {}
31 
32   // Buffer for storing contents read from compressed stream.
33   // TODO(srbs): Consider using circular buffers. That would greatly simplify
34   // the implementation.
35   std::unique_ptr<Bytef[]> input;
36 
37   // Buffer for storing inflated contents of `input_stream_`.
38   std::unique_ptr<Bytef[]> output;
39 
40   // Configuration passed to `inflate`.
41   //
42   // z_stream_def_->stream->next_in:
43   //   Next byte to de-compress. Points to some byte in
44   //   z_stream_def_->streamdef_.input buffer.
45   // z_stream_def_->stream->avail_in:
46   //   Number of bytes available to be decompressed at this time.
47   // z_stream_def_->stream->next_out:
48   //   Next byte to write de-compressed data to. Points to some byte in
49   //   z_stream_def_->streamdef_.output buffer.
50   // z_stream_def_->stream->avail_out:
51   //   Number of free bytes available at write location.
52   std::unique_ptr<z_stream> stream;
53 };
54 
ZlibInputStream(InputStreamInterface * input_stream,size_t input_buffer_bytes,size_t output_buffer_bytes,const ZlibCompressionOptions & zlib_options,bool owns_input_stream)55 ZlibInputStream::ZlibInputStream(
56     InputStreamInterface* input_stream,
57     size_t input_buffer_bytes,   // size of z_stream.next_in buffer
58     size_t output_buffer_bytes,  // size of z_stream.next_out buffer
59     const ZlibCompressionOptions& zlib_options, bool owns_input_stream)
60     : owns_input_stream_(owns_input_stream),
61       input_stream_(input_stream),
62       input_buffer_capacity_(input_buffer_bytes),
63       output_buffer_capacity_(output_buffer_bytes),
64       zlib_options_(zlib_options),
65       z_stream_def_(
66           new ZStreamDef(input_buffer_capacity_, output_buffer_capacity_)),
67       bytes_read_(0) {
68   InitZlibBuffer();
69 }
70 
ZlibInputStream(InputStreamInterface * input_stream,size_t input_buffer_bytes,size_t output_buffer_bytes,const ZlibCompressionOptions & zlib_options)71 ZlibInputStream::ZlibInputStream(InputStreamInterface* input_stream,
72                                  size_t input_buffer_bytes,
73                                  size_t output_buffer_bytes,
74                                  const ZlibCompressionOptions& zlib_options)
75     : ZlibInputStream(input_stream, input_buffer_bytes, output_buffer_bytes,
76                       zlib_options, false) {}
77 
~ZlibInputStream()78 ZlibInputStream::~ZlibInputStream() {
79   if (z_stream_def_->stream) {
80     inflateEnd(z_stream_def_->stream.get());
81   }
82   if (owns_input_stream_) {
83     delete input_stream_;
84   }
85 }
86 
Reset()87 Status ZlibInputStream::Reset() {
88   TF_RETURN_IF_ERROR(input_stream_->Reset());
89   inflateEnd(z_stream_def_->stream.get());
90   InitZlibBuffer();
91   bytes_read_ = 0;
92   return Status::OK();
93 }
94 
InitZlibBuffer()95 void ZlibInputStream::InitZlibBuffer() {
96   memset(z_stream_def_->stream.get(), 0, sizeof(z_stream));
97 
98   z_stream_def_->stream->zalloc = Z_NULL;
99   z_stream_def_->stream->zfree = Z_NULL;
100   z_stream_def_->stream->opaque = Z_NULL;
101   z_stream_def_->stream->next_in = Z_NULL;
102   z_stream_def_->stream->avail_in = 0;
103 
104   int status =
105       inflateInit2(z_stream_def_->stream.get(), zlib_options_.window_bits);
106 
107   CHECK_EQ(status, Z_OK) << "inflateInit failed with status " << status;
108 
109   z_stream_def_->stream->next_in = z_stream_def_->input.get();
110   z_stream_def_->stream->next_out = z_stream_def_->output.get();
111   next_unread_byte_ = reinterpret_cast<char*>(z_stream_def_->output.get());
112   z_stream_def_->stream->avail_in = 0;
113   z_stream_def_->stream->avail_out = output_buffer_capacity_;
114 }
115 
ReadFromStream()116 Status ZlibInputStream::ReadFromStream() {
117   int bytes_to_read = input_buffer_capacity_;
118   char* read_location = reinterpret_cast<char*>(z_stream_def_->input.get());
119 
120   // If there are unread bytes in the input stream we move them to the head
121   // of the stream to maximize the space available to read new data into.
122   if (z_stream_def_->stream->avail_in > 0) {
123     uLong read_bytes =
124         z_stream_def_->stream->next_in - z_stream_def_->input.get();
125     // Remove `read_bytes` from the head of the input stream.
126     // Move unread bytes to the head of the input stream.
127     if (read_bytes > 0) {
128       memmove(z_stream_def_->input.get(), z_stream_def_->stream->next_in,
129               z_stream_def_->stream->avail_in);
130     }
131 
132     bytes_to_read -= z_stream_def_->stream->avail_in;
133     read_location += z_stream_def_->stream->avail_in;
134   }
135   string data;
136   // Try to read enough data to fill up z_stream_def_->input.
137   // TODO(rohanj): Add a char* version of ReadNBytes to InputStreamInterface
138   // and use that instead to make this more efficient.
139   Status s = input_stream_->ReadNBytes(bytes_to_read, &data);
140   memcpy(read_location, data.data(), data.size());
141 
142   // Since we moved unread data to the head of the input stream we can point
143   // next_in to the head of the input stream.
144   z_stream_def_->stream->next_in = z_stream_def_->input.get();
145 
146   // Note: data.size() could be different from bytes_to_read.
147   z_stream_def_->stream->avail_in += data.size();
148 
149   if (!s.ok() && !errors::IsOutOfRange(s)) {
150     return s;
151   }
152 
153   // We throw OutOfRange error iff no new data has been read from stream.
154   // Since we never check how much data is remaining in the stream, it is
155   // possible that on the last read there isn't enough data in the stream to
156   // fill up the buffer in which case input_stream_->ReadNBytes would return an
157   // OutOfRange error.
158   if (data.empty()) {
159     return errors::OutOfRange("EOF reached");
160   }
161   if (errors::IsOutOfRange(s)) {
162     return Status::OK();
163   }
164 
165   return s;
166 }
167 
ReadBytesFromCache(size_t bytes_to_read,string * result)168 size_t ZlibInputStream::ReadBytesFromCache(size_t bytes_to_read,
169                                            string* result) {
170   size_t unread_bytes =
171       reinterpret_cast<char*>(z_stream_def_->stream->next_out) -
172       next_unread_byte_;
173   size_t can_read_bytes = std::min(bytes_to_read, unread_bytes);
174   if (can_read_bytes > 0) {
175     result->append(next_unread_byte_, can_read_bytes);
176     next_unread_byte_ += can_read_bytes;
177   }
178   bytes_read_ += can_read_bytes;
179   return can_read_bytes;
180 }
181 
NumUnreadBytes() const182 size_t ZlibInputStream::NumUnreadBytes() const {
183   size_t read_bytes =
184       next_unread_byte_ - reinterpret_cast<char*>(z_stream_def_->output.get());
185   return output_buffer_capacity_ - z_stream_def_->stream->avail_out -
186          read_bytes;
187 }
188 
ReadNBytes(int64 bytes_to_read,string * result)189 Status ZlibInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
190   result->clear();
191   // Read as many bytes as possible from cache.
192   bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
193 
194   while (bytes_to_read > 0) {
195     // At this point we can be sure that cache has been emptied.
196     DCHECK_EQ(NumUnreadBytes(), 0);
197 
198     // Now that the cache is empty we need to inflate more data.
199 
200     // Step 1. Fill up input buffer.
201     // We read from stream only after the previously read contents have been
202     // completely consumed. This is an optimization and can be removed if
203     // it causes problems. `ReadFromStream` is capable of handling partially
204     // filled up buffers.
205     if (z_stream_def_->stream->avail_in == 0) {
206       TF_RETURN_IF_ERROR(ReadFromStream());
207     }
208 
209     // Step 2. Setup output stream.
210     z_stream_def_->stream->next_out = z_stream_def_->output.get();
211     next_unread_byte_ = reinterpret_cast<char*>(z_stream_def_->output.get());
212     z_stream_def_->stream->avail_out = output_buffer_capacity_;
213 
214     // Step 3. Inflate Inflate Inflate!
215     TF_RETURN_IF_ERROR(Inflate());
216 
217     bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
218   }
219 
220   return Status::OK();
221 }
222 
Tell() const223 int64 ZlibInputStream::Tell() const { return bytes_read_; }
224 
Inflate()225 Status ZlibInputStream::Inflate() {
226   int error = inflate(z_stream_def_->stream.get(), zlib_options_.flush_mode);
227   if (error != Z_OK && error != Z_STREAM_END) {
228     string error_string =
229         strings::StrCat("inflate() failed with error ", error);
230     if (z_stream_def_->stream->msg != nullptr) {
231       strings::StrAppend(&error_string, ": ", z_stream_def_->stream->msg);
232     }
233     return errors::DataLoss(error_string);
234   }
235   return Status::OK();
236 }
237 
238 }  // namespace io
239 }  // namespace tensorflow
240