1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 // Decodes the blocks generated by block_builder.cc.
17
18 #include "tensorflow/core/lib/io/block.h"
19
20 #include <algorithm>
21 #include "tensorflow/core/lib/core/coding.h"
22 #include "tensorflow/core/lib/core/errors.h"
23 #include "tensorflow/core/lib/io/format.h"
24 #include "tensorflow/core/platform/logging.h"
25
26 namespace tensorflow {
27 namespace table {
28
NumRestarts() const29 inline uint32 Block::NumRestarts() const {
30 assert(size_ >= sizeof(uint32));
31 return core::DecodeFixed32(data_ + size_ - sizeof(uint32));
32 }
33
Block(const BlockContents & contents)34 Block::Block(const BlockContents& contents)
35 : data_(contents.data.data()),
36 size_(contents.data.size()),
37 owned_(contents.heap_allocated) {
38 if (size_ < sizeof(uint32)) {
39 size_ = 0; // Error marker
40 } else {
41 size_t max_restarts_allowed = (size_ - sizeof(uint32)) / sizeof(uint32);
42 if (NumRestarts() > max_restarts_allowed) {
43 // The size is too small for NumRestarts()
44 size_ = 0;
45 } else {
46 restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32);
47 }
48 }
49 }
50
~Block()51 Block::~Block() {
52 if (owned_) {
53 delete[] data_;
54 }
55 }
56
57 // Helper routine: decode the next block entry starting at "p",
58 // storing the number of shared key bytes, non_shared key bytes,
59 // and the length of the value in "*shared", "*non_shared", and
60 // "*value_length", respectively. Will not dereference past "limit".
61 //
62 // If any errors are detected, returns NULL. Otherwise, returns a
63 // pointer to the key delta (just past the three decoded values).
DecodeEntry(const char * p,const char * limit,uint32 * shared,uint32 * non_shared,uint32 * value_length)64 static inline const char* DecodeEntry(const char* p, const char* limit,
65 uint32* shared, uint32* non_shared,
66 uint32* value_length) {
67 if (limit - p < 3) return nullptr;
68 *shared = reinterpret_cast<const unsigned char*>(p)[0];
69 *non_shared = reinterpret_cast<const unsigned char*>(p)[1];
70 *value_length = reinterpret_cast<const unsigned char*>(p)[2];
71 if ((*shared | *non_shared | *value_length) < 128) {
72 // Fast path: all three values are encoded in one byte each
73 p += 3;
74 } else {
75 if ((p = core::GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
76 if ((p = core::GetVarint32Ptr(p, limit, non_shared)) == nullptr)
77 return nullptr;
78 if ((p = core::GetVarint32Ptr(p, limit, value_length)) == nullptr)
79 return nullptr;
80 }
81
82 if (static_cast<uint32>(limit - p) < (*non_shared + *value_length)) {
83 return nullptr;
84 }
85 return p;
86 }
87
88 class Block::Iter : public Iterator {
89 private:
90 const char* const data_; // underlying block contents
91 uint32 const restarts_; // Offset of restart array (list of fixed32)
92 uint32 const num_restarts_; // Number of uint32 entries in restart array
93
94 // current_ is offset in data_ of current entry. >= restarts_ if !Valid
95 uint32 current_;
96 uint32 restart_index_; // Index of restart block in which current_ falls
97 string key_;
98 StringPiece value_;
99 Status status_;
100
Compare(const StringPiece & a,const StringPiece & b) const101 inline int Compare(const StringPiece& a, const StringPiece& b) const {
102 return a.compare(b);
103 }
104
105 // Return the offset in data_ just past the end of the current entry.
NextEntryOffset() const106 inline uint32 NextEntryOffset() const {
107 return (value_.data() + value_.size()) - data_;
108 }
109
GetRestartPoint(uint32 index)110 uint32 GetRestartPoint(uint32 index) {
111 assert(index < num_restarts_);
112 return core::DecodeFixed32(data_ + restarts_ + index * sizeof(uint32));
113 }
114
SeekToRestartPoint(uint32 index)115 void SeekToRestartPoint(uint32 index) {
116 key_.clear();
117 restart_index_ = index;
118 // current_ will be fixed by ParseNextKey();
119
120 // ParseNextKey() starts at the end of value_, so set value_ accordingly
121 uint32 offset = GetRestartPoint(index);
122 value_ = StringPiece(data_ + offset, 0);
123 }
124
125 public:
Iter(const char * data,uint32 restarts,uint32 num_restarts)126 Iter(const char* data, uint32 restarts, uint32 num_restarts)
127 : data_(data),
128 restarts_(restarts),
129 num_restarts_(num_restarts),
130 current_(restarts_),
131 restart_index_(num_restarts_) {
132 assert(num_restarts_ > 0);
133 }
134
Valid() const135 bool Valid() const override { return current_ < restarts_; }
status() const136 Status status() const override { return status_; }
key() const137 StringPiece key() const override {
138 assert(Valid());
139 return key_;
140 }
value() const141 StringPiece value() const override {
142 assert(Valid());
143 return value_;
144 }
145
Next()146 void Next() override {
147 assert(Valid());
148 ParseNextKey();
149 }
150
Seek(const StringPiece & target)151 void Seek(const StringPiece& target) override {
152 // Binary search in restart array to find the last restart point
153 // with a key < target
154 uint32 left = 0;
155 uint32 right = num_restarts_ - 1;
156 while (left < right) {
157 uint32 mid = (left + right + 1) / 2;
158 uint32 region_offset = GetRestartPoint(mid);
159 uint32 shared, non_shared, value_length;
160 const char* key_ptr =
161 DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
162 &non_shared, &value_length);
163 if (key_ptr == nullptr || (shared != 0)) {
164 CorruptionError();
165 return;
166 }
167 StringPiece mid_key(key_ptr, non_shared);
168 if (Compare(mid_key, target) < 0) {
169 // Key at "mid" is smaller than "target". Therefore all
170 // blocks before "mid" are uninteresting.
171 left = mid;
172 } else {
173 // Key at "mid" is >= "target". Therefore all blocks at or
174 // after "mid" are uninteresting.
175 right = mid - 1;
176 }
177 }
178
179 // Linear search (within restart block) for first key >= target
180 SeekToRestartPoint(left);
181 while (true) {
182 if (!ParseNextKey()) {
183 return;
184 }
185 if (Compare(key_, target) >= 0) {
186 return;
187 }
188 }
189 }
190
SeekToFirst()191 void SeekToFirst() override {
192 SeekToRestartPoint(0);
193 ParseNextKey();
194 }
195
196 private:
CorruptionError()197 void CorruptionError() {
198 current_ = restarts_;
199 restart_index_ = num_restarts_;
200 status_ = errors::DataLoss("bad entry in block");
201 key_.clear();
202 value_ = StringPiece();
203 }
204
ParseNextKey()205 bool ParseNextKey() {
206 current_ = NextEntryOffset();
207 const char* p = data_ + current_;
208 const char* limit = data_ + restarts_; // Restarts come right after data
209 if (p >= limit) {
210 // No more entries to return. Mark as invalid.
211 current_ = restarts_;
212 restart_index_ = num_restarts_;
213 return false;
214 }
215
216 // Decode next entry
217 uint32 shared, non_shared, value_length;
218 p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
219 if (p == nullptr || key_.size() < shared) {
220 CorruptionError();
221 return false;
222 } else {
223 key_.resize(shared);
224 key_.append(p, non_shared);
225 value_ = StringPiece(p + non_shared, value_length);
226 while (restart_index_ + 1 < num_restarts_ &&
227 GetRestartPoint(restart_index_ + 1) < current_) {
228 ++restart_index_;
229 }
230 return true;
231 }
232 }
233 };
234
NewIterator()235 Iterator* Block::NewIterator() {
236 if (size_ < sizeof(uint32)) {
237 return NewErrorIterator(errors::DataLoss("bad block contents"));
238 }
239 const uint32 num_restarts = NumRestarts();
240 if (num_restarts == 0) {
241 return NewEmptyIterator();
242 } else {
243 return new Iter(data_, restart_offset_, num_restarts);
244 }
245 }
246
247 } // namespace table
248 } // namespace tensorflow
249