• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // Decodes the blocks generated by block_builder.cc.
17 
18 #include "tensorflow/core/lib/io/block.h"
19 
20 #include <algorithm>
21 #include "tensorflow/core/lib/core/coding.h"
22 #include "tensorflow/core/lib/core/errors.h"
23 #include "tensorflow/core/lib/io/format.h"
24 #include "tensorflow/core/platform/logging.h"
25 
26 namespace tensorflow {
27 namespace table {
28 
NumRestarts() const29 inline uint32 Block::NumRestarts() const {
30   assert(size_ >= sizeof(uint32));
31   return core::DecodeFixed32(data_ + size_ - sizeof(uint32));
32 }
33 
Block(const BlockContents & contents)34 Block::Block(const BlockContents& contents)
35     : data_(contents.data.data()),
36       size_(contents.data.size()),
37       owned_(contents.heap_allocated) {
38   if (size_ < sizeof(uint32)) {
39     size_ = 0;  // Error marker
40   } else {
41     size_t max_restarts_allowed = (size_ - sizeof(uint32)) / sizeof(uint32);
42     if (NumRestarts() > max_restarts_allowed) {
43       // The size is too small for NumRestarts()
44       size_ = 0;
45     } else {
46       restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32);
47     }
48   }
49 }
50 
~Block()51 Block::~Block() {
52   if (owned_) {
53     delete[] data_;
54   }
55 }
56 
57 // Helper routine: decode the next block entry starting at "p",
58 // storing the number of shared key bytes, non_shared key bytes,
59 // and the length of the value in "*shared", "*non_shared", and
60 // "*value_length", respectively.  Will not dereference past "limit".
61 //
62 // If any errors are detected, returns NULL.  Otherwise, returns a
63 // pointer to the key delta (just past the three decoded values).
DecodeEntry(const char * p,const char * limit,uint32 * shared,uint32 * non_shared,uint32 * value_length)64 static inline const char* DecodeEntry(const char* p, const char* limit,
65                                       uint32* shared, uint32* non_shared,
66                                       uint32* value_length) {
67   if (limit - p < 3) return nullptr;
68   *shared = reinterpret_cast<const unsigned char*>(p)[0];
69   *non_shared = reinterpret_cast<const unsigned char*>(p)[1];
70   *value_length = reinterpret_cast<const unsigned char*>(p)[2];
71   if ((*shared | *non_shared | *value_length) < 128) {
72     // Fast path: all three values are encoded in one byte each
73     p += 3;
74   } else {
75     if ((p = core::GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
76     if ((p = core::GetVarint32Ptr(p, limit, non_shared)) == nullptr)
77       return nullptr;
78     if ((p = core::GetVarint32Ptr(p, limit, value_length)) == nullptr)
79       return nullptr;
80   }
81 
82   if (static_cast<uint32>(limit - p) < (*non_shared + *value_length)) {
83     return nullptr;
84   }
85   return p;
86 }
87 
88 class Block::Iter : public Iterator {
89  private:
90   const char* const data_;     // underlying block contents
91   uint32 const restarts_;      // Offset of restart array (list of fixed32)
92   uint32 const num_restarts_;  // Number of uint32 entries in restart array
93 
94   // current_ is offset in data_ of current entry.  >= restarts_ if !Valid
95   uint32 current_;
96   uint32 restart_index_;  // Index of restart block in which current_ falls
97   string key_;
98   StringPiece value_;
99   Status status_;
100 
Compare(const StringPiece & a,const StringPiece & b) const101   inline int Compare(const StringPiece& a, const StringPiece& b) const {
102     return a.compare(b);
103   }
104 
105   // Return the offset in data_ just past the end of the current entry.
NextEntryOffset() const106   inline uint32 NextEntryOffset() const {
107     return (value_.data() + value_.size()) - data_;
108   }
109 
GetRestartPoint(uint32 index)110   uint32 GetRestartPoint(uint32 index) {
111     assert(index < num_restarts_);
112     return core::DecodeFixed32(data_ + restarts_ + index * sizeof(uint32));
113   }
114 
SeekToRestartPoint(uint32 index)115   void SeekToRestartPoint(uint32 index) {
116     key_.clear();
117     restart_index_ = index;
118     // current_ will be fixed by ParseNextKey();
119 
120     // ParseNextKey() starts at the end of value_, so set value_ accordingly
121     uint32 offset = GetRestartPoint(index);
122     value_ = StringPiece(data_ + offset, 0);
123   }
124 
125  public:
Iter(const char * data,uint32 restarts,uint32 num_restarts)126   Iter(const char* data, uint32 restarts, uint32 num_restarts)
127       : data_(data),
128         restarts_(restarts),
129         num_restarts_(num_restarts),
130         current_(restarts_),
131         restart_index_(num_restarts_) {
132     assert(num_restarts_ > 0);
133   }
134 
Valid() const135   bool Valid() const override { return current_ < restarts_; }
status() const136   Status status() const override { return status_; }
key() const137   StringPiece key() const override {
138     assert(Valid());
139     return key_;
140   }
value() const141   StringPiece value() const override {
142     assert(Valid());
143     return value_;
144   }
145 
Next()146   void Next() override {
147     assert(Valid());
148     ParseNextKey();
149   }
150 
Seek(const StringPiece & target)151   void Seek(const StringPiece& target) override {
152     // Binary search in restart array to find the last restart point
153     // with a key < target
154     uint32 left = 0;
155     uint32 right = num_restarts_ - 1;
156     while (left < right) {
157       uint32 mid = (left + right + 1) / 2;
158       uint32 region_offset = GetRestartPoint(mid);
159       uint32 shared, non_shared, value_length;
160       const char* key_ptr =
161           DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
162                       &non_shared, &value_length);
163       if (key_ptr == nullptr || (shared != 0)) {
164         CorruptionError();
165         return;
166       }
167       StringPiece mid_key(key_ptr, non_shared);
168       if (Compare(mid_key, target) < 0) {
169         // Key at "mid" is smaller than "target".  Therefore all
170         // blocks before "mid" are uninteresting.
171         left = mid;
172       } else {
173         // Key at "mid" is >= "target".  Therefore all blocks at or
174         // after "mid" are uninteresting.
175         right = mid - 1;
176       }
177     }
178 
179     // Linear search (within restart block) for first key >= target
180     SeekToRestartPoint(left);
181     while (true) {
182       if (!ParseNextKey()) {
183         return;
184       }
185       if (Compare(key_, target) >= 0) {
186         return;
187       }
188     }
189   }
190 
SeekToFirst()191   void SeekToFirst() override {
192     SeekToRestartPoint(0);
193     ParseNextKey();
194   }
195 
196  private:
CorruptionError()197   void CorruptionError() {
198     current_ = restarts_;
199     restart_index_ = num_restarts_;
200     status_ = errors::DataLoss("bad entry in block");
201     key_.clear();
202     value_ = StringPiece();
203   }
204 
ParseNextKey()205   bool ParseNextKey() {
206     current_ = NextEntryOffset();
207     const char* p = data_ + current_;
208     const char* limit = data_ + restarts_;  // Restarts come right after data
209     if (p >= limit) {
210       // No more entries to return.  Mark as invalid.
211       current_ = restarts_;
212       restart_index_ = num_restarts_;
213       return false;
214     }
215 
216     // Decode next entry
217     uint32 shared, non_shared, value_length;
218     p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
219     if (p == nullptr || key_.size() < shared) {
220       CorruptionError();
221       return false;
222     } else {
223       key_.resize(shared);
224       key_.append(p, non_shared);
225       value_ = StringPiece(p + non_shared, value_length);
226       while (restart_index_ + 1 < num_restarts_ &&
227              GetRestartPoint(restart_index_ + 1) < current_) {
228         ++restart_index_;
229       }
230       return true;
231     }
232   }
233 };
234 
NewIterator()235 Iterator* Block::NewIterator() {
236   if (size_ < sizeof(uint32)) {
237     return NewErrorIterator(errors::DataLoss("bad block contents"));
238   }
239   const uint32 num_restarts = NumRestarts();
240   if (num_restarts == 0) {
241     return NewEmptyIterator();
242   } else {
243     return new Iter(data_, restart_offset_, num_restarts);
244   }
245 }
246 
247 }  // namespace table
248 }  // namespace tensorflow
249