• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12 
13 namespace leveldb {
14 namespace log {
15 
// Construct a string of exactly n bytes by repeating the supplied partial
// string and truncating the final repetition.
//
// If partial_string is empty there is nothing to repeat; the result is then
// n '\0' bytes. (Without the guard below that case would never terminate
// for n > 0, because appending an empty string never grows the result.)
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The last append may overshoot n by up to one repetition.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  // Trims the overshoot, or pads with '\0' when nothing could be appended.
  result.resize(n);
  return result;
}
26 
// Format n in decimal followed by a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char text[50];
  snprintf(text, sizeof(text), "%d.", n);
  std::string formatted(text);
  return formatted;
}
33 
34 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)35 static std::string RandomSkewedString(int i, Random* rnd) {
36   return BigString(NumberString(i), rnd->Skewed(17));
37 }
38 
39 class LogTest {
40  private:
41   class StringDest : public WritableFile {
42    public:
43     std::string contents_;
44 
Close()45     virtual Status Close() { return Status::OK(); }
Flush()46     virtual Status Flush() { return Status::OK(); }
Sync()47     virtual Status Sync() { return Status::OK(); }
Append(const Slice & slice)48     virtual Status Append(const Slice& slice) {
49       contents_.append(slice.data(), slice.size());
50       return Status::OK();
51     }
52   };
53 
54   class StringSource : public SequentialFile {
55    public:
56     Slice contents_;
57     bool force_error_;
58     bool returned_partial_;
StringSource()59     StringSource() : force_error_(false), returned_partial_(false) { }
60 
Read(size_t n,Slice * result,char * scratch)61     virtual Status Read(size_t n, Slice* result, char* scratch) {
62       ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63 
64       if (force_error_) {
65         force_error_ = false;
66         returned_partial_ = true;
67         return Status::Corruption("read error");
68       }
69 
70       if (contents_.size() < n) {
71         n = contents_.size();
72         returned_partial_ = true;
73       }
74       *result = Slice(contents_.data(), n);
75       contents_.remove_prefix(n);
76       return Status::OK();
77     }
78 
Skip(uint64_t n)79     virtual Status Skip(uint64_t n) {
80       if (n > contents_.size()) {
81         contents_.clear();
82         return Status::NotFound("in-memory file skipepd past end");
83       }
84 
85       contents_.remove_prefix(n);
86 
87       return Status::OK();
88     }
89   };
90 
91   class ReportCollector : public Reader::Reporter {
92    public:
93     size_t dropped_bytes_;
94     std::string message_;
95 
ReportCollector()96     ReportCollector() : dropped_bytes_(0) { }
Corruption(size_t bytes,const Status & status)97     virtual void Corruption(size_t bytes, const Status& status) {
98       dropped_bytes_ += bytes;
99       message_.append(status.ToString());
100     }
101   };
102 
103   StringDest dest_;
104   StringSource source_;
105   ReportCollector report_;
106   bool reading_;
107   Writer writer_;
108   Reader reader_;
109 
110   // Record metadata for testing initial offset functionality
111   static size_t initial_offset_record_sizes_[];
112   static uint64_t initial_offset_last_record_offsets_[];
113 
114  public:
LogTest()115   LogTest() : reading_(false),
116               writer_(&dest_),
117               reader_(&source_, &report_, true/*checksum*/,
118                       0/*initial_offset*/) {
119   }
120 
Write(const std::string & msg)121   void Write(const std::string& msg) {
122     ASSERT_TRUE(!reading_) << "Write() after starting to read";
123     writer_.AddRecord(Slice(msg));
124   }
125 
WrittenBytes() const126   size_t WrittenBytes() const {
127     return dest_.contents_.size();
128   }
129 
Read()130   std::string Read() {
131     if (!reading_) {
132       reading_ = true;
133       source_.contents_ = Slice(dest_.contents_);
134     }
135     std::string scratch;
136     Slice record;
137     if (reader_.ReadRecord(&record, &scratch)) {
138       return record.ToString();
139     } else {
140       return "EOF";
141     }
142   }
143 
IncrementByte(int offset,int delta)144   void IncrementByte(int offset, int delta) {
145     dest_.contents_[offset] += delta;
146   }
147 
SetByte(int offset,char new_byte)148   void SetByte(int offset, char new_byte) {
149     dest_.contents_[offset] = new_byte;
150   }
151 
ShrinkSize(int bytes)152   void ShrinkSize(int bytes) {
153     dest_.contents_.resize(dest_.contents_.size() - bytes);
154   }
155 
FixChecksum(int header_offset,int len)156   void FixChecksum(int header_offset, int len) {
157     // Compute crc of type/len/data
158     uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
159     crc = crc32c::Mask(crc);
160     EncodeFixed32(&dest_.contents_[header_offset], crc);
161   }
162 
ForceError()163   void ForceError() {
164     source_.force_error_ = true;
165   }
166 
DroppedBytes() const167   size_t DroppedBytes() const {
168     return report_.dropped_bytes_;
169   }
170 
ReportMessage() const171   std::string ReportMessage() const {
172     return report_.message_;
173   }
174 
175   // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const176   std::string MatchError(const std::string& msg) const {
177     if (report_.message_.find(msg) == std::string::npos) {
178       return report_.message_;
179     } else {
180       return "OK";
181     }
182   }
183 
WriteInitialOffsetLog()184   void WriteInitialOffsetLog() {
185     for (int i = 0; i < 4; i++) {
186       std::string record(initial_offset_record_sizes_[i],
187                          static_cast<char>('a' + i));
188       Write(record);
189     }
190   }
191 
CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)192   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
193     WriteInitialOffsetLog();
194     reading_ = true;
195     source_.contents_ = Slice(dest_.contents_);
196     Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
197                                        WrittenBytes() + offset_past_end);
198     Slice record;
199     std::string scratch;
200     ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
201     delete offset_reader;
202   }
203 
CheckInitialOffsetRecord(uint64_t initial_offset,int expected_record_offset)204   void CheckInitialOffsetRecord(uint64_t initial_offset,
205                                 int expected_record_offset) {
206     WriteInitialOffsetLog();
207     reading_ = true;
208     source_.contents_ = Slice(dest_.contents_);
209     Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
210                                        initial_offset);
211     Slice record;
212     std::string scratch;
213     ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
214     ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
215               record.size());
216     ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
217               offset_reader->LastRecordOffset());
218     ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
219     delete offset_reader;
220   }
221 
222 };
223 
224 size_t LogTest::initial_offset_record_sizes_[] =
225     {10000,  // Two sizable records in first block
226      10000,
227      2 * log::kBlockSize - 1000,  // Span three blocks
228      1};
229 
230 uint64_t LogTest::initial_offset_last_record_offsets_[] =
231     {0,
232      kHeaderSize + 10000,
233      2 * (kHeaderSize + 10000),
234      2 * (kHeaderSize + 10000) +
235          (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
236 
237 
TEST(LogTest, Empty) {
  // Nothing has been written, so the first read already reports "EOF".
  const std::string first = Read();
  ASSERT_EQ("EOF", first);
}
241 
TEST(LogTest, ReadWrite) {
  // Short records, including an empty one, come back in write order.
  static const char* const kRecords[] = {"foo", "bar", "", "xxxx"};
  const int kCount = sizeof(kRecords) / sizeof(kRecords[0]);

  for (int i = 0; i < kCount; i++) {
    Write(kRecords[i]);
  }
  for (int i = 0; i < kCount; i++) {
    ASSERT_EQ(kRecords[i], Read());
  }

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}
254 
TEST(LogTest, ManyBlocks) {
  // Write a large number of small records, then read them all back in order.
  const int kRecordCount = 100000;
  int i;
  for (i = 0; i != kRecordCount; ++i) {
    Write(NumberString(i));
  }
  for (i = 0; i != kRecordCount; ++i) {
    const std::string expected = NumberString(i);
    ASSERT_EQ(expected, Read());
  }
  ASSERT_EQ("EOF", Read());
}
264 
TEST(LogTest, Fragmentation) {
  // One short record followed by two records of 50000 and 100000 bytes.
  const std::string medium = BigString("medium", 50000);
  const std::string large = BigString("large", 100000);

  Write("small");
  Write(medium);
  Write(large);

  ASSERT_EQ("small", Read());
  ASSERT_EQ(medium, Read());
  ASSERT_EQ(large, Read());
  ASSERT_EQ("EOF", Read());
}
274 
TEST(LogTest, MarginalTrailer) {
  // Size the first record so the space left in the block is exactly the
  // length of an empty record, then write an empty record into it.
  const int n = kBlockSize - 2*kHeaderSize;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());

  Write("");
  Write("bar");

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
287 
TEST(LogTest, MarginalTrailer2) {
  // Same layout as MarginalTrailer, but the next record is non-empty.
  // Reading must succeed without anything being reported as dropped.
  const int n = kBlockSize - 2*kHeaderSize;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());

  Write("bar");

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
300 
TEST(LogTest, ShortTrailer) {
  // The first record leaves fewer than kHeaderSize bytes in its block.
  const int n = kBlockSize - 2*kHeaderSize + 4;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());

  Write("");
  Write("bar");

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
312 
TEST(LogTest, AlignedEof) {
  // A single record that ends fewer than kHeaderSize bytes before the end
  // of its block, with nothing written after it.
  const int n = kBlockSize - 2*kHeaderSize + 4;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("EOF", Read());
}
320 
TEST(LogTest, RandomRead) {
  // The writer and the checker use separate generators with the same seed,
  // so the checker regenerates exactly the strings that were written.
  const int kRecordCount = 500;

  Random write_rnd(301);
  for (int i = 0; i != kRecordCount; ++i) {
    const std::string payload = RandomSkewedString(i, &write_rnd);
    Write(payload);
  }

  Random read_rnd(301);
  for (int i = 0; i != kRecordCount; ++i) {
    const std::string expected = RandomSkewedString(i, &read_rnd);
    ASSERT_EQ(expected, Read());
  }
  ASSERT_EQ("EOF", Read());
}
333 
334 // Tests of all the error paths in log_reader.cc follow:
335 
TEST(LogTest, ReadError) {
  // A failing read from the source is reported as a whole dropped block.
  Write("foo");
  ForceError();

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
343 
TEST(LogTest, BadRecordType) {
  Write("foo");

  // Type is stored in header[6]; bump it to an unknown value and repair the
  // checksum so that only the type is wrong.
  IncrementByte(6, 100);
  FixChecksum(0, 3);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}
353 
TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  // Cut off the 3 payload bytes plus one byte of the header.
  ShrinkSize(4);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);

  // Truncated last record is ignored, not treated as an error.
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
362 
TEST(LogTest, BadLength) {
  // The first record's header plus payload add up to exactly one block.
  const int kPayloadSize = kBlockSize - kHeaderSize;
  const std::string full_block = BigString("bar", kPayloadSize);
  Write(full_block);
  Write("foo");

  // Least significant size byte is stored in header[4].
  IncrementByte(4, 1);

  const std::string survivor = Read();
  ASSERT_EQ("foo", survivor);
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}
373 
TEST(LogTest, BadLengthAtEndIsIgnored) {
  // Drop the final payload byte so the only record is shorter than its
  // header says; nothing should be reported.
  Write("foo");
  ShrinkSize(1);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
381 
TEST(LogTest, ChecksumMismatch) {
  Write("foo");
  // Alter byte 0 of the record without recomputing the checksum.
  IncrementByte(0, 10);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
389 
TEST(LogTest, UnexpectedMiddleType) {
  Write("foo");
  // Relabel the only record as a middle fragment, keeping its checksum valid.
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
398 
TEST(LogTest, UnexpectedLastType) {
  Write("foo");
  // Relabel the only record as a last fragment, keeping its checksum valid.
  SetByte(6, kLastType);
  FixChecksum(0, 3);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
407 
TEST(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  // Relabel "foo" as a first fragment, so the full record "bar" arrives
  // while a fragmented record is still open.
  SetByte(6, kFirstType);
  FixChecksum(0, 3);

  const std::string first = Read();
  ASSERT_EQ("bar", first);
  const std::string second = Read();
  ASSERT_EQ("EOF", second);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
418 
TEST(LogTest, UnexpectedFirstType) {
  const std::string big = BigString("bar", 100000);
  Write("foo");
  Write(big);
  // Relabel "foo" as a first fragment, so the next record's own start
  // arrives while a fragmented record is still open.
  SetByte(6, kFirstType);
  FixChecksum(0, 3);

  const std::string first = Read();
  ASSERT_EQ(big, first);
  const std::string second = Read();
  ASSERT_EQ("EOF", second);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
429 
TEST(LogTest, MissingLastIsIgnored) {
  const std::string spanning = BigString("bar", kBlockSize);
  Write(spanning);
  // Remove the LAST block, including header.
  ShrinkSize(14);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
438 
TEST(LogTest, PartialLastIsIgnored) {
  const std::string spanning = BigString("bar", kBlockSize);
  Write(spanning);
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);

  const std::string outcome = Read();
  ASSERT_EQ("EOF", outcome);
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
447 
TEST(LogTest, ErrorJoinsRecords) {
  // Consider two fragmented records:
  //    first(R1) last(R1) first(R2) last(R2)
  // where the middle two fragments disappear.  We do not want
  // first(R1),last(R2) to get joined and returned as a valid record.

  // Write records that span two blocks
  const std::string r1 = BigString("foo", kBlockSize);
  const std::string r2 = BigString("bar", kBlockSize);
  Write(r1);
  Write(r2);
  Write("correct");

  // Wipe the middle block
  int offset = kBlockSize;
  while (offset < 2*kBlockSize) {
    SetByte(offset, 'x');
    offset++;
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());

  // The dropped total must lie within [2*kBlockSize, 2*kBlockSize + 100].
  const int dropped = DroppedBytes();
  ASSERT_LE(dropped, 2*kBlockSize + 100);
  ASSERT_GE(dropped, 2*kBlockSize);
}
470 
TEST(LogTest, ReadStart) {
  // Starting at offset 0 yields record 0.
  const uint64_t start_offset = 0;
  const int expected_record = 0;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
474 
TEST(LogTest, ReadSecondOneOff) {
  // Starting one byte into the log yields record 1.
  const uint64_t start_offset = 1;
  const int expected_record = 1;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
478 
TEST(LogTest, ReadSecondTenThousand) {
  // Starting at offset 10000 yields record 1.
  const uint64_t start_offset = 10000;
  const int expected_record = 1;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
482 
TEST(LogTest, ReadSecondStart) {
  // Starting at offset 10007 yields record 1.
  const uint64_t start_offset = 10007;
  const int expected_record = 1;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
486 
TEST(LogTest, ReadThirdOneOff) {
  // Starting at offset 10008 yields record 2.
  const uint64_t start_offset = 10008;
  const int expected_record = 2;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
490 
TEST(LogTest, ReadThirdStart) {
  // Starting at offset 20014 yields record 2.
  const uint64_t start_offset = 20014;
  const int expected_record = 2;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
494 
TEST(LogTest, ReadFourthOneOff) {
  // Starting at offset 20015 yields record 3.
  const uint64_t start_offset = 20015;
  const int expected_record = 3;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
498 
TEST(LogTest, ReadFourthFirstBlockTrailer) {
  // Starting four bytes before the end of the first block yields record 3.
  const uint64_t start_offset = log::kBlockSize - 4;
  const int expected_record = 3;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
502 
TEST(LogTest, ReadFourthMiddleBlock) {
  // Starting one byte into the second block yields record 3.
  const uint64_t start_offset = log::kBlockSize + 1;
  const int expected_record = 3;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
506 
TEST(LogTest, ReadFourthLastBlock) {
  // Starting one byte into the third block yields record 3.
  const uint64_t start_offset = 2 * log::kBlockSize + 1;
  const int expected_record = 3;
  CheckInitialOffsetRecord(start_offset, expected_record);
}
510 
TEST(LogTest, ReadFourthStart) {
  // Start exactly at the fourth record. The offset is two 10000-byte
  // records with their headers, plus the (2 * kBlockSize - 1000)-byte record
  // with its three fragment headers. This is the same expression as the
  // fourth entry of LogTest::initial_offset_last_record_offsets_.
  // (The record size here was previously written as 1000, which placed the
  // starting offset inside the third record instead of at the fourth.)
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) +
          3 * kHeaderSize,
      3);
}
516 
TEST(LogTest, ReadEnd) {
  // An initial offset equal to the written size yields no records.
  const uint64_t bytes_past_end = 0;
  CheckOffsetPastEndReturnsNoRecords(bytes_past_end);
}
520 
TEST(LogTest, ReadPastEnd) {
  // An initial offset five bytes beyond the written size yields no records.
  const uint64_t bytes_past_end = 5;
  CheckOffsetPastEndReturnsNoRecords(bytes_past_end);
}
524 
525 }  // namespace log
526 }  // namespace leveldb
527 
main(int argc,char ** argv)528 int main(int argc, char** argv) {
529   return leveldb::test::RunAllTests();
530 }
531