1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/lib/io/inputbuffer.h"
17
18 #include "tensorflow/core/lib/core/errors.h"
19 #include "tensorflow/core/platform/logging.h"
20
21 namespace tensorflow {
22 namespace io {
23
InputBuffer(RandomAccessFile * file,size_t buffer_bytes)24 InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes)
25 : file_(file),
26 file_pos_(0),
27 size_(buffer_bytes),
28 buf_(new char[size_]),
29 pos_(buf_),
30 limit_(buf_) {}
31
~InputBuffer()32 InputBuffer::~InputBuffer() { delete[] buf_; }
33
FillBuffer()34 Status InputBuffer::FillBuffer() {
35 StringPiece data;
36 Status s = file_->Read(file_pos_, size_, &data, buf_);
37 if (data.data() != buf_) {
38 memmove(buf_, data.data(), data.size());
39 }
40 pos_ = buf_;
41 limit_ = pos_ + data.size();
42 file_pos_ += data.size();
43 return s;
44 }
45
46 template <typename T>
ReadLine(T * result)47 Status InputBuffer::ReadLine(T* result) {
48 result->clear();
49 Status s;
50 do {
51 size_t buf_remain = limit_ - pos_;
52 char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain));
53 if (newline != nullptr) {
54 size_t result_len = newline - pos_;
55 result->append(pos_, result_len);
56 pos_ = newline + 1;
57 if (!result->empty() && result->back() == '\r') {
58 result->resize(result->size() - 1);
59 }
60 return Status::OK();
61 }
62 if (buf_remain > 0) result->append(pos_, buf_remain);
63 // Get more data into buffer
64 s = FillBuffer();
65 DCHECK_EQ(pos_, buf_);
66 } while (limit_ != buf_);
67 if (!result->empty() && result->back() == '\r') {
68 result->resize(result->size() - 1);
69 }
70 if (errors::IsOutOfRange(s) && !result->empty()) {
71 return Status::OK();
72 }
73 return s;
74 }
75
76 template Status InputBuffer::ReadLine<string>(string* result);
77 template Status InputBuffer::ReadLine<tstring>(tstring* result);
78
ReadNBytes(int64 bytes_to_read,string * result)79 Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
80 result->clear();
81 if (bytes_to_read < 0) {
82 return errors::InvalidArgument("Can't read a negative number of bytes: ",
83 bytes_to_read);
84 }
85 result->resize(bytes_to_read);
86 size_t bytes_read = 0;
87 Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read);
88 if (bytes_read < bytes_to_read) result->resize(bytes_read);
89 return status;
90 }
91
ReadNBytes(int64 bytes_to_read,char * result,size_t * bytes_read)92 Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result,
93 size_t* bytes_read) {
94 if (bytes_to_read < 0) {
95 return errors::InvalidArgument("Can't read a negative number of bytes: ",
96 bytes_to_read);
97 }
98 Status status;
99 *bytes_read = 0;
100 while (*bytes_read < static_cast<size_t>(bytes_to_read)) {
101 if (pos_ == limit_) {
102 // Get more data into buffer.
103 status = FillBuffer();
104 if (limit_ == buf_) {
105 break;
106 }
107 }
108 // Do not go over the buffer boundary.
109 const int64 bytes_to_copy =
110 std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read);
111 // Copies buffered data into the destination.
112 memcpy(result + *bytes_read, pos_, bytes_to_copy);
113 pos_ += bytes_to_copy;
114 *bytes_read += bytes_to_copy;
115 }
116 if (errors::IsOutOfRange(status) &&
117 (*bytes_read == static_cast<size_t>(bytes_to_read))) {
118 return Status::OK();
119 }
120 return status;
121 }
122
ReadVarint32Fallback(uint32 * result)123 Status InputBuffer::ReadVarint32Fallback(uint32* result) {
124 Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes);
125 if (errors::IsDataLoss(s)) {
126 return errors::DataLoss("Stored data is too large to be a varint32.");
127 }
128 return s;
129 }
130
ReadVarint64Fallback(uint64 * result)131 Status InputBuffer::ReadVarint64Fallback(uint64* result) {
132 Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes);
133 if (errors::IsDataLoss(s)) {
134 return errors::DataLoss("Stored data is too large to be a varint64.");
135 }
136 return s;
137 }
138
139 template <typename T>
ReadVarintFallback(T * result,int max_bytes)140 Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) {
141 uint8 scratch = 0;
142 auto* p = reinterpret_cast<char*>(&scratch);
143 size_t unused_bytes_read = 0;
144
145 *result = 0;
146 for (int index = 0; index < max_bytes; index++) {
147 int shift = 7 * index;
148 TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read));
149 *result |= (static_cast<T>(scratch) & 127) << shift;
150 if (!(scratch & 128)) return Status::OK();
151 }
152 return errors::DataLoss("Stored data longer than ", max_bytes, " bytes.");
153 }
154
SkipNBytes(int64 bytes_to_skip)155 Status InputBuffer::SkipNBytes(int64 bytes_to_skip) {
156 if (bytes_to_skip < 0) {
157 return errors::InvalidArgument("Can only skip forward, not ",
158 bytes_to_skip);
159 }
160 int64 bytes_skipped = 0;
161 Status s;
162 while (bytes_skipped < bytes_to_skip) {
163 if (pos_ == limit_) {
164 // Get more data into buffer
165 s = FillBuffer();
166 if (limit_ == buf_) {
167 break;
168 }
169 }
170 const int64 bytes_to_advance =
171 std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped);
172 bytes_skipped += bytes_to_advance;
173 pos_ += bytes_to_advance;
174 }
175 if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) {
176 return Status::OK();
177 }
178 return s;
179 }
180
Seek(int64 position)181 Status InputBuffer::Seek(int64 position) {
182 if (position < 0) {
183 return errors::InvalidArgument("Seeking to a negative position: ",
184 position);
185 }
186 // Position of the buffer within file.
187 const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_);
188 if (position >= bufpos && position < file_pos_) {
189 // Seeks to somewhere inside the buffer.
190 pos_ = buf_ + (position - bufpos);
191 DCHECK(pos_ >= buf_ && pos_ < limit_);
192 } else {
193 // Seeks to somewhere outside. Discards the buffered data.
194 pos_ = limit_ = buf_;
195 file_pos_ = position;
196 }
197 return Status::OK();
198 }
199
Hint(int64 bytes_to_read)200 Status InputBuffer::Hint(int64 bytes_to_read) {
201 if (bytes_to_read < 0) {
202 return errors::InvalidArgument("Can't read a negative number of bytes: ",
203 bytes_to_read);
204 }
205
206 // The internal buffer is too small. Do nothing.
207 if (bytes_to_read > size_) {
208 return Status::OK();
209 }
210
211 const int64 bytes_remain_in_buf = static_cast<int64>(limit_ - pos_);
212
213 // There are enough data in the buffer. Do nothing.
214 if (bytes_to_read <= bytes_remain_in_buf) {
215 return Status::OK();
216 }
217
218 // Additional read from file is necessary. Make some room.
219 memmove(buf_, pos_, bytes_remain_in_buf);
220 pos_ = buf_;
221 limit_ = buf_ + bytes_remain_in_buf;
222 bytes_to_read -= bytes_remain_in_buf;
223
224 // Read the remaining bytes from file.
225 StringPiece data;
226 Status s = file_->Read(file_pos_, bytes_to_read, &data, limit_);
227 if (data.data() != limit_) {
228 memmove(limit_, data.data(), data.size());
229 }
230 limit_ += data.size();
231 file_pos_ += data.size();
232
233 if (errors::IsOutOfRange(s) && data.size() == bytes_to_read) {
234 return Status::OK();
235 } else {
236 return s;
237 }
238 }
239
240 } // namespace io
241 } // namespace tensorflow
242