• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/lib/io/inputbuffer.h"
17 
18 #include "tensorflow/core/lib/core/errors.h"
19 #include "tensorflow/core/platform/logging.h"
20 
21 namespace tensorflow {
22 namespace io {
23 
InputBuffer(RandomAccessFile * file,size_t buffer_bytes)24 InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes)
25     : file_(file),
26       file_pos_(0),
27       size_(buffer_bytes),
28       buf_(new char[size_]),
29       pos_(buf_),
30       limit_(buf_) {}
31 
~InputBuffer()32 InputBuffer::~InputBuffer() { delete[] buf_; }
33 
FillBuffer()34 Status InputBuffer::FillBuffer() {
35   StringPiece data;
36   Status s = file_->Read(file_pos_, size_, &data, buf_);
37   if (data.data() != buf_) {
38     memmove(buf_, data.data(), data.size());
39   }
40   pos_ = buf_;
41   limit_ = pos_ + data.size();
42   file_pos_ += data.size();
43   return s;
44 }
45 
46 template <typename T>
ReadLine(T * result)47 Status InputBuffer::ReadLine(T* result) {
48   result->clear();
49   Status s;
50   do {
51     size_t buf_remain = limit_ - pos_;
52     char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain));
53     if (newline != nullptr) {
54       size_t result_len = newline - pos_;
55       result->append(pos_, result_len);
56       pos_ = newline + 1;
57       if (!result->empty() && result->back() == '\r') {
58         result->resize(result->size() - 1);
59       }
60       return Status::OK();
61     }
62     if (buf_remain > 0) result->append(pos_, buf_remain);
63     // Get more data into buffer
64     s = FillBuffer();
65     DCHECK_EQ(pos_, buf_);
66   } while (limit_ != buf_);
67   if (!result->empty() && result->back() == '\r') {
68     result->resize(result->size() - 1);
69   }
70   if (errors::IsOutOfRange(s) && !result->empty()) {
71     return Status::OK();
72   }
73   return s;
74 }
75 
76 template Status InputBuffer::ReadLine<string>(string* result);
77 template Status InputBuffer::ReadLine<tstring>(tstring* result);
78 
ReadNBytes(int64 bytes_to_read,string * result)79 Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
80   result->clear();
81   if (bytes_to_read < 0) {
82     return errors::InvalidArgument("Can't read a negative number of bytes: ",
83                                    bytes_to_read);
84   }
85   result->resize(bytes_to_read);
86   size_t bytes_read = 0;
87   Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read);
88   if (bytes_read < bytes_to_read) result->resize(bytes_read);
89   return status;
90 }
91 
ReadNBytes(int64 bytes_to_read,char * result,size_t * bytes_read)92 Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result,
93                                size_t* bytes_read) {
94   if (bytes_to_read < 0) {
95     return errors::InvalidArgument("Can't read a negative number of bytes: ",
96                                    bytes_to_read);
97   }
98   Status status;
99   *bytes_read = 0;
100   while (*bytes_read < static_cast<size_t>(bytes_to_read)) {
101     if (pos_ == limit_) {
102       // Get more data into buffer.
103       status = FillBuffer();
104       if (limit_ == buf_) {
105         break;
106       }
107     }
108     // Do not go over the buffer boundary.
109     const int64 bytes_to_copy =
110         std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read);
111     // Copies buffered data into the destination.
112     memcpy(result + *bytes_read, pos_, bytes_to_copy);
113     pos_ += bytes_to_copy;
114     *bytes_read += bytes_to_copy;
115   }
116   if (errors::IsOutOfRange(status) &&
117       (*bytes_read == static_cast<size_t>(bytes_to_read))) {
118     return Status::OK();
119   }
120   return status;
121 }
122 
ReadVarint32Fallback(uint32 * result)123 Status InputBuffer::ReadVarint32Fallback(uint32* result) {
124   Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes);
125   if (errors::IsDataLoss(s)) {
126     return errors::DataLoss("Stored data is too large to be a varint32.");
127   }
128   return s;
129 }
130 
ReadVarint64Fallback(uint64 * result)131 Status InputBuffer::ReadVarint64Fallback(uint64* result) {
132   Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes);
133   if (errors::IsDataLoss(s)) {
134     return errors::DataLoss("Stored data is too large to be a varint64.");
135   }
136   return s;
137 }
138 
139 template <typename T>
ReadVarintFallback(T * result,int max_bytes)140 Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) {
141   uint8 scratch = 0;
142   auto* p = reinterpret_cast<char*>(&scratch);
143   size_t unused_bytes_read = 0;
144 
145   *result = 0;
146   for (int index = 0; index < max_bytes; index++) {
147     int shift = 7 * index;
148     TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read));
149     *result |= (static_cast<T>(scratch) & 127) << shift;
150     if (!(scratch & 128)) return Status::OK();
151   }
152   return errors::DataLoss("Stored data longer than ", max_bytes, " bytes.");
153 }
154 
SkipNBytes(int64 bytes_to_skip)155 Status InputBuffer::SkipNBytes(int64 bytes_to_skip) {
156   if (bytes_to_skip < 0) {
157     return errors::InvalidArgument("Can only skip forward, not ",
158                                    bytes_to_skip);
159   }
160   int64 bytes_skipped = 0;
161   Status s;
162   while (bytes_skipped < bytes_to_skip) {
163     if (pos_ == limit_) {
164       // Get more data into buffer
165       s = FillBuffer();
166       if (limit_ == buf_) {
167         break;
168       }
169     }
170     const int64 bytes_to_advance =
171         std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped);
172     bytes_skipped += bytes_to_advance;
173     pos_ += bytes_to_advance;
174   }
175   if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) {
176     return Status::OK();
177   }
178   return s;
179 }
180 
Seek(int64 position)181 Status InputBuffer::Seek(int64 position) {
182   if (position < 0) {
183     return errors::InvalidArgument("Seeking to a negative position: ",
184                                    position);
185   }
186   // Position of the buffer within file.
187   const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_);
188   if (position >= bufpos && position < file_pos_) {
189     // Seeks to somewhere inside the buffer.
190     pos_ = buf_ + (position - bufpos);
191     DCHECK(pos_ >= buf_ && pos_ < limit_);
192   } else {
193     // Seeks to somewhere outside.  Discards the buffered data.
194     pos_ = limit_ = buf_;
195     file_pos_ = position;
196   }
197   return Status::OK();
198 }
199 
Hint(int64 bytes_to_read)200 Status InputBuffer::Hint(int64 bytes_to_read) {
201   if (bytes_to_read < 0) {
202     return errors::InvalidArgument("Can't read a negative number of bytes: ",
203                                    bytes_to_read);
204   }
205 
206   // The internal buffer is too small. Do nothing.
207   if (bytes_to_read > size_) {
208     return Status::OK();
209   }
210 
211   const int64 bytes_remain_in_buf = static_cast<int64>(limit_ - pos_);
212 
213   // There are enough data in the buffer. Do nothing.
214   if (bytes_to_read <= bytes_remain_in_buf) {
215     return Status::OK();
216   }
217 
218   // Additional read from file is necessary. Make some room.
219   memmove(buf_, pos_, bytes_remain_in_buf);
220   pos_ = buf_;
221   limit_ = buf_ + bytes_remain_in_buf;
222   bytes_to_read -= bytes_remain_in_buf;
223 
224   // Read the remaining bytes from file.
225   StringPiece data;
226   Status s = file_->Read(file_pos_, bytes_to_read, &data, limit_);
227   if (data.data() != limit_) {
228     memmove(limit_, data.data(), data.size());
229   }
230   limit_ += data.size();
231   file_pos_ += data.size();
232 
233   if (errors::IsOutOfRange(s) && data.size() == bytes_to_read) {
234     return Status::OK();
235   } else {
236     return s;
237   }
238 }
239 
240 }  // namespace io
241 }  // namespace tensorflow
242