1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "leveldb/table_builder.h"
6
7 #include <assert.h>
8 #include "leveldb/comparator.h"
9 #include "leveldb/env.h"
10 #include "leveldb/filter_policy.h"
11 #include "leveldb/options.h"
12 #include "table/block_builder.h"
13 #include "table/filter_block.h"
14 #include "table/format.h"
15 #include "util/coding.h"
16 #include "util/crc32c.h"
17
18 namespace leveldb {
19
20 struct TableBuilder::Rep {
21 Options options;
22 Options index_block_options;
23 WritableFile* file;
24 uint64_t offset;
25 Status status;
26 BlockBuilder data_block;
27 BlockBuilder index_block;
28 std::string last_key;
29 int64_t num_entries;
30 bool closed; // Either Finish() or Abandon() has been called.
31 FilterBlockBuilder* filter_block;
32
33 // We do not emit the index entry for a block until we have seen the
34 // first key for the next data block. This allows us to use shorter
35 // keys in the index block. For example, consider a block boundary
36 // between the keys "the quick brown fox" and "the who". We can use
37 // "the r" as the key for the index block entry since it is >= all
38 // entries in the first block and < all entries in subsequent
39 // blocks.
40 //
41 // Invariant: r->pending_index_entry is true only if data_block is empty.
42 bool pending_index_entry;
43 BlockHandle pending_handle; // Handle to add to index block
44
45 std::string compressed_output;
46
Repleveldb::TableBuilder::Rep47 Rep(const Options& opt, WritableFile* f)
48 : options(opt),
49 index_block_options(opt),
50 file(f),
51 offset(0),
52 data_block(&options),
53 index_block(&index_block_options),
54 num_entries(0),
55 closed(false),
56 filter_block(opt.filter_policy == NULL ? NULL
57 : new FilterBlockBuilder(opt.filter_policy)),
58 pending_index_entry(false) {
59 index_block_options.block_restart_interval = 1;
60 }
61 };
62
TableBuilder(const Options & options,WritableFile * file)63 TableBuilder::TableBuilder(const Options& options, WritableFile* file)
64 : rep_(new Rep(options, file)) {
65 if (rep_->filter_block != NULL) {
66 rep_->filter_block->StartBlock(0);
67 }
68 }
69
~TableBuilder()70 TableBuilder::~TableBuilder() {
71 assert(rep_->closed); // Catch errors where caller forgot to call Finish()
72 delete rep_->filter_block;
73 delete rep_;
74 }
75
ChangeOptions(const Options & options)76 Status TableBuilder::ChangeOptions(const Options& options) {
77 // Note: if more fields are added to Options, update
78 // this function to catch changes that should not be allowed to
79 // change in the middle of building a Table.
80 if (options.comparator != rep_->options.comparator) {
81 return Status::InvalidArgument("changing comparator while building table");
82 }
83
84 // Note that any live BlockBuilders point to rep_->options and therefore
85 // will automatically pick up the updated options.
86 rep_->options = options;
87 rep_->index_block_options = options;
88 rep_->index_block_options.block_restart_interval = 1;
89 return Status::OK();
90 }
91
Add(const Slice & key,const Slice & value)92 void TableBuilder::Add(const Slice& key, const Slice& value) {
93 Rep* r = rep_;
94 assert(!r->closed);
95 if (!ok()) return;
96 if (r->num_entries > 0) {
97 assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
98 }
99
100 if (r->pending_index_entry) {
101 assert(r->data_block.empty());
102 r->options.comparator->FindShortestSeparator(&r->last_key, key);
103 std::string handle_encoding;
104 r->pending_handle.EncodeTo(&handle_encoding);
105 r->index_block.Add(r->last_key, Slice(handle_encoding));
106 r->pending_index_entry = false;
107 }
108
109 if (r->filter_block != NULL) {
110 r->filter_block->AddKey(key);
111 }
112
113 r->last_key.assign(key.data(), key.size());
114 r->num_entries++;
115 r->data_block.Add(key, value);
116
117 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
118 if (estimated_block_size >= r->options.block_size) {
119 Flush();
120 }
121 }
122
Flush()123 void TableBuilder::Flush() {
124 Rep* r = rep_;
125 assert(!r->closed);
126 if (!ok()) return;
127 if (r->data_block.empty()) return;
128 assert(!r->pending_index_entry);
129 WriteBlock(&r->data_block, &r->pending_handle);
130 if (ok()) {
131 r->pending_index_entry = true;
132 r->status = r->file->Flush();
133 }
134 if (r->filter_block != NULL) {
135 r->filter_block->StartBlock(r->offset);
136 }
137 }
138
WriteBlock(BlockBuilder * block,BlockHandle * handle)139 void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
140 // File format contains a sequence of blocks where each block has:
141 // block_data: uint8[n]
142 // type: uint8
143 // crc: uint32
144 assert(ok());
145 Rep* r = rep_;
146 Slice raw = block->Finish();
147
148 Slice block_contents;
149 CompressionType type = r->options.compression;
150 // TODO(postrelease): Support more compression options: zlib?
151 switch (type) {
152 case kNoCompression:
153 block_contents = raw;
154 break;
155
156 case kSnappyCompression: {
157 std::string* compressed = &r->compressed_output;
158 if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
159 compressed->size() < raw.size() - (raw.size() / 8u)) {
160 block_contents = *compressed;
161 } else {
162 // Snappy not supported, or compressed less than 12.5%, so just
163 // store uncompressed form
164 block_contents = raw;
165 type = kNoCompression;
166 }
167 break;
168 }
169 }
170 WriteRawBlock(block_contents, type, handle);
171 r->compressed_output.clear();
172 block->Reset();
173 }
174
WriteRawBlock(const Slice & block_contents,CompressionType type,BlockHandle * handle)175 void TableBuilder::WriteRawBlock(const Slice& block_contents,
176 CompressionType type,
177 BlockHandle* handle) {
178 Rep* r = rep_;
179 handle->set_offset(r->offset);
180 handle->set_size(block_contents.size());
181 r->status = r->file->Append(block_contents);
182 if (r->status.ok()) {
183 char trailer[kBlockTrailerSize];
184 trailer[0] = type;
185 uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
186 crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
187 EncodeFixed32(trailer+1, crc32c::Mask(crc));
188 r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
189 if (r->status.ok()) {
190 r->offset += block_contents.size() + kBlockTrailerSize;
191 }
192 }
193 }
194
status() const195 Status TableBuilder::status() const {
196 return rep_->status;
197 }
198
Finish()199 Status TableBuilder::Finish() {
200 Rep* r = rep_;
201 Flush();
202 assert(!r->closed);
203 r->closed = true;
204
205 BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
206
207 // Write filter block
208 if (ok() && r->filter_block != NULL) {
209 WriteRawBlock(r->filter_block->Finish(), kNoCompression,
210 &filter_block_handle);
211 }
212
213 // Write metaindex block
214 if (ok()) {
215 BlockBuilder meta_index_block(&r->options);
216 if (r->filter_block != NULL) {
217 // Add mapping from "filter.Name" to location of filter data
218 std::string key = "filter.";
219 key.append(r->options.filter_policy->Name());
220 std::string handle_encoding;
221 filter_block_handle.EncodeTo(&handle_encoding);
222 meta_index_block.Add(key, handle_encoding);
223 }
224
225 // TODO(postrelease): Add stats and other meta blocks
226 WriteBlock(&meta_index_block, &metaindex_block_handle);
227 }
228
229 // Write index block
230 if (ok()) {
231 if (r->pending_index_entry) {
232 r->options.comparator->FindShortSuccessor(&r->last_key);
233 std::string handle_encoding;
234 r->pending_handle.EncodeTo(&handle_encoding);
235 r->index_block.Add(r->last_key, Slice(handle_encoding));
236 r->pending_index_entry = false;
237 }
238 WriteBlock(&r->index_block, &index_block_handle);
239 }
240
241 // Write footer
242 if (ok()) {
243 Footer footer;
244 footer.set_metaindex_handle(metaindex_block_handle);
245 footer.set_index_handle(index_block_handle);
246 std::string footer_encoding;
247 footer.EncodeTo(&footer_encoding);
248 r->status = r->file->Append(footer_encoding);
249 if (r->status.ok()) {
250 r->offset += footer_encoding.size();
251 }
252 }
253 return r->status;
254 }
255
Abandon()256 void TableBuilder::Abandon() {
257 Rep* r = rep_;
258 assert(!r->closed);
259 r->closed = true;
260 }
261
NumEntries() const262 uint64_t TableBuilder::NumEntries() const {
263 return rep_->num_entries;
264 }
265
FileSize() const266 uint64_t TableBuilder::FileSize() const {
267 return rep_->offset;
268 }
269
270 } // namespace leveldb
271