1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12
13 namespace leveldb {
14 namespace log {
15
16 // Construct a string of the specified length made out of the supplied
17 // partial string.
BigString(const std::string & partial_string,size_t n)18 static std::string BigString(const std::string& partial_string, size_t n) {
19 std::string result;
20 while (result.size() < n) {
21 result.append(partial_string);
22 }
23 result.resize(n);
24 return result;
25 }
26
27 // Construct a string from a number
NumberString(int n)28 static std::string NumberString(int n) {
29 char buf[50];
30 snprintf(buf, sizeof(buf), "%d.", n);
31 return std::string(buf);
32 }
33
34 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)35 static std::string RandomSkewedString(int i, Random* rnd) {
36 return BigString(NumberString(i), rnd->Skewed(17));
37 }
38
39 class LogTest {
40 private:
41 class StringDest : public WritableFile {
42 public:
43 std::string contents_;
44
Close()45 virtual Status Close() { return Status::OK(); }
Flush()46 virtual Status Flush() { return Status::OK(); }
Sync()47 virtual Status Sync() { return Status::OK(); }
Append(const Slice & slice)48 virtual Status Append(const Slice& slice) {
49 contents_.append(slice.data(), slice.size());
50 return Status::OK();
51 }
52 };
53
54 class StringSource : public SequentialFile {
55 public:
56 Slice contents_;
57 bool force_error_;
58 bool returned_partial_;
StringSource()59 StringSource() : force_error_(false), returned_partial_(false) { }
60
Read(size_t n,Slice * result,char * scratch)61 virtual Status Read(size_t n, Slice* result, char* scratch) {
62 ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63
64 if (force_error_) {
65 force_error_ = false;
66 returned_partial_ = true;
67 return Status::Corruption("read error");
68 }
69
70 if (contents_.size() < n) {
71 n = contents_.size();
72 returned_partial_ = true;
73 }
74 *result = Slice(contents_.data(), n);
75 contents_.remove_prefix(n);
76 return Status::OK();
77 }
78
Skip(uint64_t n)79 virtual Status Skip(uint64_t n) {
80 if (n > contents_.size()) {
81 contents_.clear();
82 return Status::NotFound("in-memory file skipepd past end");
83 }
84
85 contents_.remove_prefix(n);
86
87 return Status::OK();
88 }
89 };
90
91 class ReportCollector : public Reader::Reporter {
92 public:
93 size_t dropped_bytes_;
94 std::string message_;
95
ReportCollector()96 ReportCollector() : dropped_bytes_(0) { }
Corruption(size_t bytes,const Status & status)97 virtual void Corruption(size_t bytes, const Status& status) {
98 dropped_bytes_ += bytes;
99 message_.append(status.ToString());
100 }
101 };
102
103 StringDest dest_;
104 StringSource source_;
105 ReportCollector report_;
106 bool reading_;
107 Writer writer_;
108 Reader reader_;
109
110 // Record metadata for testing initial offset functionality
111 static size_t initial_offset_record_sizes_[];
112 static uint64_t initial_offset_last_record_offsets_[];
113
114 public:
LogTest()115 LogTest() : reading_(false),
116 writer_(&dest_),
117 reader_(&source_, &report_, true/*checksum*/,
118 0/*initial_offset*/) {
119 }
120
Write(const std::string & msg)121 void Write(const std::string& msg) {
122 ASSERT_TRUE(!reading_) << "Write() after starting to read";
123 writer_.AddRecord(Slice(msg));
124 }
125
WrittenBytes() const126 size_t WrittenBytes() const {
127 return dest_.contents_.size();
128 }
129
Read()130 std::string Read() {
131 if (!reading_) {
132 reading_ = true;
133 source_.contents_ = Slice(dest_.contents_);
134 }
135 std::string scratch;
136 Slice record;
137 if (reader_.ReadRecord(&record, &scratch)) {
138 return record.ToString();
139 } else {
140 return "EOF";
141 }
142 }
143
IncrementByte(int offset,int delta)144 void IncrementByte(int offset, int delta) {
145 dest_.contents_[offset] += delta;
146 }
147
SetByte(int offset,char new_byte)148 void SetByte(int offset, char new_byte) {
149 dest_.contents_[offset] = new_byte;
150 }
151
ShrinkSize(int bytes)152 void ShrinkSize(int bytes) {
153 dest_.contents_.resize(dest_.contents_.size() - bytes);
154 }
155
FixChecksum(int header_offset,int len)156 void FixChecksum(int header_offset, int len) {
157 // Compute crc of type/len/data
158 uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
159 crc = crc32c::Mask(crc);
160 EncodeFixed32(&dest_.contents_[header_offset], crc);
161 }
162
ForceError()163 void ForceError() {
164 source_.force_error_ = true;
165 }
166
DroppedBytes() const167 size_t DroppedBytes() const {
168 return report_.dropped_bytes_;
169 }
170
ReportMessage() const171 std::string ReportMessage() const {
172 return report_.message_;
173 }
174
175 // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const176 std::string MatchError(const std::string& msg) const {
177 if (report_.message_.find(msg) == std::string::npos) {
178 return report_.message_;
179 } else {
180 return "OK";
181 }
182 }
183
WriteInitialOffsetLog()184 void WriteInitialOffsetLog() {
185 for (int i = 0; i < 4; i++) {
186 std::string record(initial_offset_record_sizes_[i],
187 static_cast<char>('a' + i));
188 Write(record);
189 }
190 }
191
CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)192 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
193 WriteInitialOffsetLog();
194 reading_ = true;
195 source_.contents_ = Slice(dest_.contents_);
196 Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
197 WrittenBytes() + offset_past_end);
198 Slice record;
199 std::string scratch;
200 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
201 delete offset_reader;
202 }
203
CheckInitialOffsetRecord(uint64_t initial_offset,int expected_record_offset)204 void CheckInitialOffsetRecord(uint64_t initial_offset,
205 int expected_record_offset) {
206 WriteInitialOffsetLog();
207 reading_ = true;
208 source_.contents_ = Slice(dest_.contents_);
209 Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
210 initial_offset);
211 Slice record;
212 std::string scratch;
213 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
214 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
215 record.size());
216 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
217 offset_reader->LastRecordOffset());
218 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
219 delete offset_reader;
220 }
221
222 };
223
224 size_t LogTest::initial_offset_record_sizes_[] =
225 {10000, // Two sizable records in first block
226 10000,
227 2 * log::kBlockSize - 1000, // Span three blocks
228 1};
229
230 uint64_t LogTest::initial_offset_last_record_offsets_[] =
231 {0,
232 kHeaderSize + 10000,
233 2 * (kHeaderSize + 10000),
234 2 * (kHeaderSize + 10000) +
235 (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
236
237
TEST(LogTest,Empty)238 TEST(LogTest, Empty) {
239 ASSERT_EQ("EOF", Read());
240 }
241
TEST(LogTest,ReadWrite)242 TEST(LogTest, ReadWrite) {
243 Write("foo");
244 Write("bar");
245 Write("");
246 Write("xxxx");
247 ASSERT_EQ("foo", Read());
248 ASSERT_EQ("bar", Read());
249 ASSERT_EQ("", Read());
250 ASSERT_EQ("xxxx", Read());
251 ASSERT_EQ("EOF", Read());
252 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
253 }
254
TEST(LogTest,ManyBlocks)255 TEST(LogTest, ManyBlocks) {
256 for (int i = 0; i < 100000; i++) {
257 Write(NumberString(i));
258 }
259 for (int i = 0; i < 100000; i++) {
260 ASSERT_EQ(NumberString(i), Read());
261 }
262 ASSERT_EQ("EOF", Read());
263 }
264
TEST(LogTest,Fragmentation)265 TEST(LogTest, Fragmentation) {
266 Write("small");
267 Write(BigString("medium", 50000));
268 Write(BigString("large", 100000));
269 ASSERT_EQ("small", Read());
270 ASSERT_EQ(BigString("medium", 50000), Read());
271 ASSERT_EQ(BigString("large", 100000), Read());
272 ASSERT_EQ("EOF", Read());
273 }
274
TEST(LogTest,MarginalTrailer)275 TEST(LogTest, MarginalTrailer) {
276 // Make a trailer that is exactly the same length as an empty record.
277 const int n = kBlockSize - 2*kHeaderSize;
278 Write(BigString("foo", n));
279 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
280 Write("");
281 Write("bar");
282 ASSERT_EQ(BigString("foo", n), Read());
283 ASSERT_EQ("", Read());
284 ASSERT_EQ("bar", Read());
285 ASSERT_EQ("EOF", Read());
286 }
287
TEST(LogTest,MarginalTrailer2)288 TEST(LogTest, MarginalTrailer2) {
289 // Make a trailer that is exactly the same length as an empty record.
290 const int n = kBlockSize - 2*kHeaderSize;
291 Write(BigString("foo", n));
292 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
293 Write("bar");
294 ASSERT_EQ(BigString("foo", n), Read());
295 ASSERT_EQ("bar", Read());
296 ASSERT_EQ("EOF", Read());
297 ASSERT_EQ(0, DroppedBytes());
298 ASSERT_EQ("", ReportMessage());
299 }
300
TEST(LogTest,ShortTrailer)301 TEST(LogTest, ShortTrailer) {
302 const int n = kBlockSize - 2*kHeaderSize + 4;
303 Write(BigString("foo", n));
304 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
305 Write("");
306 Write("bar");
307 ASSERT_EQ(BigString("foo", n), Read());
308 ASSERT_EQ("", Read());
309 ASSERT_EQ("bar", Read());
310 ASSERT_EQ("EOF", Read());
311 }
312
TEST(LogTest,AlignedEof)313 TEST(LogTest, AlignedEof) {
314 const int n = kBlockSize - 2*kHeaderSize + 4;
315 Write(BigString("foo", n));
316 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
317 ASSERT_EQ(BigString("foo", n), Read());
318 ASSERT_EQ("EOF", Read());
319 }
320
TEST(LogTest,RandomRead)321 TEST(LogTest, RandomRead) {
322 const int N = 500;
323 Random write_rnd(301);
324 for (int i = 0; i < N; i++) {
325 Write(RandomSkewedString(i, &write_rnd));
326 }
327 Random read_rnd(301);
328 for (int i = 0; i < N; i++) {
329 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
330 }
331 ASSERT_EQ("EOF", Read());
332 }
333
334 // Tests of all the error paths in log_reader.cc follow:
335
TEST(LogTest,ReadError)336 TEST(LogTest, ReadError) {
337 Write("foo");
338 ForceError();
339 ASSERT_EQ("EOF", Read());
340 ASSERT_EQ(kBlockSize, DroppedBytes());
341 ASSERT_EQ("OK", MatchError("read error"));
342 }
343
TEST(LogTest,BadRecordType)344 TEST(LogTest, BadRecordType) {
345 Write("foo");
346 // Type is stored in header[6]
347 IncrementByte(6, 100);
348 FixChecksum(0, 3);
349 ASSERT_EQ("EOF", Read());
350 ASSERT_EQ(3, DroppedBytes());
351 ASSERT_EQ("OK", MatchError("unknown record type"));
352 }
353
TEST(LogTest,TruncatedTrailingRecordIsIgnored)354 TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
355 Write("foo");
356 ShrinkSize(4); // Drop all payload as well as a header byte
357 ASSERT_EQ("EOF", Read());
358 // Truncated last record is ignored, not treated as an error.
359 ASSERT_EQ(0, DroppedBytes());
360 ASSERT_EQ("", ReportMessage());
361 }
362
TEST(LogTest,BadLength)363 TEST(LogTest, BadLength) {
364 const int kPayloadSize = kBlockSize - kHeaderSize;
365 Write(BigString("bar", kPayloadSize));
366 Write("foo");
367 // Least significant size byte is stored in header[4].
368 IncrementByte(4, 1);
369 ASSERT_EQ("foo", Read());
370 ASSERT_EQ(kBlockSize, DroppedBytes());
371 ASSERT_EQ("OK", MatchError("bad record length"));
372 }
373
TEST(LogTest,BadLengthAtEndIsIgnored)374 TEST(LogTest, BadLengthAtEndIsIgnored) {
375 Write("foo");
376 ShrinkSize(1);
377 ASSERT_EQ("EOF", Read());
378 ASSERT_EQ(0, DroppedBytes());
379 ASSERT_EQ("", ReportMessage());
380 }
381
TEST(LogTest,ChecksumMismatch)382 TEST(LogTest, ChecksumMismatch) {
383 Write("foo");
384 IncrementByte(0, 10);
385 ASSERT_EQ("EOF", Read());
386 ASSERT_EQ(10, DroppedBytes());
387 ASSERT_EQ("OK", MatchError("checksum mismatch"));
388 }
389
TEST(LogTest,UnexpectedMiddleType)390 TEST(LogTest, UnexpectedMiddleType) {
391 Write("foo");
392 SetByte(6, kMiddleType);
393 FixChecksum(0, 3);
394 ASSERT_EQ("EOF", Read());
395 ASSERT_EQ(3, DroppedBytes());
396 ASSERT_EQ("OK", MatchError("missing start"));
397 }
398
TEST(LogTest,UnexpectedLastType)399 TEST(LogTest, UnexpectedLastType) {
400 Write("foo");
401 SetByte(6, kLastType);
402 FixChecksum(0, 3);
403 ASSERT_EQ("EOF", Read());
404 ASSERT_EQ(3, DroppedBytes());
405 ASSERT_EQ("OK", MatchError("missing start"));
406 }
407
TEST(LogTest,UnexpectedFullType)408 TEST(LogTest, UnexpectedFullType) {
409 Write("foo");
410 Write("bar");
411 SetByte(6, kFirstType);
412 FixChecksum(0, 3);
413 ASSERT_EQ("bar", Read());
414 ASSERT_EQ("EOF", Read());
415 ASSERT_EQ(3, DroppedBytes());
416 ASSERT_EQ("OK", MatchError("partial record without end"));
417 }
418
TEST(LogTest,UnexpectedFirstType)419 TEST(LogTest, UnexpectedFirstType) {
420 Write("foo");
421 Write(BigString("bar", 100000));
422 SetByte(6, kFirstType);
423 FixChecksum(0, 3);
424 ASSERT_EQ(BigString("bar", 100000), Read());
425 ASSERT_EQ("EOF", Read());
426 ASSERT_EQ(3, DroppedBytes());
427 ASSERT_EQ("OK", MatchError("partial record without end"));
428 }
429
TEST(LogTest,MissingLastIsIgnored)430 TEST(LogTest, MissingLastIsIgnored) {
431 Write(BigString("bar", kBlockSize));
432 // Remove the LAST block, including header.
433 ShrinkSize(14);
434 ASSERT_EQ("EOF", Read());
435 ASSERT_EQ("", ReportMessage());
436 ASSERT_EQ(0, DroppedBytes());
437 }
438
TEST(LogTest,PartialLastIsIgnored)439 TEST(LogTest, PartialLastIsIgnored) {
440 Write(BigString("bar", kBlockSize));
441 // Cause a bad record length in the LAST block.
442 ShrinkSize(1);
443 ASSERT_EQ("EOF", Read());
444 ASSERT_EQ("", ReportMessage());
445 ASSERT_EQ(0, DroppedBytes());
446 }
447
TEST(LogTest,ErrorJoinsRecords)448 TEST(LogTest, ErrorJoinsRecords) {
449 // Consider two fragmented records:
450 // first(R1) last(R1) first(R2) last(R2)
451 // where the middle two fragments disappear. We do not want
452 // first(R1),last(R2) to get joined and returned as a valid record.
453
454 // Write records that span two blocks
455 Write(BigString("foo", kBlockSize));
456 Write(BigString("bar", kBlockSize));
457 Write("correct");
458
459 // Wipe the middle block
460 for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
461 SetByte(offset, 'x');
462 }
463
464 ASSERT_EQ("correct", Read());
465 ASSERT_EQ("EOF", Read());
466 const int dropped = DroppedBytes();
467 ASSERT_LE(dropped, 2*kBlockSize + 100);
468 ASSERT_GE(dropped, 2*kBlockSize);
469 }
470
TEST(LogTest,ReadStart)471 TEST(LogTest, ReadStart) {
472 CheckInitialOffsetRecord(0, 0);
473 }
474
TEST(LogTest,ReadSecondOneOff)475 TEST(LogTest, ReadSecondOneOff) {
476 CheckInitialOffsetRecord(1, 1);
477 }
478
TEST(LogTest,ReadSecondTenThousand)479 TEST(LogTest, ReadSecondTenThousand) {
480 CheckInitialOffsetRecord(10000, 1);
481 }
482
TEST(LogTest,ReadSecondStart)483 TEST(LogTest, ReadSecondStart) {
484 CheckInitialOffsetRecord(10007, 1);
485 }
486
TEST(LogTest,ReadThirdOneOff)487 TEST(LogTest, ReadThirdOneOff) {
488 CheckInitialOffsetRecord(10008, 2);
489 }
490
TEST(LogTest,ReadThirdStart)491 TEST(LogTest, ReadThirdStart) {
492 CheckInitialOffsetRecord(20014, 2);
493 }
494
TEST(LogTest,ReadFourthOneOff)495 TEST(LogTest, ReadFourthOneOff) {
496 CheckInitialOffsetRecord(20015, 3);
497 }
498
TEST(LogTest,ReadFourthFirstBlockTrailer)499 TEST(LogTest, ReadFourthFirstBlockTrailer) {
500 CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
501 }
502
TEST(LogTest,ReadFourthMiddleBlock)503 TEST(LogTest, ReadFourthMiddleBlock) {
504 CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
505 }
506
TEST(LogTest,ReadFourthLastBlock)507 TEST(LogTest, ReadFourthLastBlock) {
508 CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
509 }
510
TEST(LogTest,ReadFourthStart)511 TEST(LogTest, ReadFourthStart) {
512 CheckInitialOffsetRecord(
513 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
514 3);
515 }
516
TEST(LogTest,ReadEnd)517 TEST(LogTest, ReadEnd) {
518 CheckOffsetPastEndReturnsNoRecords(0);
519 }
520
TEST(LogTest,ReadPastEnd)521 TEST(LogTest, ReadPastEnd) {
522 CheckOffsetPastEndReturnsNoRecords(5);
523 }
524
525 } // namespace log
526 } // namespace leveldb
527
main(int argc,char ** argv)528 int main(int argc, char** argv) {
529 return leveldb::test::RunAllTests();
530 }
531