1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/lib/io/buffered_inputstream.h"
17
18 #include "tensorflow/core/lib/io/random_inputstream.h"
19
20 namespace tensorflow {
21 namespace io {
22
BufferedInputStream(InputStreamInterface * input_stream,size_t buffer_size,bool owns_input_stream)23 BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream,
24 size_t buffer_size,
25 bool owns_input_stream)
26 : input_stream_(input_stream),
27 size_(buffer_size),
28 owns_input_stream_(owns_input_stream) {
29 buf_.reserve(size_);
30 }
31
BufferedInputStream(RandomAccessFile * file,size_t buffer_size)32 BufferedInputStream::BufferedInputStream(RandomAccessFile* file,
33 size_t buffer_size)
34 : BufferedInputStream(new RandomAccessInputStream(file), buffer_size,
35 true) {}
36
~BufferedInputStream()37 BufferedInputStream::~BufferedInputStream() {
38 if (owns_input_stream_) {
39 delete input_stream_;
40 }
41 }
42
FillBuffer()43 Status BufferedInputStream::FillBuffer() {
44 if (!file_status_.ok()) {
45 pos_ = 0;
46 limit_ = 0;
47 return file_status_;
48 }
49 Status s = input_stream_->ReadNBytes(size_, &buf_);
50 pos_ = 0;
51 limit_ = buf_.size();
52 if (buf_.empty()) {
53 DCHECK(!s.ok());
54 file_status_ = s;
55 }
56 return s;
57 }
58
ReadLineHelper(string * result,bool include_eol)59 Status BufferedInputStream::ReadLineHelper(string* result, bool include_eol) {
60 result->clear();
61 Status s;
62 while (true) {
63 if (pos_ == limit_) {
64 // Get more data into buffer
65 s = FillBuffer();
66 if (limit_ == 0) {
67 break;
68 }
69 }
70 char c = buf_[pos_++];
71 if (c == '\n') {
72 if (include_eol) {
73 *result += c;
74 }
75 return Status::OK();
76 }
77 // We don't append '\r' to *result
78 if (c != '\r') {
79 *result += c;
80 }
81 }
82 if (errors::IsOutOfRange(s) && !result->empty()) {
83 return Status::OK();
84 }
85 return s;
86 }
87
ReadNBytes(int64 bytes_to_read,string * result)88 Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
89 if (bytes_to_read < 0) {
90 return errors::InvalidArgument("Can't read a negative number of bytes: ",
91 bytes_to_read);
92 }
93 result->clear();
94 if (!file_status_.ok() && bytes_to_read > 0) {
95 return file_status_;
96 }
97 result->reserve(bytes_to_read);
98
99 Status s;
100 while (result->size() < static_cast<size_t>(bytes_to_read)) {
101 // Check whether the buffer is fully read or not.
102 if (pos_ == limit_) {
103 s = FillBuffer();
104 // If we didn't read any bytes, we're at the end of the file; break out.
105 if (limit_ == 0) {
106 DCHECK(!s.ok());
107 file_status_ = s;
108 break;
109 }
110 }
111 const int64 bytes_to_copy =
112 std::min<int64>(limit_ - pos_, bytes_to_read - result->size());
113 result->insert(result->size(), buf_, pos_, bytes_to_copy);
114 pos_ += bytes_to_copy;
115 }
116 // Filling the buffer might lead to a situation when we go past the end of
117 // the file leading to an OutOfRange() status return. But we might have
118 // obtained enough data to satisfy the function call. Returning OK then.
119 if (errors::IsOutOfRange(s) &&
120 (result->size() == static_cast<size_t>(bytes_to_read))) {
121 return Status::OK();
122 }
123 return s;
124 }
125
SkipNBytes(int64 bytes_to_skip)126 Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) {
127 if (bytes_to_skip < 0) {
128 return errors::InvalidArgument("Can only skip forward, not ",
129 bytes_to_skip);
130 }
131 if (pos_ + bytes_to_skip < limit_) {
132 // If we aren't skipping too much, then we can just move pos_;
133 pos_ += bytes_to_skip;
134 } else {
135 // Otherwise, we already have read limit_ - pos_, so skip the rest. At this
136 // point we need to get fresh data into the buffer, so reset pos_ and
137 // limit_.
138 Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_));
139 pos_ = 0;
140 limit_ = 0;
141 if (errors::IsOutOfRange(s)) {
142 file_status_ = s;
143 }
144 return s;
145 }
146 return Status::OK();
147 }
148
Tell() const149 int64 BufferedInputStream::Tell() const {
150 return input_stream_->Tell() - (limit_ - pos_);
151 }
152
Seek(int64 position)153 Status BufferedInputStream::Seek(int64 position) {
154 if (position < 0) {
155 return errors::InvalidArgument("Seeking to a negative position: ",
156 position);
157 }
158
159 // Position of the buffer within file.
160 const int64 bufpos = Tell();
161 if (position < bufpos) {
162 // Reset input stream and skip 'position' bytes.
163 TF_RETURN_IF_ERROR(Reset());
164 return SkipNBytes(position);
165 }
166
167 return SkipNBytes(position - bufpos);
168 }
169
ReadAll(string * result)170 Status BufferedInputStream::ReadAll(string* result) {
171 result->clear();
172 Status status;
173 while (status.ok()) {
174 status = FillBuffer();
175 if (limit_ == 0) {
176 break;
177 }
178 result->append(buf_);
179 pos_ = limit_;
180 }
181
182 if (errors::IsOutOfRange(status)) {
183 file_status_ = status;
184 return Status::OK();
185 }
186 return status;
187 }
188
Reset()189 Status BufferedInputStream::Reset() {
190 TF_RETURN_IF_ERROR(input_stream_->Reset());
191 pos_ = 0;
192 limit_ = 0;
193 file_status_ = Status::OK();
194 return Status::OK();
195 }
196
ReadLine(string * result)197 Status BufferedInputStream::ReadLine(string* result) {
198 return ReadLineHelper(result, false);
199 }
200
ReadLineAsString()201 string BufferedInputStream::ReadLineAsString() {
202 string result;
203 ReadLineHelper(&result, true).IgnoreError();
204 return result;
205 }
206
207 } // namespace io
208 } // namespace tensorflow
209