• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/lib/io/table_builder.h"
17 
18 #include <assert.h>
19 #include "tensorflow/core/lib/core/coding.h"
20 #include "tensorflow/core/lib/core/errors.h"
21 #include "tensorflow/core/lib/hash/crc32c.h"
22 #include "tensorflow/core/lib/io/block_builder.h"
23 #include "tensorflow/core/lib/io/format.h"
24 #include "tensorflow/core/lib/io/table_options.h"
25 #include "tensorflow/core/platform/env.h"
26 #include "tensorflow/core/platform/snappy.h"
27 
28 namespace tensorflow {
29 namespace table {
30 
31 namespace {
32 
FindShortestSeparator(string * start,const StringPiece & limit)33 void FindShortestSeparator(string* start, const StringPiece& limit) {
34   // Find length of common prefix
35   size_t min_length = std::min(start->size(), limit.size());
36   size_t diff_index = 0;
37   while ((diff_index < min_length) &&
38          ((*start)[diff_index] == limit[diff_index])) {
39     diff_index++;
40   }
41 
42   if (diff_index >= min_length) {
43     // Do not shorten if one string is a prefix of the other
44   } else {
45     uint8 diff_byte = static_cast<uint8>((*start)[diff_index]);
46     if (diff_byte < static_cast<uint8>(0xff) &&
47         diff_byte + 1 < static_cast<uint8>(limit[diff_index])) {
48       (*start)[diff_index]++;
49       start->resize(diff_index + 1);
50       assert(StringPiece(*start).compare(limit) < 0);
51     }
52   }
53 }
54 
FindShortSuccessor(string * key)55 void FindShortSuccessor(string* key) {
56   // Find first character that can be incremented
57   size_t n = key->size();
58   for (size_t i = 0; i < n; i++) {
59     const uint8 byte = (*key)[i];
60     if (byte != static_cast<uint8>(0xff)) {
61       (*key)[i] = byte + 1;
62       key->resize(i + 1);
63       return;
64     }
65   }
66   // *key is a run of 0xffs.  Leave it alone.
67 }
68 }  // namespace
69 
70 struct TableBuilder::Rep {
71   Options options;
72   Options index_block_options;
73   WritableFile* file;
74   uint64 offset;
75   Status status;
76   BlockBuilder data_block;
77   BlockBuilder index_block;
78   string last_key;
79   int64 num_entries;
80   bool closed;  // Either Finish() or Abandon() has been called.
81 
82   // We do not emit the index entry for a block until we have seen the
83   // first key for the next data block.  This allows us to use shorter
84   // keys in the index block.  For example, consider a block boundary
85   // between the keys "the quick brown fox" and "the who".  We can use
86   // "the r" as the key for the index block entry since it is >= all
87   // entries in the first block and < all entries in subsequent
88   // blocks.
89   //
90   // Invariant: r->pending_index_entry is true only if data_block is empty.
91   bool pending_index_entry;
92   BlockHandle pending_handle;  // Handle to add to index block
93 
94   string compressed_output;
95 
Reptensorflow::table::TableBuilder::Rep96   Rep(const Options& opt, WritableFile* f)
97       : options(opt),
98         index_block_options(opt),
99         file(f),
100         offset(0),
101         data_block(&options),
102         index_block(&index_block_options),
103         num_entries(0),
104         closed(false),
105         pending_index_entry(false) {
106     index_block_options.block_restart_interval = 1;
107   }
108 };
109 
TableBuilder(const Options & options,WritableFile * file)110 TableBuilder::TableBuilder(const Options& options, WritableFile* file)
111     : rep_(new Rep(options, file)) {}
112 
~TableBuilder()113 TableBuilder::~TableBuilder() {
114   assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
115   delete rep_;
116 }
117 
Add(const StringPiece & key,const StringPiece & value)118 void TableBuilder::Add(const StringPiece& key, const StringPiece& value) {
119   Rep* r = rep_;
120   assert(!r->closed);
121   if (!ok()) return;
122   if (r->num_entries > 0) {
123     assert(key.compare(StringPiece(r->last_key)) > 0);
124     // See if this key+value would make our current block overly large.  If
125     // so, emit the current block before adding this key/value
126     const int kOverlyLargeBlockRatio = 2;
127     const size_t this_entry_bytes = key.size() + value.size();
128     if (this_entry_bytes >= kOverlyLargeBlockRatio * r->options.block_size) {
129       Flush();
130     }
131   }
132 
133   if (r->pending_index_entry) {
134     assert(r->data_block.empty());
135     FindShortestSeparator(&r->last_key, key);
136     string handle_encoding;
137     r->pending_handle.EncodeTo(&handle_encoding);
138     r->index_block.Add(r->last_key, StringPiece(handle_encoding));
139     r->pending_index_entry = false;
140   }
141 
142   r->last_key.assign(key.data(), key.size());
143   r->num_entries++;
144   r->data_block.Add(key, value);
145 
146   const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
147   if (estimated_block_size >= r->options.block_size) {
148     Flush();
149   }
150 }
151 
Flush()152 void TableBuilder::Flush() {
153   Rep* r = rep_;
154   assert(!r->closed);
155   if (!ok()) return;
156   if (r->data_block.empty()) return;
157   assert(!r->pending_index_entry);
158   WriteBlock(&r->data_block, &r->pending_handle);
159   if (ok()) {
160     r->pending_index_entry = true;
161     // We don't flush the underlying file as that can be slow.
162   }
163 }
164 
WriteBlock(BlockBuilder * block,BlockHandle * handle)165 void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
166   // File format contains a sequence of blocks where each block has:
167   //    block_data: uint8[n]
168   //    type: uint8
169   //    crc: uint32
170   assert(ok());
171   Rep* r = rep_;
172   StringPiece raw = block->Finish();
173 
174   StringPiece block_contents;
175   CompressionType type = r->options.compression;
176   // TODO(postrelease): Support more compression options: zlib?
177   switch (type) {
178     case kNoCompression:
179       block_contents = raw;
180       break;
181 
182     case kSnappyCompression: {
183       string* compressed = &r->compressed_output;
184       if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
185           compressed->size() < raw.size() - (raw.size() / 8u)) {
186         block_contents = *compressed;
187       } else {
188         // Snappy not supported, or compressed less than 12.5%, so just
189         // store uncompressed form
190         block_contents = raw;
191         type = kNoCompression;
192       }
193       break;
194     }
195   }
196   WriteRawBlock(block_contents, type, handle);
197   r->compressed_output.clear();
198   block->Reset();
199 }
200 
WriteRawBlock(const StringPiece & block_contents,CompressionType type,BlockHandle * handle)201 void TableBuilder::WriteRawBlock(const StringPiece& block_contents,
202                                  CompressionType type, BlockHandle* handle) {
203   Rep* r = rep_;
204   handle->set_offset(r->offset);
205   handle->set_size(block_contents.size());
206   r->status = r->file->Append(block_contents);
207   if (r->status.ok()) {
208     char trailer[kBlockTrailerSize];
209     trailer[0] = type;
210     uint32 crc = crc32c::Value(block_contents.data(), block_contents.size());
211     crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
212     core::EncodeFixed32(trailer + 1, crc32c::Mask(crc));
213     r->status = r->file->Append(StringPiece(trailer, kBlockTrailerSize));
214     if (r->status.ok()) {
215       r->offset += block_contents.size() + kBlockTrailerSize;
216     }
217   }
218 }
219 
status() const220 Status TableBuilder::status() const { return rep_->status; }
221 
Finish()222 Status TableBuilder::Finish() {
223   Rep* r = rep_;
224   Flush();
225   assert(!r->closed);
226   r->closed = true;
227 
228   BlockHandle metaindex_block_handle, index_block_handle;
229 
230   // Write metaindex block
231   if (ok()) {
232     BlockBuilder meta_index_block(&r->options);
233     // TODO(postrelease): Add stats and other meta blocks
234     WriteBlock(&meta_index_block, &metaindex_block_handle);
235   }
236 
237   // Write index block
238   if (ok()) {
239     if (r->pending_index_entry) {
240       FindShortSuccessor(&r->last_key);
241       string handle_encoding;
242       r->pending_handle.EncodeTo(&handle_encoding);
243       r->index_block.Add(r->last_key, StringPiece(handle_encoding));
244       r->pending_index_entry = false;
245     }
246     WriteBlock(&r->index_block, &index_block_handle);
247   }
248 
249   // Write footer
250   if (ok()) {
251     Footer footer;
252     footer.set_metaindex_handle(metaindex_block_handle);
253     footer.set_index_handle(index_block_handle);
254     string footer_encoding;
255     footer.EncodeTo(&footer_encoding);
256     r->status = r->file->Append(footer_encoding);
257     if (r->status.ok()) {
258       r->offset += footer_encoding.size();
259     }
260   }
261   return r->status;
262 }
263 
Abandon()264 void TableBuilder::Abandon() {
265   Rep* r = rep_;
266   assert(!r->closed);
267   r->closed = true;
268 }
269 
NumEntries() const270 uint64 TableBuilder::NumEntries() const { return rep_->num_entries; }
271 
FileSize() const272 uint64 TableBuilder::FileSize() const { return rep_->offset; }
273 
274 }  // namespace table
275 }  // namespace tensorflow
276