• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "leveldb/table_builder.h"
6 
7 #include <assert.h>
8 #include "leveldb/comparator.h"
9 #include "leveldb/env.h"
10 #include "leveldb/filter_policy.h"
11 #include "leveldb/options.h"
12 #include "table/block_builder.h"
13 #include "table/filter_block.h"
14 #include "table/format.h"
15 #include "util/coding.h"
16 #include "util/crc32c.h"
17 
18 namespace leveldb {
19 
20 struct TableBuilder::Rep {
21   Options options;
22   Options index_block_options;
23   WritableFile* file;
24   uint64_t offset;
25   Status status;
26   BlockBuilder data_block;
27   BlockBuilder index_block;
28   std::string last_key;
29   int64_t num_entries;
30   bool closed;          // Either Finish() or Abandon() has been called.
31   FilterBlockBuilder* filter_block;
32 
33   // We do not emit the index entry for a block until we have seen the
34   // first key for the next data block.  This allows us to use shorter
35   // keys in the index block.  For example, consider a block boundary
36   // between the keys "the quick brown fox" and "the who".  We can use
37   // "the r" as the key for the index block entry since it is >= all
38   // entries in the first block and < all entries in subsequent
39   // blocks.
40   //
41   // Invariant: r->pending_index_entry is true only if data_block is empty.
42   bool pending_index_entry;
43   BlockHandle pending_handle;  // Handle to add to index block
44 
45   std::string compressed_output;
46 
Repleveldb::TableBuilder::Rep47   Rep(const Options& opt, WritableFile* f)
48       : options(opt),
49         index_block_options(opt),
50         file(f),
51         offset(0),
52         data_block(&options),
53         index_block(&index_block_options),
54         num_entries(0),
55         closed(false),
56         filter_block(opt.filter_policy == NULL ? NULL
57                      : new FilterBlockBuilder(opt.filter_policy)),
58         pending_index_entry(false) {
59     index_block_options.block_restart_interval = 1;
60   }
61 };
62 
TableBuilder(const Options & options,WritableFile * file)63 TableBuilder::TableBuilder(const Options& options, WritableFile* file)
64     : rep_(new Rep(options, file)) {
65   if (rep_->filter_block != NULL) {
66     rep_->filter_block->StartBlock(0);
67   }
68 }
69 
~TableBuilder()70 TableBuilder::~TableBuilder() {
71   assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
72   delete rep_->filter_block;
73   delete rep_;
74 }
75 
ChangeOptions(const Options & options)76 Status TableBuilder::ChangeOptions(const Options& options) {
77   // Note: if more fields are added to Options, update
78   // this function to catch changes that should not be allowed to
79   // change in the middle of building a Table.
80   if (options.comparator != rep_->options.comparator) {
81     return Status::InvalidArgument("changing comparator while building table");
82   }
83 
84   // Note that any live BlockBuilders point to rep_->options and therefore
85   // will automatically pick up the updated options.
86   rep_->options = options;
87   rep_->index_block_options = options;
88   rep_->index_block_options.block_restart_interval = 1;
89   return Status::OK();
90 }
91 
Add(const Slice & key,const Slice & value)92 void TableBuilder::Add(const Slice& key, const Slice& value) {
93   Rep* r = rep_;
94   assert(!r->closed);
95   if (!ok()) return;
96   if (r->num_entries > 0) {
97     assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
98   }
99 
100   if (r->pending_index_entry) {
101     assert(r->data_block.empty());
102     r->options.comparator->FindShortestSeparator(&r->last_key, key);
103     std::string handle_encoding;
104     r->pending_handle.EncodeTo(&handle_encoding);
105     r->index_block.Add(r->last_key, Slice(handle_encoding));
106     r->pending_index_entry = false;
107   }
108 
109   if (r->filter_block != NULL) {
110     r->filter_block->AddKey(key);
111   }
112 
113   r->last_key.assign(key.data(), key.size());
114   r->num_entries++;
115   r->data_block.Add(key, value);
116 
117   const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
118   if (estimated_block_size >= r->options.block_size) {
119     Flush();
120   }
121 }
122 
Flush()123 void TableBuilder::Flush() {
124   Rep* r = rep_;
125   assert(!r->closed);
126   if (!ok()) return;
127   if (r->data_block.empty()) return;
128   assert(!r->pending_index_entry);
129   WriteBlock(&r->data_block, &r->pending_handle);
130   if (ok()) {
131     r->pending_index_entry = true;
132     r->status = r->file->Flush();
133   }
134   if (r->filter_block != NULL) {
135     r->filter_block->StartBlock(r->offset);
136   }
137 }
138 
WriteBlock(BlockBuilder * block,BlockHandle * handle)139 void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
140   // File format contains a sequence of blocks where each block has:
141   //    block_data: uint8[n]
142   //    type: uint8
143   //    crc: uint32
144   assert(ok());
145   Rep* r = rep_;
146   Slice raw = block->Finish();
147 
148   Slice block_contents;
149   CompressionType type = r->options.compression;
150   // TODO(postrelease): Support more compression options: zlib?
151   switch (type) {
152     case kNoCompression:
153       block_contents = raw;
154       break;
155 
156     case kSnappyCompression: {
157       std::string* compressed = &r->compressed_output;
158       if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
159           compressed->size() < raw.size() - (raw.size() / 8u)) {
160         block_contents = *compressed;
161       } else {
162         // Snappy not supported, or compressed less than 12.5%, so just
163         // store uncompressed form
164         block_contents = raw;
165         type = kNoCompression;
166       }
167       break;
168     }
169   }
170   WriteRawBlock(block_contents, type, handle);
171   r->compressed_output.clear();
172   block->Reset();
173 }
174 
WriteRawBlock(const Slice & block_contents,CompressionType type,BlockHandle * handle)175 void TableBuilder::WriteRawBlock(const Slice& block_contents,
176                                  CompressionType type,
177                                  BlockHandle* handle) {
178   Rep* r = rep_;
179   handle->set_offset(r->offset);
180   handle->set_size(block_contents.size());
181   r->status = r->file->Append(block_contents);
182   if (r->status.ok()) {
183     char trailer[kBlockTrailerSize];
184     trailer[0] = type;
185     uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
186     crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
187     EncodeFixed32(trailer+1, crc32c::Mask(crc));
188     r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
189     if (r->status.ok()) {
190       r->offset += block_contents.size() + kBlockTrailerSize;
191     }
192   }
193 }
194 
status() const195 Status TableBuilder::status() const {
196   return rep_->status;
197 }
198 
Finish()199 Status TableBuilder::Finish() {
200   Rep* r = rep_;
201   Flush();
202   assert(!r->closed);
203   r->closed = true;
204 
205   BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
206 
207   // Write filter block
208   if (ok() && r->filter_block != NULL) {
209     WriteRawBlock(r->filter_block->Finish(), kNoCompression,
210                   &filter_block_handle);
211   }
212 
213   // Write metaindex block
214   if (ok()) {
215     BlockBuilder meta_index_block(&r->options);
216     if (r->filter_block != NULL) {
217       // Add mapping from "filter.Name" to location of filter data
218       std::string key = "filter.";
219       key.append(r->options.filter_policy->Name());
220       std::string handle_encoding;
221       filter_block_handle.EncodeTo(&handle_encoding);
222       meta_index_block.Add(key, handle_encoding);
223     }
224 
225     // TODO(postrelease): Add stats and other meta blocks
226     WriteBlock(&meta_index_block, &metaindex_block_handle);
227   }
228 
229   // Write index block
230   if (ok()) {
231     if (r->pending_index_entry) {
232       r->options.comparator->FindShortSuccessor(&r->last_key);
233       std::string handle_encoding;
234       r->pending_handle.EncodeTo(&handle_encoding);
235       r->index_block.Add(r->last_key, Slice(handle_encoding));
236       r->pending_index_entry = false;
237     }
238     WriteBlock(&r->index_block, &index_block_handle);
239   }
240 
241   // Write footer
242   if (ok()) {
243     Footer footer;
244     footer.set_metaindex_handle(metaindex_block_handle);
245     footer.set_index_handle(index_block_handle);
246     std::string footer_encoding;
247     footer.EncodeTo(&footer_encoding);
248     r->status = r->file->Append(footer_encoding);
249     if (r->status.ok()) {
250       r->offset += footer_encoding.size();
251     }
252   }
253   return r->status;
254 }
255 
Abandon()256 void TableBuilder::Abandon() {
257   Rep* r = rep_;
258   assert(!r->closed);
259   r->closed = true;
260 }
261 
NumEntries() const262 uint64_t TableBuilder::NumEntries() const {
263   return rep_->num_entries;
264 }
265 
FileSize() const266 uint64_t TableBuilder::FileSize() const {
267   return rep_->offset;
268 }
269 
270 }  // namespace leveldb
271