• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "leveldb/db.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <sys/stat.h>
10 #include <sys/types.h>
11 #include "leveldb/cache.h"
12 #include "leveldb/env.h"
13 #include "leveldb/table.h"
14 #include "leveldb/write_batch.h"
15 #include "db/db_impl.h"
16 #include "db/filename.h"
17 #include "db/log_format.h"
18 #include "db/version_set.h"
19 #include "util/logging.h"
20 #include "util/testharness.h"
21 #include "util/testutil.h"
22 
23 namespace leveldb {
24 
// Size argument handed to test::RandomString() for every generated value.
// A namespace-scope const already has internal linkage, so no `static`.
const int kValueSize = 1000;
26 
27 class CorruptionTest {
28  public:
29   test::ErrorEnv env_;
30   std::string dbname_;
31   Cache* tiny_cache_;
32   Options options_;
33   DB* db_;
34 
CorruptionTest()35   CorruptionTest() {
36     tiny_cache_ = NewLRUCache(100);
37     options_.env = &env_;
38     options_.block_cache = tiny_cache_;
39     dbname_ = test::TmpDir() + "/db_test";
40     DestroyDB(dbname_, options_);
41 
42     db_ = NULL;
43     options_.create_if_missing = true;
44     Reopen();
45     options_.create_if_missing = false;
46   }
47 
~CorruptionTest()48   ~CorruptionTest() {
49      delete db_;
50      DestroyDB(dbname_, Options());
51      delete tiny_cache_;
52   }
53 
TryReopen()54   Status TryReopen() {
55     delete db_;
56     db_ = NULL;
57     return DB::Open(options_, dbname_, &db_);
58   }
59 
Reopen()60   void Reopen() {
61     ASSERT_OK(TryReopen());
62   }
63 
RepairDB()64   void RepairDB() {
65     delete db_;
66     db_ = NULL;
67     ASSERT_OK(::leveldb::RepairDB(dbname_, options_));
68   }
69 
Build(int n)70   void Build(int n) {
71     std::string key_space, value_space;
72     WriteBatch batch;
73     for (int i = 0; i < n; i++) {
74       //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
75       Slice key = Key(i, &key_space);
76       batch.Clear();
77       batch.Put(key, Value(i, &value_space));
78       WriteOptions options;
79       // Corrupt() doesn't work without this sync on windows; stat reports 0 for
80       // the file size.
81       if (i == n - 1) {
82         options.sync = true;
83       }
84       ASSERT_OK(db_->Write(options, &batch));
85     }
86   }
87 
Check(int min_expected,int max_expected)88   void Check(int min_expected, int max_expected) {
89     int next_expected = 0;
90     int missed = 0;
91     int bad_keys = 0;
92     int bad_values = 0;
93     int correct = 0;
94     std::string value_space;
95     Iterator* iter = db_->NewIterator(ReadOptions());
96     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
97       uint64_t key;
98       Slice in(iter->key());
99       if (in == "" || in == "~") {
100         // Ignore boundary keys.
101         continue;
102       }
103       if (!ConsumeDecimalNumber(&in, &key) ||
104           !in.empty() ||
105           key < next_expected) {
106         bad_keys++;
107         continue;
108       }
109       missed += (key - next_expected);
110       next_expected = key + 1;
111       if (iter->value() != Value(key, &value_space)) {
112         bad_values++;
113       } else {
114         correct++;
115       }
116     }
117     delete iter;
118 
119     fprintf(stderr,
120             "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%d\n",
121             min_expected, max_expected, correct, bad_keys, bad_values, missed);
122     ASSERT_LE(min_expected, correct);
123     ASSERT_GE(max_expected, correct);
124   }
125 
Corrupt(FileType filetype,int offset,int bytes_to_corrupt)126   void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
127     // Pick file to corrupt
128     std::vector<std::string> filenames;
129     ASSERT_OK(env_.GetChildren(dbname_, &filenames));
130     uint64_t number;
131     FileType type;
132     std::string fname;
133     int picked_number = -1;
134     for (size_t i = 0; i < filenames.size(); i++) {
135       if (ParseFileName(filenames[i], &number, &type) &&
136           type == filetype &&
137           int(number) > picked_number) {  // Pick latest file
138         fname = dbname_ + "/" + filenames[i];
139         picked_number = number;
140       }
141     }
142     ASSERT_TRUE(!fname.empty()) << filetype;
143 
144     struct stat sbuf;
145     if (stat(fname.c_str(), &sbuf) != 0) {
146       const char* msg = strerror(errno);
147       ASSERT_TRUE(false) << fname << ": " << msg;
148     }
149 
150     if (offset < 0) {
151       // Relative to end of file; make it absolute
152       if (-offset > sbuf.st_size) {
153         offset = 0;
154       } else {
155         offset = sbuf.st_size + offset;
156       }
157     }
158     if (offset > sbuf.st_size) {
159       offset = sbuf.st_size;
160     }
161     if (offset + bytes_to_corrupt > sbuf.st_size) {
162       bytes_to_corrupt = sbuf.st_size - offset;
163     }
164 
165     // Do it
166     std::string contents;
167     Status s = ReadFileToString(Env::Default(), fname, &contents);
168     ASSERT_TRUE(s.ok()) << s.ToString();
169     for (int i = 0; i < bytes_to_corrupt; i++) {
170       contents[i + offset] ^= 0x80;
171     }
172     s = WriteStringToFile(Env::Default(), contents, fname);
173     ASSERT_TRUE(s.ok()) << s.ToString();
174   }
175 
Property(const std::string & name)176   int Property(const std::string& name) {
177     std::string property;
178     int result;
179     if (db_->GetProperty(name, &property) &&
180         sscanf(property.c_str(), "%d", &result) == 1) {
181       return result;
182     } else {
183       return -1;
184     }
185   }
186 
187   // Return the ith key
Key(int i,std::string * storage)188   Slice Key(int i, std::string* storage) {
189     char buf[100];
190     snprintf(buf, sizeof(buf), "%016d", i);
191     storage->assign(buf, strlen(buf));
192     return Slice(*storage);
193   }
194 
195   // Return the value to associate with the specified key
Value(int k,std::string * storage)196   Slice Value(int k, std::string* storage) {
197     Random r(k);
198     return test::RandomString(&r, kValueSize, storage);
199   }
200 };
201 
// Damaging the log file must cost exactly the records in the damaged blocks.
TEST(CorruptionTest, Recovery) {
  const int kNumRecords = 100;
  Build(kNumRecords);
  Check(kNumRecords, kNumRecords);

  // One flipped byte in the WriteBatch tag of the first record, and one
  // somewhere inside the second log block.
  Corrupt(kLogFile, 19, 1);
  Corrupt(kLogFile, log::kBlockSize + 1000, 1);
  Reopen();

  // The 64 records stored in the first two log blocks are gone entirely.
  const int kSurvivors = 36;
  Check(kSurvivors, kSurvivors);
}
212 
// With env_.writable_file_error_ set, reopening the database must fail.
TEST(CorruptionTest, RecoverWriteError) {
  env_.writable_file_error_ = true;
  const Status reopen_status = TryReopen();
  ASSERT_TRUE(!reopen_status.ok());
}
218 
// A writable-file error hit while writing must surface as a failed write,
// and the database must reopen once the error flag is cleared.
TEST(CorruptionTest, NewFileErrorDuringWrite) {
  env_.writable_file_error_ = true;

  // Write enough data to force a minor compaction.
  const int max_writes = 3 + (Options().write_buffer_size / kValueSize);
  std::string value_buf;
  Status write_status;
  int writes_done = 0;
  while (write_status.ok() && writes_done < max_writes) {
    WriteBatch batch;
    batch.Put("a", Value(100, &value_buf));
    write_status = db_->Write(WriteOptions(), &batch);
    ++writes_done;
  }

  ASSERT_TRUE(!write_status.ok());
  ASSERT_GE(env_.num_writable_file_errors_, 1);
  env_.writable_file_error_ = false;
  Reopen();
}
235 
// One flipped byte in a table file may lose some entries, but not many.
TEST(CorruptionTest, TableFile) {
  Build(100);

  // Flush the memtable, then compact levels 0 and 1 over the full key range.
  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);
  impl->TEST_CompactMemTable();
  impl->TEST_CompactRange(0, NULL, NULL);
  impl->TEST_CompactRange(1, NULL, NULL);

  Corrupt(kTableFile, 100, 1);

  // Between 90 and 99 of the 100 entries must still read back correctly.
  Check(90, 99);
}
246 
// After a table file is damaged, RepairDB must bring back most entries.
TEST(CorruptionTest, TableFileRepair) {
  // Small blocks limit how much data one corrupted byte can affect.
  options_.block_size = 2 * kValueSize;
  options_.paranoid_checks = true;
  Reopen();
  Build(100);

  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);
  impl->TEST_CompactMemTable();
  impl->TEST_CompactRange(0, NULL, NULL);
  impl->TEST_CompactRange(1, NULL, NULL);

  Corrupt(kTableFile, 100, 1);
  RepairDB();
  Reopen();

  // Between 95 and 99 of the 100 entries must be readable after repair.
  Check(95, 99);
}
262 
// Corrupts 500 bytes starting 2000 bytes before the end of the newest table
// file and checks that at least half of the entries survive a reopen.
TEST(CorruptionTest, TableFileIndexData) {
  const int kNumEntries = 10000;  // Enough to build multiple Tables
  Build(kNumEntries);
  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);
  impl->TEST_CompactMemTable();

  Corrupt(kTableFile, -2000, 500);
  Reopen();
  Check(5000, kNumEntries - 1);
}
272 
// Running RepairDB on an undamaged database must not lose any entry.
TEST(CorruptionTest, MissingDescriptor) {
  const int kNumEntries = 1000;
  Build(kNumEntries);
  RepairDB();
  Reopen();
  Check(kNumEntries, kNumEntries);
}
279 
// RepairDB must restore the sequence number so that writes made after the
// repair are visible over the ones made before it.
TEST(CorruptionTest, SequenceNumberRecovery) {
  // Overwrite the same key five times in a row.
  static const char* const kVersions[] = {"v1", "v2", "v3", "v4", "v5"};
  const size_t kNumVersions = sizeof(kVersions) / sizeof(kVersions[0]);
  for (size_t i = 0; i < kNumVersions; ++i) {
    ASSERT_OK(db_->Put(WriteOptions(), "foo", kVersions[i]));
  }

  RepairDB();
  Reopen();
  std::string found;
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("v5", found);

  // Write once more.  If the sequence number was not recovered properly,
  // this write would be hidden behind an earlier one.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("v6", found);

  // The new value must also still be there after a reopen.
  Reopen();
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("v6", found);
}
300 
// A damaged descriptor file must make the open fail; after RepairDB the
// stored data must be readable again.
TEST(CorruptionTest, CorruptedDescriptor) {
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);
  impl->TEST_CompactMemTable();
  impl->TEST_CompactRange(0, NULL, NULL);

  // Damage up to the first 1000 bytes of the descriptor.
  Corrupt(kDescriptorFile, 0, 1000);
  const Status reopen_status = TryReopen();
  ASSERT_TRUE(!reopen_status.ok());

  RepairDB();
  Reopen();
  std::string found;
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("hello", found);
}
317 
// Without paranoid checks, a corrupted compaction input must not stop later
// writes from landing and reading back in full.
TEST(CorruptionTest, CompactionInputError) {
  Build(10);
  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);
  impl->TEST_CompactMemTable();

  // Exactly one file is expected at level config::kMaxMemCompactLevel.
  const int target_level = config::kMaxMemCompactLevel;
  const std::string files_property =
      "leveldb.num-files-at-level" + NumberToString(target_level);
  ASSERT_EQ(1, Property(files_property));

  Corrupt(kTableFile, 100, 1);
  Check(5, 9);

  // Writing a large number of values forces compactions.
  const int kNumEntries = 10000;
  Build(kNumEntries);
  Check(kNumEntries, kNumEntries);
}
332 
// With paranoid checks on, a corrupted compaction input must make
// subsequent writes fail.
TEST(CorruptionTest, CompactionInputErrorParanoid) {
  options_.paranoid_checks = true;
  options_.write_buffer_size = 512 << 10;
  Reopen();
  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);

  // Produce two table files, each damaged right after it is written, so
  // that there are several inputs that need compacting.
  const int kNumInputs = 2;
  for (int round = 0; round < kNumInputs; ++round) {
    Build(10);
    impl->TEST_CompactMemTable();
    Corrupt(kTableFile, 100, 1);
    env_.SleepForMicroseconds(100000);
  }
  impl->CompactRange(NULL, NULL);

  // The corrupted table must cause this write to be rejected.
  std::string key_buf;
  std::string value_buf;
  const Status put_status =
      db_->Put(WriteOptions(), Key(5, &key_buf), Value(5, &value_buf));
  ASSERT_TRUE(!put_status.ok()) << "write did not fail in corrupted paranoid db";
}
353 
// A key written after a table file was corrupted must read back correctly,
// both before and after the memtable holding it is compacted.
TEST(CorruptionTest, UnrelatedKeys) {
  Build(10);
  DBImpl* impl = reinterpret_cast<DBImpl*>(db_);
  impl->TEST_CompactMemTable();
  Corrupt(kTableFile, 100, 1);

  // Key 1000 lies outside the 0..9 range written by Build(10).
  std::string key_buf;
  std::string value_buf;
  const Slice key = Key(1000, &key_buf);
  const std::string expected = Value(1000, &value_buf).ToString();
  ASSERT_OK(db_->Put(WriteOptions(), key, expected));

  std::string found;
  ASSERT_OK(db_->Get(ReadOptions(), key, &found));
  ASSERT_EQ(expected, found);

  impl->TEST_CompactMemTable();
  ASSERT_OK(db_->Get(ReadOptions(), key, &found));
  ASSERT_EQ(expected, found);
}
369 
370 }  // namespace leveldb
371 
main(int argc,char ** argv)372 int main(int argc, char** argv) {
373   return leveldb::test::RunAllTests();
374 }
375