1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/lib/io/table_builder.h"
17
18 #include <assert.h>
19 #include "tensorflow/core/lib/core/coding.h"
20 #include "tensorflow/core/lib/core/errors.h"
21 #include "tensorflow/core/lib/hash/crc32c.h"
22 #include "tensorflow/core/lib/io/block_builder.h"
23 #include "tensorflow/core/lib/io/format.h"
24 #include "tensorflow/core/lib/io/table_options.h"
25 #include "tensorflow/core/platform/env.h"
26 #include "tensorflow/core/platform/snappy.h"
27
28 namespace tensorflow {
29 namespace table {
30
31 namespace {
32
// Tries to shorten *start in place so that it can act as a separator key
// that sorts below `limit`.
//
// If *start and `limit` first differ at some position, and the byte of
// *start at that position can be incremented while remaining strictly
// smaller than the matching byte of `limit`, that byte is incremented and
// *start is truncated right after it. In every other case (including when
// one string is a prefix of the other) *start is left unchanged.
void FindShortestSeparator(string* start, const StringPiece& limit) {
  // Walk over the prefix that both strings share.
  const size_t shorter = std::min(start->size(), limit.size());
  size_t pos = 0;
  for (; pos < shorter; ++pos) {
    if ((*start)[pos] != limit[pos]) break;
  }

  // One string is a prefix of the other: do not shorten.
  if (pos >= shorter) return;

  const uint8 first_diff = static_cast<uint8>((*start)[pos]);
  const bool can_bump = first_diff < static_cast<uint8>(0xff) &&
                        first_diff + 1 < static_cast<uint8>(limit[pos]);
  if (!can_bump) return;

  (*start)[pos]++;
  start->resize(pos + 1);
  assert(StringPiece(*start).compare(limit) < 0);
}
54
// Rewrites *key in place into a short key that sorts at or after the
// original: the first byte that is not 0xff is incremented and everything
// following it is dropped. A key that is empty or consists only of 0xff
// bytes is left unchanged.
void FindShortSuccessor(string* key) {
  const size_t len = key->size();

  // Skip the leading run of 0xff bytes; those cannot be incremented.
  size_t pos = 0;
  while (pos < len &&
         static_cast<uint8>((*key)[pos]) == static_cast<uint8>(0xff)) {
    ++pos;
  }

  // *key is a run of 0xffs (or empty). Leave it alone.
  if (pos == len) return;

  (*key)[pos] = static_cast<uint8>((*key)[pos]) + 1;
  key->resize(pos + 1);
}
68 } // namespace
69
70 struct TableBuilder::Rep {
71 Options options;
72 Options index_block_options;
73 WritableFile* file;
74 uint64 offset;
75 Status status;
76 BlockBuilder data_block;
77 BlockBuilder index_block;
78 string last_key;
79 int64 num_entries;
80 bool closed; // Either Finish() or Abandon() has been called.
81
82 // We do not emit the index entry for a block until we have seen the
83 // first key for the next data block. This allows us to use shorter
84 // keys in the index block. For example, consider a block boundary
85 // between the keys "the quick brown fox" and "the who". We can use
86 // "the r" as the key for the index block entry since it is >= all
87 // entries in the first block and < all entries in subsequent
88 // blocks.
89 //
90 // Invariant: r->pending_index_entry is true only if data_block is empty.
91 bool pending_index_entry;
92 BlockHandle pending_handle; // Handle to add to index block
93
94 string compressed_output;
95
Reptensorflow::table::TableBuilder::Rep96 Rep(const Options& opt, WritableFile* f)
97 : options(opt),
98 index_block_options(opt),
99 file(f),
100 offset(0),
101 data_block(&options),
102 index_block(&index_block_options),
103 num_entries(0),
104 closed(false),
105 pending_index_entry(false) {
106 index_block_options.block_restart_interval = 1;
107 }
108 };
109
TableBuilder(const Options & options,WritableFile * file)110 TableBuilder::TableBuilder(const Options& options, WritableFile* file)
111 : rep_(new Rep(options, file)) {}
112
~TableBuilder()113 TableBuilder::~TableBuilder() {
114 assert(rep_->closed); // Catch errors where caller forgot to call Finish()
115 delete rep_;
116 }
117
Add(const StringPiece & key,const StringPiece & value)118 void TableBuilder::Add(const StringPiece& key, const StringPiece& value) {
119 Rep* r = rep_;
120 assert(!r->closed);
121 if (!ok()) return;
122 if (r->num_entries > 0) {
123 assert(key.compare(StringPiece(r->last_key)) > 0);
124 // See if this key+value would make our current block overly large. If
125 // so, emit the current block before adding this key/value
126 const int kOverlyLargeBlockRatio = 2;
127 const size_t this_entry_bytes = key.size() + value.size();
128 if (this_entry_bytes >= kOverlyLargeBlockRatio * r->options.block_size) {
129 Flush();
130 }
131 }
132
133 if (r->pending_index_entry) {
134 assert(r->data_block.empty());
135 FindShortestSeparator(&r->last_key, key);
136 string handle_encoding;
137 r->pending_handle.EncodeTo(&handle_encoding);
138 r->index_block.Add(r->last_key, StringPiece(handle_encoding));
139 r->pending_index_entry = false;
140 }
141
142 r->last_key.assign(key.data(), key.size());
143 r->num_entries++;
144 r->data_block.Add(key, value);
145
146 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
147 if (estimated_block_size >= r->options.block_size) {
148 Flush();
149 }
150 }
151
Flush()152 void TableBuilder::Flush() {
153 Rep* r = rep_;
154 assert(!r->closed);
155 if (!ok()) return;
156 if (r->data_block.empty()) return;
157 assert(!r->pending_index_entry);
158 WriteBlock(&r->data_block, &r->pending_handle);
159 if (ok()) {
160 r->pending_index_entry = true;
161 // We don't flush the underlying file as that can be slow.
162 }
163 }
164
WriteBlock(BlockBuilder * block,BlockHandle * handle)165 void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
166 // File format contains a sequence of blocks where each block has:
167 // block_data: uint8[n]
168 // type: uint8
169 // crc: uint32
170 assert(ok());
171 Rep* r = rep_;
172 StringPiece raw = block->Finish();
173
174 StringPiece block_contents;
175 CompressionType type = r->options.compression;
176 // TODO(postrelease): Support more compression options: zlib?
177 switch (type) {
178 case kNoCompression:
179 block_contents = raw;
180 break;
181
182 case kSnappyCompression: {
183 string* compressed = &r->compressed_output;
184 if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
185 compressed->size() < raw.size() - (raw.size() / 8u)) {
186 block_contents = *compressed;
187 } else {
188 // Snappy not supported, or compressed less than 12.5%, so just
189 // store uncompressed form
190 block_contents = raw;
191 type = kNoCompression;
192 }
193 break;
194 }
195 }
196 WriteRawBlock(block_contents, type, handle);
197 r->compressed_output.clear();
198 block->Reset();
199 }
200
WriteRawBlock(const StringPiece & block_contents,CompressionType type,BlockHandle * handle)201 void TableBuilder::WriteRawBlock(const StringPiece& block_contents,
202 CompressionType type, BlockHandle* handle) {
203 Rep* r = rep_;
204 handle->set_offset(r->offset);
205 handle->set_size(block_contents.size());
206 r->status = r->file->Append(block_contents);
207 if (r->status.ok()) {
208 char trailer[kBlockTrailerSize];
209 trailer[0] = type;
210 uint32 crc = crc32c::Value(block_contents.data(), block_contents.size());
211 crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
212 core::EncodeFixed32(trailer + 1, crc32c::Mask(crc));
213 r->status = r->file->Append(StringPiece(trailer, kBlockTrailerSize));
214 if (r->status.ok()) {
215 r->offset += block_contents.size() + kBlockTrailerSize;
216 }
217 }
218 }
219
status() const220 Status TableBuilder::status() const { return rep_->status; }
221
Finish()222 Status TableBuilder::Finish() {
223 Rep* r = rep_;
224 Flush();
225 assert(!r->closed);
226 r->closed = true;
227
228 BlockHandle metaindex_block_handle, index_block_handle;
229
230 // Write metaindex block
231 if (ok()) {
232 BlockBuilder meta_index_block(&r->options);
233 // TODO(postrelease): Add stats and other meta blocks
234 WriteBlock(&meta_index_block, &metaindex_block_handle);
235 }
236
237 // Write index block
238 if (ok()) {
239 if (r->pending_index_entry) {
240 FindShortSuccessor(&r->last_key);
241 string handle_encoding;
242 r->pending_handle.EncodeTo(&handle_encoding);
243 r->index_block.Add(r->last_key, StringPiece(handle_encoding));
244 r->pending_index_entry = false;
245 }
246 WriteBlock(&r->index_block, &index_block_handle);
247 }
248
249 // Write footer
250 if (ok()) {
251 Footer footer;
252 footer.set_metaindex_handle(metaindex_block_handle);
253 footer.set_index_handle(index_block_handle);
254 string footer_encoding;
255 footer.EncodeTo(&footer_encoding);
256 r->status = r->file->Append(footer_encoding);
257 if (r->status.ok()) {
258 r->offset += footer_encoding.size();
259 }
260 }
261 return r->status;
262 }
263
Abandon()264 void TableBuilder::Abandon() {
265 Rep* r = rep_;
266 assert(!r->closed);
267 r->closed = true;
268 }
269
NumEntries() const270 uint64 TableBuilder::NumEntries() const { return rep_->num_entries; }
271
FileSize() const272 uint64 TableBuilder::FileSize() const { return rep_->offset; }
273
274 } // namespace table
275 } // namespace tensorflow
276