• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/lib/io/inputbuffer.h"
17 #include "tensorflow/core/lib/core/errors.h"
18 #include "tensorflow/core/platform/logging.h"
19 
20 namespace tensorflow {
21 namespace io {
22 
InputBuffer(RandomAccessFile * file,size_t buffer_bytes)23 InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes)
24     : file_(file),
25       file_pos_(0),
26       size_(buffer_bytes),
27       buf_(new char[size_]),
28       pos_(buf_),
29       limit_(buf_) {}
30 
~InputBuffer()31 InputBuffer::~InputBuffer() { delete[] buf_; }
32 
FillBuffer()33 Status InputBuffer::FillBuffer() {
34   StringPiece data;
35   Status s = file_->Read(file_pos_, size_, &data, buf_);
36   if (data.data() != buf_) {
37     memmove(buf_, data.data(), data.size());
38   }
39   pos_ = buf_;
40   limit_ = pos_ + data.size();
41   file_pos_ += data.size();
42   return s;
43 }
44 
ReadLine(string * result)45 Status InputBuffer::ReadLine(string* result) {
46   result->clear();
47   Status s;
48   do {
49     size_t buf_remain = limit_ - pos_;
50     char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain));
51     if (newline != nullptr) {
52       size_t result_len = newline - pos_;
53       result->append(pos_, result_len);
54       pos_ = newline + 1;
55       if (!result->empty() && result->back() == '\r') {
56         result->resize(result->size() - 1);
57       }
58       return Status::OK();
59     }
60     if (buf_remain > 0) result->append(pos_, buf_remain);
61     // Get more data into buffer
62     s = FillBuffer();
63     DCHECK_EQ(pos_, buf_);
64   } while (limit_ != buf_);
65   if (!result->empty() && result->back() == '\r') {
66     result->resize(result->size() - 1);
67   }
68   if (errors::IsOutOfRange(s) && !result->empty()) {
69     return Status::OK();
70   }
71   return s;
72 }
73 
ReadNBytes(int64 bytes_to_read,string * result)74 Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
75   result->clear();
76   if (bytes_to_read < 0) {
77     return errors::InvalidArgument("Can't read a negative number of bytes: ",
78                                    bytes_to_read);
79   }
80   result->resize(bytes_to_read);
81   size_t bytes_read = 0;
82   Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read);
83   if (bytes_read < bytes_to_read) result->resize(bytes_read);
84   return status;
85 }
86 
ReadNBytes(int64 bytes_to_read,char * result,size_t * bytes_read)87 Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result,
88                                size_t* bytes_read) {
89   if (bytes_to_read < 0) {
90     return errors::InvalidArgument("Can't read a negative number of bytes: ",
91                                    bytes_to_read);
92   }
93   Status status;
94   *bytes_read = 0;
95   while (*bytes_read < static_cast<size_t>(bytes_to_read)) {
96     if (pos_ == limit_) {
97       // Get more data into buffer.
98       status = FillBuffer();
99       if (limit_ == buf_) {
100         break;
101       }
102     }
103     // Do not go over the buffer boundary.
104     const int64 bytes_to_copy =
105         std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read);
106     // Copies buffered data into the destination.
107     memcpy(result + *bytes_read, pos_, bytes_to_copy);
108     pos_ += bytes_to_copy;
109     *bytes_read += bytes_to_copy;
110   }
111   if (errors::IsOutOfRange(status) &&
112       (*bytes_read == static_cast<size_t>(bytes_to_read))) {
113     return Status::OK();
114   }
115   return status;
116 }
117 
ReadVarint32Fallback(uint32 * result)118 Status InputBuffer::ReadVarint32Fallback(uint32* result) {
119   Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes);
120   if (errors::IsDataLoss(s)) {
121     return errors::DataLoss("Stored data is too large to be a varint32.");
122   }
123   return s;
124 }
125 
ReadVarint64Fallback(uint64 * result)126 Status InputBuffer::ReadVarint64Fallback(uint64* result) {
127   Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes);
128   if (errors::IsDataLoss(s)) {
129     return errors::DataLoss("Stored data is too large to be a varint64.");
130   }
131   return s;
132 }
133 
134 template <typename T>
ReadVarintFallback(T * result,int max_bytes)135 Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) {
136   uint8 scratch = 0;
137   auto* p = reinterpret_cast<char*>(&scratch);
138   size_t unused_bytes_read = 0;
139 
140   *result = 0;
141   for (int index = 0; index < max_bytes; index++) {
142     int shift = 7 * index;
143     TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read));
144     *result |= (static_cast<T>(scratch) & 127) << shift;
145     if (!(scratch & 128)) return Status::OK();
146   }
147   return errors::DataLoss("Stored data longer than ", max_bytes, " bytes.");
148 }
149 
SkipNBytes(int64 bytes_to_skip)150 Status InputBuffer::SkipNBytes(int64 bytes_to_skip) {
151   if (bytes_to_skip < 0) {
152     return errors::InvalidArgument("Can only skip forward, not ",
153                                    bytes_to_skip);
154   }
155   int64 bytes_skipped = 0;
156   Status s;
157   while (bytes_skipped < bytes_to_skip) {
158     if (pos_ == limit_) {
159       // Get more data into buffer
160       s = FillBuffer();
161       if (limit_ == buf_) {
162         break;
163       }
164     }
165     const int64 bytes_to_advance =
166         std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped);
167     bytes_skipped += bytes_to_advance;
168     pos_ += bytes_to_advance;
169   }
170   if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) {
171     return Status::OK();
172   }
173   return s;
174 }
175 
Seek(int64 position)176 Status InputBuffer::Seek(int64 position) {
177   if (position < 0) {
178     return errors::InvalidArgument("Seeking to a negative position: ",
179                                    position);
180   }
181   // Position of the buffer within file.
182   const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_);
183   if (position >= bufpos && position < file_pos_) {
184     // Seeks to somewhere inside the buffer.
185     pos_ = buf_ + (position - bufpos);
186     DCHECK(pos_ >= buf_ && pos_ < limit_);
187   } else {
188     // Seeks to somewhere outside.  Discards the buffered data.
189     pos_ = limit_ = buf_;
190     file_pos_ = position;
191   }
192   return Status::OK();
193 }
194 
195 }  // namespace io
196 }  // namespace tensorflow
197