/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15
16 #include "tensorflow/core/lib/io/buffered_inputstream.h"
17
18 #include "tensorflow/core/lib/io/random_inputstream.h"
19
20 namespace tensorflow {
21 namespace io {
22
BufferedInputStream(InputStreamInterface * input_stream,size_t buffer_bytes,bool owns_input_stream)23 BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream,
24 size_t buffer_bytes,
25 bool owns_input_stream)
26 : input_stream_(input_stream),
27 size_(buffer_bytes),
28 owns_input_stream_(owns_input_stream) {
29 buf_.reserve(size_);
30 }
31
BufferedInputStream(RandomAccessFile * file,size_t buffer_bytes)32 BufferedInputStream::BufferedInputStream(RandomAccessFile* file,
33 size_t buffer_bytes)
34 : BufferedInputStream(new RandomAccessInputStream(file), buffer_bytes,
35 true) {}
36
~BufferedInputStream()37 BufferedInputStream::~BufferedInputStream() {
38 if (owns_input_stream_) {
39 delete input_stream_;
40 }
41 }
42
FillBuffer()43 Status BufferedInputStream::FillBuffer() {
44 if (!file_status_.ok()) {
45 pos_ = 0;
46 limit_ = 0;
47 return file_status_;
48 }
49 Status s = input_stream_->ReadNBytes(size_, &buf_);
50 pos_ = 0;
51 limit_ = buf_.size();
52 if (!s.ok()) {
53 file_status_ = s;
54 }
55 return s;
56 }
57
58 template <typename StringType>
ReadLineHelper(StringType * result,bool include_eol)59 Status BufferedInputStream::ReadLineHelper(StringType* result,
60 bool include_eol) {
61 result->clear();
62 Status s;
63 while (true) {
64 if (pos_ == limit_) {
65 // Get more data into buffer
66 s = FillBuffer();
67 if (limit_ == 0) {
68 break;
69 }
70 }
71 char c = buf_[pos_++];
72 if (c == '\n') {
73 if (include_eol) {
74 result->append(1, c);
75 }
76 return Status::OK();
77 }
78 // We don't append '\r' to *result
79 if (c != '\r') {
80 result->append(1, c);
81 }
82 }
83 if (errors::IsOutOfRange(s) && !result->empty()) {
84 return Status::OK();
85 }
86 return s;
87 }
88
ReadNBytes(int64 bytes_to_read,tstring * result)89 Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, tstring* result) {
90 if (bytes_to_read < 0) {
91 return errors::InvalidArgument("Can't read a negative number of bytes: ",
92 bytes_to_read);
93 }
94 result->clear();
95 if (pos_ == limit_ && !file_status_.ok() && bytes_to_read > 0) {
96 return file_status_;
97 }
98 result->reserve(bytes_to_read);
99
100 Status s;
101 while (result->size() < static_cast<size_t>(bytes_to_read)) {
102 // Check whether the buffer is fully read or not.
103 if (pos_ == limit_) {
104 s = FillBuffer();
105 // If we didn't read any bytes, we're at the end of the file; break out.
106 if (limit_ == 0) {
107 DCHECK(!s.ok());
108 file_status_ = s;
109 break;
110 }
111 }
112 const int64 bytes_to_copy =
113 std::min<int64>(limit_ - pos_, bytes_to_read - result->size());
114 result->insert(result->size(), buf_, pos_, bytes_to_copy);
115 pos_ += bytes_to_copy;
116 }
117 // Filling the buffer might lead to a situation when we go past the end of
118 // the file leading to an OutOfRange() status return. But we might have
119 // obtained enough data to satisfy the function call. Returning OK then.
120 if (errors::IsOutOfRange(s) &&
121 (result->size() == static_cast<size_t>(bytes_to_read))) {
122 return Status::OK();
123 }
124 return s;
125 }
126
SkipNBytes(int64 bytes_to_skip)127 Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) {
128 if (bytes_to_skip < 0) {
129 return errors::InvalidArgument("Can only skip forward, not ",
130 bytes_to_skip);
131 }
132 if (pos_ + bytes_to_skip < limit_) {
133 // If we aren't skipping too much, then we can just move pos_;
134 pos_ += bytes_to_skip;
135 } else {
136 // Otherwise, we already have read limit_ - pos_, so skip the rest. At this
137 // point we need to get fresh data into the buffer, so reset pos_ and
138 // limit_.
139 Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_));
140 pos_ = 0;
141 limit_ = 0;
142 if (errors::IsOutOfRange(s)) {
143 file_status_ = s;
144 }
145 return s;
146 }
147 return Status::OK();
148 }
149
Tell() const150 int64 BufferedInputStream::Tell() const {
151 return input_stream_->Tell() - (limit_ - pos_);
152 }
153
Seek(int64 position)154 Status BufferedInputStream::Seek(int64 position) {
155 if (position < 0) {
156 return errors::InvalidArgument("Seeking to a negative position: ",
157 position);
158 }
159
160 // Position of the buffer's lower limit within file.
161 const int64 buf_lower_limit = input_stream_->Tell() - limit_;
162 if (position < buf_lower_limit) {
163 // Seek before buffer, reset input stream and skip 'position' bytes.
164 TF_RETURN_IF_ERROR(Reset());
165 return SkipNBytes(position);
166 }
167
168 if (position < Tell()) {
169 // Seek within buffer before 'pos_'
170 pos_ -= Tell() - position;
171 return Status::OK();
172 }
173
174 // Seek after 'pos_'
175 return SkipNBytes(position - Tell());
176 }
177
178 template <typename T>
ReadAll(T * result)179 Status BufferedInputStream::ReadAll(T* result) {
180 result->clear();
181 Status status;
182 while (status.ok()) {
183 status = FillBuffer();
184 if (limit_ == 0) {
185 break;
186 }
187 result->append(buf_);
188 pos_ = limit_;
189 }
190
191 if (errors::IsOutOfRange(status)) {
192 file_status_ = status;
193 return Status::OK();
194 }
195 return status;
196 }
197
198 template Status BufferedInputStream::ReadAll<string>(string* result);
199 template Status BufferedInputStream::ReadAll<tstring>(tstring* result);
200
Reset()201 Status BufferedInputStream::Reset() {
202 TF_RETURN_IF_ERROR(input_stream_->Reset());
203 pos_ = 0;
204 limit_ = 0;
205 file_status_ = Status::OK();
206 return Status::OK();
207 }
208
ReadLine(string * result)209 Status BufferedInputStream::ReadLine(string* result) {
210 return ReadLineHelper(result, false);
211 }
212
ReadLine(tstring * result)213 Status BufferedInputStream::ReadLine(tstring* result) {
214 return ReadLineHelper(result, false);
215 }
216
ReadLineAsString()217 string BufferedInputStream::ReadLineAsString() {
218 string result;
219 ReadLineHelper(&result, true).IgnoreError();
220 return result;
221 }
222
223 } // namespace io
224 } // namespace tensorflow
225