• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/log_reader.h"
6 
7 #include <stdio.h>
8 #include "leveldb/env.h"
9 #include "util/coding.h"
10 #include "util/crc32c.h"
11 
12 namespace leveldb {
13 namespace log {
14 
~Reporter()15 Reader::Reporter::~Reporter() {
16 }
17 
Reader(SequentialFile * file,Reporter * reporter,bool checksum,uint64_t initial_offset)18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
19                uint64_t initial_offset)
20     : file_(file),
21       reporter_(reporter),
22       checksum_(checksum),
23       backing_store_(new char[kBlockSize]),
24       buffer_(),
25       eof_(false),
26       last_record_offset_(0),
27       end_of_buffer_offset_(0),
28       initial_offset_(initial_offset) {
29 }
30 
~Reader()31 Reader::~Reader() {
32   delete[] backing_store_;
33 }
34 
SkipToInitialBlock()35 bool Reader::SkipToInitialBlock() {
36   size_t offset_in_block = initial_offset_ % kBlockSize;
37   uint64_t block_start_location = initial_offset_ - offset_in_block;
38 
39   // Don't search a block if we'd be in the trailer
40   if (offset_in_block > kBlockSize - 6) {
41     offset_in_block = 0;
42     block_start_location += kBlockSize;
43   }
44 
45   end_of_buffer_offset_ = block_start_location;
46 
47   // Skip to start of first block that can contain the initial record
48   if (block_start_location > 0) {
49     Status skip_status = file_->Skip(block_start_location);
50     if (!skip_status.ok()) {
51       ReportDrop(block_start_location, skip_status);
52       return false;
53     }
54   }
55 
56   return true;
57 }
58 
ReadRecord(Slice * record,std::string * scratch)59 bool Reader::ReadRecord(Slice* record, std::string* scratch) {
60   if (last_record_offset_ < initial_offset_) {
61     if (!SkipToInitialBlock()) {
62       return false;
63     }
64   }
65 
66   scratch->clear();
67   record->clear();
68   bool in_fragmented_record = false;
69   // Record offset of the logical record that we're reading
70   // 0 is a dummy value to make compilers happy
71   uint64_t prospective_record_offset = 0;
72 
73   Slice fragment;
74   while (true) {
75     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
76     const unsigned int record_type = ReadPhysicalRecord(&fragment);
77     switch (record_type) {
78       case kFullType:
79         if (in_fragmented_record) {
80           // Handle bug in earlier versions of log::Writer where
81           // it could emit an empty kFirstType record at the tail end
82           // of a block followed by a kFullType or kFirstType record
83           // at the beginning of the next block.
84           if (scratch->empty()) {
85             in_fragmented_record = false;
86           } else {
87             ReportCorruption(scratch->size(), "partial record without end(1)");
88           }
89         }
90         prospective_record_offset = physical_record_offset;
91         scratch->clear();
92         *record = fragment;
93         last_record_offset_ = prospective_record_offset;
94         return true;
95 
96       case kFirstType:
97         if (in_fragmented_record) {
98           // Handle bug in earlier versions of log::Writer where
99           // it could emit an empty kFirstType record at the tail end
100           // of a block followed by a kFullType or kFirstType record
101           // at the beginning of the next block.
102           if (scratch->empty()) {
103             in_fragmented_record = false;
104           } else {
105             ReportCorruption(scratch->size(), "partial record without end(2)");
106           }
107         }
108         prospective_record_offset = physical_record_offset;
109         scratch->assign(fragment.data(), fragment.size());
110         in_fragmented_record = true;
111         break;
112 
113       case kMiddleType:
114         if (!in_fragmented_record) {
115           ReportCorruption(fragment.size(),
116                            "missing start of fragmented record(1)");
117         } else {
118           scratch->append(fragment.data(), fragment.size());
119         }
120         break;
121 
122       case kLastType:
123         if (!in_fragmented_record) {
124           ReportCorruption(fragment.size(),
125                            "missing start of fragmented record(2)");
126         } else {
127           scratch->append(fragment.data(), fragment.size());
128           *record = Slice(*scratch);
129           last_record_offset_ = prospective_record_offset;
130           return true;
131         }
132         break;
133 
134       case kEof:
135         if (in_fragmented_record) {
136           // This can be caused by the writer dying immediately after
137           // writing a physical record but before completing the next; don't
138           // treat it as a corruption, just ignore the entire logical record.
139           scratch->clear();
140         }
141         return false;
142 
143       case kBadRecord:
144         if (in_fragmented_record) {
145           ReportCorruption(scratch->size(), "error in middle of record");
146           in_fragmented_record = false;
147           scratch->clear();
148         }
149         break;
150 
151       default: {
152         char buf[40];
153         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
154         ReportCorruption(
155             (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
156             buf);
157         in_fragmented_record = false;
158         scratch->clear();
159         break;
160       }
161     }
162   }
163   return false;
164 }
165 
LastRecordOffset()166 uint64_t Reader::LastRecordOffset() {
167   return last_record_offset_;
168 }
169 
ReportCorruption(size_t bytes,const char * reason)170 void Reader::ReportCorruption(size_t bytes, const char* reason) {
171   ReportDrop(bytes, Status::Corruption(reason));
172 }
173 
ReportDrop(size_t bytes,const Status & reason)174 void Reader::ReportDrop(size_t bytes, const Status& reason) {
175   if (reporter_ != NULL &&
176       end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
177     reporter_->Corruption(bytes, reason);
178   }
179 }
180 
ReadPhysicalRecord(Slice * result)181 unsigned int Reader::ReadPhysicalRecord(Slice* result) {
182   while (true) {
183     if (buffer_.size() < kHeaderSize) {
184       if (!eof_) {
185         // Last read was a full read, so this is a trailer to skip
186         buffer_.clear();
187         Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
188         end_of_buffer_offset_ += buffer_.size();
189         if (!status.ok()) {
190           buffer_.clear();
191           ReportDrop(kBlockSize, status);
192           eof_ = true;
193           return kEof;
194         } else if (buffer_.size() < kBlockSize) {
195           eof_ = true;
196         }
197         continue;
198       } else {
199         // Note that if buffer_ is non-empty, we have a truncated header at the
200         // end of the file, which can be caused by the writer crashing in the
201         // middle of writing the header. Instead of considering this an error,
202         // just report EOF.
203         buffer_.clear();
204         return kEof;
205       }
206     }
207 
208     // Parse the header
209     const char* header = buffer_.data();
210     const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
211     const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
212     const unsigned int type = header[6];
213     const uint32_t length = a | (b << 8);
214     if (kHeaderSize + length > buffer_.size()) {
215       size_t drop_size = buffer_.size();
216       buffer_.clear();
217       if (!eof_) {
218         ReportCorruption(drop_size, "bad record length");
219         return kBadRecord;
220       }
221       // If the end of the file has been reached without reading |length| bytes
222       // of payload, assume the writer died in the middle of writing the record.
223       // Don't report a corruption.
224       return kEof;
225     }
226 
227     if (type == kZeroType && length == 0) {
228       // Skip zero length record without reporting any drops since
229       // such records are produced by the mmap based writing code in
230       // env_posix.cc that preallocates file regions.
231       buffer_.clear();
232       return kBadRecord;
233     }
234 
235     // Check crc
236     if (checksum_) {
237       uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
238       uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
239       if (actual_crc != expected_crc) {
240         // Drop the rest of the buffer since "length" itself may have
241         // been corrupted and if we trust it, we could find some
242         // fragment of a real log record that just happens to look
243         // like a valid log record.
244         size_t drop_size = buffer_.size();
245         buffer_.clear();
246         ReportCorruption(drop_size, "checksum mismatch");
247         return kBadRecord;
248       }
249     }
250 
251     buffer_.remove_prefix(kHeaderSize + length);
252 
253     // Skip physical record that started before initial_offset_
254     if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
255         initial_offset_) {
256       result->clear();
257       return kBadRecord;
258     }
259 
260     *result = Slice(header + kHeaderSize, length);
261     return type;
262   }
263 }
264 
265 }  // namespace log
266 }  // namespace leveldb
267