1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "KVS"
16 #define PW_LOG_LEVEL PW_KVS_LOG_LEVEL
17
18 #include "pw_kvs/key_value_store.h"
19
20 #include <algorithm>
21 #include <cinttypes>
22 #include <cstring>
23 #include <type_traits>
24
25 #include "pw_assert/check.h"
26 #include "pw_kvs_private/config.h"
27 #include "pw_log/shorter.h"
28 #include "pw_status/try.h"
29
30 namespace pw::kvs {
31 namespace {
32
33 using std::byte;
34
InvalidKey(Key key)35 constexpr bool InvalidKey(Key key) {
36 return key.empty() || (key.size() > internal::Entry::kMaxKeyLength);
37 }
38
39 } // namespace
40
// Constructs a KVS over a caller-owned flash partition. All storage (sector
// descriptors, key descriptors, redundant-address slots, scratch space) is
// provided by the caller, so the KVS performs no dynamic allocation. The
// partition and the provided vectors/buffers must outlive this object.
// Init() must be called before the store is usable.
KeyValueStore::KeyValueStore(FlashPartition* partition,
                             span<const EntryFormat> formats,
                             const Options& options,
                             size_t redundancy,
                             Vector<SectorDescriptor>& sector_descriptor_list,
                             const SectorDescriptor** temp_sectors_to_skip,
                             Vector<KeyDescriptor>& key_descriptor_list,
                             Address* addresses)
    : partition_(*partition),
      formats_(formats),
      // Sector bookkeeping; temp_sectors_to_skip is scratch used during
      // sector selection.
      sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
      // Entry cache with `redundancy` address slots per key descriptor.
      entry_cache_(key_descriptor_list, addresses, redundancy),
      options_(options),
      initialized_(InitializationState::kNotInitialized),
      error_detected_(false),
      internal_stats_({}),
      last_transaction_id_(0) {}
58
Init()59 Status KeyValueStore::Init() {
60 initialized_ = InitializationState::kNotInitialized;
61 error_detected_ = false;
62 last_transaction_id_ = 0;
63
64 INF("Initializing key value store");
65 if (partition_.sector_count() > sectors_.max_size()) {
66 ERR("KVS init failed: kMaxUsableSectors (=%u) must be at least as "
67 "large as the number of sectors in the flash partition (=%u)",
68 unsigned(sectors_.max_size()),
69 unsigned(partition_.sector_count()));
70 return Status::FailedPrecondition();
71 }
72
73 if (partition_.sector_count() < 2) {
74 ERR("KVS init failed: FlashParition sector count (=%u) must be at 2. KVS "
75 "requires at least 1 working sector + 1 free/reserved sector",
76 unsigned(partition_.sector_count()));
77 return Status::FailedPrecondition();
78 }
79
80 const size_t sector_size_bytes = partition_.sector_size_bytes();
81
82 // TODO(davidrogers): investigate doing this as a static assert/compile-time
83 // check.
84 if (sector_size_bytes > SectorDescriptor::max_sector_size()) {
85 ERR("KVS init failed: sector_size_bytes (=%u) is greater than maximum "
86 "allowed sector size (=%u)",
87 unsigned(sector_size_bytes),
88 unsigned(SectorDescriptor::max_sector_size()));
89 return Status::FailedPrecondition();
90 }
91
92 Status metadata_result = InitializeMetadata();
93
94 if (!error_detected_) {
95 initialized_ = InitializationState::kReady;
96 } else {
97 initialized_ = InitializationState::kNeedsMaintenance;
98
99 if (options_.recovery != ErrorRecovery::kManual) {
100 size_t pre_fix_redundancy_errors =
101 internal_stats_.missing_redundant_entries_recovered;
102 Status recovery_status = FixErrors();
103
104 if (recovery_status.ok()) {
105 if (metadata_result.IsOutOfRange()) {
106 internal_stats_.missing_redundant_entries_recovered =
107 pre_fix_redundancy_errors;
108 INF("KVS init: Redundancy level successfully updated");
109 } else {
110 WRN("KVS init: Corruption detected and fully repaired");
111 }
112 initialized_ = InitializationState::kReady;
113 } else if (recovery_status.IsResourceExhausted()) {
114 WRN("KVS init: Unable to maintain required free sector");
115 } else {
116 WRN("KVS init: Corruption detected and unable repair");
117 }
118 } else {
119 WRN("KVS init: Corruption detected, no repair attempted due to options");
120 }
121 }
122
123 INF("KeyValueStore init complete: active keys %u, deleted keys %u, sectors "
124 "%u, logical sector size %u bytes",
125 unsigned(size()),
126 unsigned(entry_cache_.total_entries() - size()),
127 unsigned(sectors_.size()),
128 unsigned(partition_.sector_size_bytes()));
129
130 // Report any corruption was not repaired.
131 if (error_detected_) {
132 WRN("KVS init: Corruption found but not repaired, KVS unavailable until "
133 "successful maintenance.");
134 return Status::DataLoss();
135 }
136
137 return OkStatus();
138 }
139
// Rebuilds all in-RAM metadata from flash in two passes:
//
//   Pass 1: walk every sector entry-by-entry, registering each readable entry
//       in the entry cache and tracking writable bytes per sector. Unreadable
//       data is counted as corrupt and the scan resynchronizes by searching
//       for the next entry magic.
//   Pass 2: for each cached entry, verify every redundant copy and accumulate
//       valid-byte counts per sector; drop unreadable copies.
//
// Returns:
//   OK: no errors found.
//   OUT_OF_RANGE: the only "errors" are uniformly missing redundant copies,
//       which indicates the configured redundancy level changed (not real
//       corruption).
//   FAILED_PRECONDITION: corruption was detected (error_detected_ is set).
Status KeyValueStore::InitializeMetadata() {
  const size_t sector_size_bytes = partition_.sector_size_bytes();

  sectors_.Reset();
  entry_cache_.Reset();

  DBG("First pass: Read all entries from all sectors");
  Address sector_address = 0;

  size_t total_corrupt_bytes = 0;
  size_t corrupt_entries = 0;
  bool empty_sector_found = false;
  size_t entry_copies_missing = 0;

  for (SectorDescriptor& sector : sectors_) {
    Address entry_address = sector_address;

    size_t sector_corrupt_bytes = 0;

    for (int num_entries_in_sector = 0; true; num_entries_in_sector++) {
      DBG("Load entry: sector=%u, entry#=%d, address=%u",
          unsigned(sector_address),
          num_entries_in_sector,
          unsigned(entry_address));

      if (!sectors_.AddressInSector(sector, entry_address)) {
        DBG("Fell off end of sector; moving to the next sector");
        break;
      }

      Address next_entry_address;
      Status status = LoadEntry(entry_address, &next_entry_address);
      if (status.IsNotFound()) {
        DBG("Hit un-written data in sector; moving to the next sector");
        break;
      } else if (!status.ok()) {
        // The entry could not be read, indicating likely data corruption within
        // the sector. Try to scan the remainder of the sector for other
        // entries.

        error_detected_ = true;
        corrupt_entries++;

        // Resynchronize: search forward (at minimum alignment) for the next
        // entry magic so later entries in the sector are not lost.
        status = ScanForEntry(sector,
                              entry_address + Entry::kMinAlignmentBytes,
                              &next_entry_address);
        if (!status.ok()) {
          // No further entries in this sector. Mark the remaining bytes in the
          // sector as corrupt (since we can't reliably know the size of the
          // corrupt entry).
          sector_corrupt_bytes +=
              sector_size_bytes - (entry_address - sector_address);
          break;
        }

        sector_corrupt_bytes += next_entry_address - entry_address;
      }

      // Entry loaded successfully; so get ready to load the next one.
      entry_address = next_entry_address;

      // Update the count of writable bytes remaining in this sector.
      sector.set_writable_bytes(sector_size_bytes -
                                (entry_address - sector_address));
    }

    if (sector_corrupt_bytes > 0) {
      // If the sector contains corrupt data, prevent any further entries from
      // being written to it by indicating that it has no space. This should
      // also make it a decent GC candidate. Valid keys in the sector are still
      // readable as normal.
      sector.mark_corrupt();
      error_detected_ = true;

      WRN("Sector %u contains %uB of corrupt data",
          sectors_.Index(sector),
          unsigned(sector_corrupt_bytes));
    }

    if (sector.Empty(sector_size_bytes)) {
      empty_sector_found = true;
    }
    sector_address += sector_size_bytes;
    total_corrupt_bytes += sector_corrupt_bytes;
  }

  DBG("Second pass: Count valid bytes in each sector");
  Address newest_key = 0;

  // For every valid entry, for each address, count the valid bytes in that
  // sector. If the address fails to read, remove the address and mark the
  // sector as corrupt. Track which entry has the newest transaction ID for
  // initializing last_new_sector_.
  for (EntryMetadata& metadata : entry_cache_) {
    if (metadata.addresses().size() < redundancy()) {
      DBG("Key 0x%08x missing copies, has %u, needs %u",
          unsigned(metadata.hash()),
          unsigned(metadata.addresses().size()),
          unsigned(redundancy()));
      entry_copies_missing++;
    }
    size_t index = 0;
    while (index < metadata.addresses().size()) {
      Address address = metadata.addresses()[index];
      Entry entry;

      Status read_result = Entry::Read(partition_, address, formats_, &entry);

      SectorDescriptor& sector = sectors_.FromAddress(address);

      if (read_result.ok()) {
        sector.AddValidBytes(entry.size());
        index++;
      } else {
        corrupt_entries++;
        total_corrupt_bytes += sector.writable_bytes();
        error_detected_ = true;
        sector.mark_corrupt();

        // Remove the bad address and stay at this index. The removal
        // replaces out the removed address with the back address so
        // this index needs to be rechecked with the new address.
        metadata.RemoveAddress(address);
      }
    }

    if (metadata.IsNewerThan(last_transaction_id_)) {
      last_transaction_id_ = metadata.transaction_id();
      newest_key = metadata.addresses().back();
    }
  }

  sectors_.set_last_new_sector(newest_key);

  if (!empty_sector_found) {
    // The KVS requires one free sector for garbage collection.
    DBG("No empty sector found");
    error_detected_ = true;
  }

  if (entry_copies_missing > 0) {
    bool other_errors = error_detected_;
    error_detected_ = true;

    // If every entry is missing copies and nothing else is wrong, this is a
    // redundancy configuration change rather than corruption.
    if (!other_errors && entry_copies_missing == entry_cache_.total_entries()) {
      INF("KVS configuration changed to redundancy of %u total copies per key",
          unsigned(redundancy()));
      return Status::OutOfRange();
    }
  }

  if (error_detected_) {
    WRN("Corruption detected. Found %u corrupt bytes, %u corrupt entries, "
        "and %u keys missing redundant copies.",
        unsigned(total_corrupt_bytes),
        unsigned(corrupt_entries),
        unsigned(entry_copies_missing));
    return Status::FailedPrecondition();
  }
  return OkStatus();
}
300
GetStorageStats() const301 KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
302 StorageStats stats{};
303 const size_t sector_size = partition_.sector_size_bytes();
304 bool found_empty_sector = false;
305 stats.sector_erase_count = internal_stats_.sector_erase_count;
306 stats.corrupt_sectors_recovered = internal_stats_.corrupt_sectors_recovered;
307 stats.missing_redundant_entries_recovered =
308 internal_stats_.missing_redundant_entries_recovered;
309
310 for (const SectorDescriptor& sector : sectors_) {
311 stats.in_use_bytes += sector.valid_bytes();
312 stats.reclaimable_bytes += sector.RecoverableBytes(sector_size);
313
314 if (!found_empty_sector && sector.Empty(sector_size)) {
315 // The KVS tries to always keep an empty sector for GC, so don't count
316 // the first empty sector seen as writable space. However, a free sector
317 // cannot always be assumed to exist; if a GC operation fails, all sectors
318 // may be partially written, in which case the space reported might be
319 // inaccurate.
320 found_empty_sector = true;
321 continue;
322 }
323
324 stats.writable_bytes += sector.writable_bytes();
325 }
326
327 return stats;
328 }
329
330 // Check KVS for any error conditions. Primarily intended for test and
331 // internal use.
CheckForErrors()332 bool KeyValueStore::CheckForErrors() {
333 // Check for corrupted sectors
334 for (SectorDescriptor& sector : sectors_) {
335 if (sector.corrupt()) {
336 error_detected_ = true;
337 return error_detected();
338 }
339 }
340
341 // Check for missing redundancy.
342 if (redundancy() > 1) {
343 for (const EntryMetadata& metadata : entry_cache_) {
344 if (metadata.addresses().size() < redundancy()) {
345 error_detected_ = true;
346 return error_detected();
347 }
348 }
349 }
350
351 return error_detected();
352 }
353
// Reads and validates the entry at entry_address and registers it in the
// entry cache. On success, *next_entry_address is set to the address
// immediately following this entry. Returns non-OK if the entry header cannot
// be read, the key cannot be read, or the checksum fails.
Status KeyValueStore::LoadEntry(Address entry_address,
                                Address* next_entry_address) {
  Entry entry;
  PW_TRY(Entry::Read(partition_, entry_address, formats_, &entry));

  // Read the key from flash & validate the entry (which reads the value).
  Entry::KeyBuffer key_buffer;
  PW_TRY_ASSIGN(size_t key_length, entry.ReadKey(key_buffer));
  const Key key(key_buffer.data(), key_length);

  PW_TRY(entry.VerifyChecksumInFlash());

  // A valid entry was found, so update the next entry address before doing any
  // of the checks that happen in AddNewOrUpdateExisting.
  *next_entry_address = entry.next_address();
  return entry_cache_.AddNewOrUpdateExisting(
      entry.descriptor(key), entry.address(), partition_.sector_size_bytes());
}
372
373 // Scans flash memory within a sector to find a KVS entry magic.
ScanForEntry(const SectorDescriptor & sector,Address start_address,Address * next_entry_address)374 Status KeyValueStore::ScanForEntry(const SectorDescriptor& sector,
375 Address start_address,
376 Address* next_entry_address) {
377 DBG("Scanning sector %u for entries starting from address %u",
378 sectors_.Index(sector),
379 unsigned(start_address));
380
381 // Entries must start at addresses which are aligned on a multiple of
382 // Entry::kMinAlignmentBytes. However, that multiple can vary between entries.
383 // When scanning, we don't have an entry to tell us what the current alignment
384 // is, so the minimum alignment is used to be exhaustive.
385 for (Address address = AlignUp(start_address, Entry::kMinAlignmentBytes);
386 sectors_.AddressInSector(sector, address);
387 address += Entry::kMinAlignmentBytes) {
388 uint32_t magic;
389 StatusWithSize read_result =
390 partition_.Read(address, as_writable_bytes(span(&magic, 1)));
391 if (!read_result.ok()) {
392 continue;
393 }
394 if (formats_.KnownMagic(magic)) {
395 DBG("Found entry magic at address %u", unsigned(address));
396 *next_entry_address = address;
397 return OkStatus();
398 }
399 }
400
401 return Status::NotFound();
402 }
403
404 #if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
405
// Purges tombstone (deleted-key) entries from the entry cache and updates
// per-sector valid-byte accounting to match. Only compiled in when heavy
// maintenance deleted-key removal is enabled.
Status KeyValueStore::RemoveDeletedKeyEntries() {
  for (internal::EntryCache::iterator it = entry_cache_.begin();
       it != entry_cache_.end();
       ++it) {
    EntryMetadata& entry_metadata = *it;

    // The iterator we are given back from RemoveEntry could also be deleted,
    // so loop until we find one that isn't deleted.
    while (entry_metadata.state() == EntryState::kDeleted) {
      // Read the original entry to get the size for sector accounting purposes.
      Entry entry;
      PW_TRY(ReadEntry(entry_metadata, entry));

      // The tombstone's bytes are no longer valid in any sector holding a
      // copy of it.
      for (Address address : entry_metadata.addresses()) {
        sectors_.FromAddress(address).RemoveValidBytes(entry.size());
      }

      it = entry_cache_.RemoveEntry(it);

      if (it == entry_cache_.end()) {
        return OkStatus();  // new iterator is the end, bail
      }

      entry_metadata = *it;  // set entry_metadata to check for deletion again
    }
  }

  return OkStatus();
}
435
436 #endif // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
437
Get(Key key,span<byte> value_buffer,size_t offset_bytes) const438 StatusWithSize KeyValueStore::Get(Key key,
439 span<byte> value_buffer,
440 size_t offset_bytes) const {
441 PW_TRY_WITH_SIZE(CheckReadOperation(key));
442
443 EntryMetadata metadata;
444 PW_TRY_WITH_SIZE(FindExisting(key, &metadata));
445
446 return Get(key, metadata, value_buffer, offset_bytes);
447 }
448
PutBytes(Key key,span<const byte> value)449 Status KeyValueStore::PutBytes(Key key, span<const byte> value) {
450 PW_TRY(CheckWriteOperation(key));
451 DBG("Writing key/value; key length=%u, value length=%u",
452 unsigned(key.size()),
453 unsigned(value.size()));
454
455 if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
456 DBG("%u B value with %u B key cannot fit in one sector",
457 unsigned(value.size()),
458 unsigned(key.size()));
459 return Status::InvalidArgument();
460 }
461
462 EntryMetadata metadata;
463 Status status = FindEntry(key, &metadata);
464
465 if (status.ok()) {
466 // TODO(davidrogers): figure out logging how to support multiple addresses.
467 DBG("Overwriting entry for key 0x%08x in %u sectors including %u",
468 unsigned(metadata.hash()),
469 unsigned(metadata.addresses().size()),
470 sectors_.Index(metadata.first_address()));
471 return WriteEntryForExistingKey(metadata, EntryState::kValid, key, value);
472 }
473
474 if (status.IsNotFound()) {
475 return WriteEntryForNewKey(key, value);
476 }
477
478 return status;
479 }
480
Delete(Key key)481 Status KeyValueStore::Delete(Key key) {
482 PW_TRY(CheckWriteOperation(key));
483
484 EntryMetadata metadata;
485 PW_TRY(FindExisting(key, &metadata));
486
487 // TODO(davidrogers): figure out logging how to support multiple addresses.
488 DBG("Writing tombstone for key 0x%08x in %u sectors including %u",
489 unsigned(metadata.hash()),
490 unsigned(metadata.addresses().size()),
491 sectors_.Index(metadata.first_address()));
492 return WriteEntryForExistingKey(metadata, EntryState::kDeleted, key, {});
493 }
494
ReadKey()495 void KeyValueStore::Item::ReadKey() {
496 key_buffer_.fill('\0');
497
498 Entry entry;
499 if (kvs_.ReadEntry(*iterator_, entry).ok()) {
500 entry.ReadKey(key_buffer_)
501 .IgnoreError(); // TODO(b/242598609): Handle Status properly
502 }
503 }
504
operator ++()505 KeyValueStore::iterator& KeyValueStore::iterator::operator++() {
506 // Skip to the next entry that is valid (not deleted).
507 while (++item_.iterator_ != item_.kvs_.entry_cache_.end() &&
508 item_.iterator_->state() != EntryState::kValid) {
509 }
510 return *this;
511 }
512
begin() const513 KeyValueStore::iterator KeyValueStore::begin() const {
514 internal::EntryCache::const_iterator cache_iterator = entry_cache_.begin();
515 // Skip over any deleted entries at the start of the descriptor list.
516 while (cache_iterator != entry_cache_.end() &&
517 cache_iterator->state() != EntryState::kValid) {
518 ++cache_iterator;
519 }
520 return iterator(*this, cache_iterator);
521 }
522
ValueSize(Key key) const523 StatusWithSize KeyValueStore::ValueSize(Key key) const {
524 PW_TRY_WITH_SIZE(CheckReadOperation(key));
525
526 EntryMetadata metadata;
527 PW_TRY_WITH_SIZE(FindExisting(key, &metadata));
528
529 return ValueSize(metadata);
530 }
531
// Reads the entry described by `metadata`, trying each redundant copy in
// turn. A copy that fails to read marks its sector corrupt and sets
// error_detected_ (both are mutable, allowing error tracking from this const
// method). Returns the last failing status (DATA_LOSS if there were no
// addresses) only when every copy is unreadable.
Status KeyValueStore::ReadEntry(const EntryMetadata& metadata,
                                Entry& entry) const {
  // Try to read an entry
  Status read_result = Status::DataLoss();
  for (Address address : metadata.addresses()) {
    read_result = Entry::Read(partition_, address, formats_, &entry);
    if (read_result.ok()) {
      return read_result;
    }

    // Found a bad address. Set the sector as corrupt.
    error_detected_ = true;
    sectors_.FromAddress(address).mark_corrupt();
  }

  ERR("No valid entries for key. Data has been lost!");
  return read_result;
}
550
FindEntry(Key key,EntryMetadata * metadata_out) const551 Status KeyValueStore::FindEntry(Key key, EntryMetadata* metadata_out) const {
552 StatusWithSize find_result =
553 entry_cache_.Find(partition_, sectors_, formats_, key, metadata_out);
554
555 if (find_result.size() > 0u) {
556 error_detected_ = true;
557 }
558 return find_result.status();
559 }
560
FindExisting(Key key,EntryMetadata * metadata_out) const561 Status KeyValueStore::FindExisting(Key key, EntryMetadata* metadata_out) const {
562 Status status = FindEntry(key, metadata_out);
563
564 // If the key's hash collides with an existing key or if the key is deleted,
565 // treat it as if it is not in the KVS.
566 if (status.IsAlreadyExists() ||
567 (status.ok() && metadata_out->state() == EntryState::kDeleted)) {
568 return Status::NotFound();
569 }
570 return status;
571 }
572
Get(Key key,const EntryMetadata & metadata,span<std::byte> value_buffer,size_t offset_bytes) const573 StatusWithSize KeyValueStore::Get(Key key,
574 const EntryMetadata& metadata,
575 span<std::byte> value_buffer,
576 size_t offset_bytes) const {
577 Entry entry;
578
579 PW_TRY_WITH_SIZE(ReadEntry(metadata, entry));
580
581 StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
582 if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
583 Status verify_result =
584 entry.VerifyChecksum(key, value_buffer.first(result.size()));
585 if (!verify_result.ok()) {
586 std::memset(value_buffer.data(), 0, result.size());
587 return StatusWithSize(verify_result, 0);
588 }
589
590 return StatusWithSize(verify_result, result.size());
591 }
592 return result;
593 }
594
FixedSizeGet(Key key,void * value,size_t size_bytes) const595 Status KeyValueStore::FixedSizeGet(Key key,
596 void* value,
597 size_t size_bytes) const {
598 PW_TRY(CheckWriteOperation(key));
599
600 EntryMetadata metadata;
601 PW_TRY(FindExisting(key, &metadata));
602
603 return FixedSizeGet(key, metadata, value, size_bytes);
604 }
605
FixedSizeGet(Key key,const EntryMetadata & metadata,void * value,size_t size_bytes) const606 Status KeyValueStore::FixedSizeGet(Key key,
607 const EntryMetadata& metadata,
608 void* value,
609 size_t size_bytes) const {
610 // Ensure that the size of the stored value matches the size of the type.
611 // Otherwise, report error. This check avoids potential memory corruption.
612 PW_TRY_ASSIGN(const size_t actual_size, ValueSize(metadata));
613
614 if (actual_size != size_bytes) {
615 DBG("Requested %u B read, but value is %u B",
616 unsigned(size_bytes),
617 unsigned(actual_size));
618 return Status::InvalidArgument();
619 }
620
621 StatusWithSize result =
622 Get(key, metadata, span(static_cast<byte*>(value), size_bytes), 0);
623
624 return result.status();
625 }
626
ValueSize(const EntryMetadata & metadata) const627 StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
628 Entry entry;
629 PW_TRY_WITH_SIZE(ReadEntry(metadata, entry));
630
631 return StatusWithSize(entry.value_size());
632 }
633
CheckWriteOperation(Key key) const634 Status KeyValueStore::CheckWriteOperation(Key key) const {
635 if (InvalidKey(key)) {
636 return Status::InvalidArgument();
637 }
638
639 // For normal write operation the KVS must be fully ready.
640 if (!initialized()) {
641 return Status::FailedPrecondition();
642 }
643 return OkStatus();
644 }
645
CheckReadOperation(Key key) const646 Status KeyValueStore::CheckReadOperation(Key key) const {
647 if (InvalidKey(key)) {
648 return Status::InvalidArgument();
649 }
650
651 // Operations that are explicitly read-only can be done after init() has been
652 // called but not fully ready (when needing maintenance).
653 if (initialized_ == InitializationState::kNotInitialized) {
654 return Status::FailedPrecondition();
655 }
656 return OkStatus();
657 }
658
WriteEntryForExistingKey(EntryMetadata & metadata,EntryState new_state,Key key,span<const byte> value)659 Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
660 EntryState new_state,
661 Key key,
662 span<const byte> value) {
663 // Read the original entry to get the size for sector accounting purposes.
664 Entry entry;
665 PW_TRY(ReadEntry(metadata, entry));
666
667 return WriteEntry(key, value, new_state, &metadata, &entry);
668 }
669
// Writes the first entry for a previously unseen key. Fails with
// RESOURCE_EXHAUSTED if the entry cache has no free descriptor slots.
Status KeyValueStore::WriteEntryForNewKey(Key key, span<const byte> value) {
  // If there is no room in the cache for a new entry, it is possible some cache
  // entries could be freed by removing deleted keys. If deleted key removal is
  // enabled and the KVS is configured to make all possible writes succeed,
  // attempt heavy maintenance now.
#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
  if (options_.gc_on_write == GargbageCollectOnWrite::kAsManySectorsNeeded &&
      entry_cache_.full()) {
    Status maintenance_status = HeavyMaintenance();
    if (!maintenance_status.ok()) {
      WRN("KVS Maintenance failed for write: %s", maintenance_status.str());
      return maintenance_status;
    }
  }
#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

  // A still-full cache at this point means the KVS has hit its configured
  // maximum entry count.
  if (entry_cache_.full()) {
    WRN("KVS full: trying to store a new entry, but can't. Have %u entries",
        unsigned(entry_cache_.total_entries()));
    return Status::ResourceExhausted();
  }

  return WriteEntry(key, value, EntryState::kValid);
}
694
// Core write path: writes a new entry (and its redundant copies) for a key,
// updating the entry cache and sector accounting. prior_metadata/prior_entry
// are non-null when superseding an existing key, null for a brand-new key.
Status KeyValueStore::WriteEntry(Key key,
                                 span<const byte> value,
                                 EntryState new_state,
                                 EntryMetadata* prior_metadata,
                                 const Entry* prior_entry) {
  // If new entry and prior entry have matching value size, state, and checksum,
  // check if the values match. Directly compare the prior and new values
  // because the checksum can not be depended on to establish equality, it can
  // only be depended on to establish inequality.
  if (prior_entry != nullptr && prior_entry->value_size() == value.size() &&
      prior_metadata->state() == new_state &&
      prior_entry->ValueMatches(value).ok()) {
    // The new value matches the prior value, don't need to write anything. Just
    // keep the existing entry.
    DBG("Write for key 0x%08x with matching value skipped",
        unsigned(prior_metadata->hash()));
    return OkStatus();
  }

  // List of addresses for sectors with space for this entry.
  Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();

  // Find addresses to write the entry to. This may involve garbage collecting
  // one or more sectors.
  const size_t entry_size = Entry::size(partition_, key, value);
  PW_TRY(GetAddressesForWrite(reserved_addresses, entry_size));

  // Write the entry at the first address that was found.
  Entry entry = CreateEntry(reserved_addresses[0], key, value, new_state);
  PW_TRY(AppendEntry(entry, key, value));

  // After writing the first entry successfully, update the key descriptors.
  // Once a single new entry is written, the old entries are invalidated.
  size_t prior_size = prior_entry != nullptr ? prior_entry->size() : 0;
  EntryMetadata new_metadata =
      CreateOrUpdateKeyDescriptor(entry, key, prior_metadata, prior_size);

  // Write the additional copies of the entry, if redundancy is greater than 1.
  // Each successfully written copy is registered in the metadata as it lands.
  for (size_t i = 1; i < redundancy(); ++i) {
    entry.set_address(reserved_addresses[i]);
    PW_TRY(AppendEntry(entry, key, value));
    new_metadata.AddNewAddress(reserved_addresses[i]);
  }
  return OkStatus();
}
740
CreateOrUpdateKeyDescriptor(const Entry & entry,Key key,EntryMetadata * prior_metadata,size_t prior_size)741 KeyValueStore::EntryMetadata KeyValueStore::CreateOrUpdateKeyDescriptor(
742 const Entry& entry,
743 Key key,
744 EntryMetadata* prior_metadata,
745 size_t prior_size) {
746 // If there is no prior descriptor, create a new one.
747 if (prior_metadata == nullptr) {
748 return entry_cache_.AddNew(entry.descriptor(key), entry.address());
749 }
750
751 return UpdateKeyDescriptor(
752 entry, entry.address(), prior_metadata, prior_size);
753 }
754
UpdateKeyDescriptor(const Entry & entry,Address new_address,EntryMetadata * prior_metadata,size_t prior_size)755 KeyValueStore::EntryMetadata KeyValueStore::UpdateKeyDescriptor(
756 const Entry& entry,
757 Address new_address,
758 EntryMetadata* prior_metadata,
759 size_t prior_size) {
760 // Remove valid bytes for the old entry and its copies, which are now stale.
761 for (Address address : prior_metadata->addresses()) {
762 sectors_.FromAddress(address).RemoveValidBytes(prior_size);
763 }
764
765 prior_metadata->Reset(entry.descriptor(prior_metadata->hash()), new_address);
766 return *prior_metadata;
767 }
768
GetAddressesForWrite(Address * write_addresses,size_t write_size)769 Status KeyValueStore::GetAddressesForWrite(Address* write_addresses,
770 size_t write_size) {
771 for (size_t i = 0; i < redundancy(); i++) {
772 SectorDescriptor* sector;
773 PW_TRY(GetSectorForWrite(§or, write_size, span(write_addresses, i)));
774 write_addresses[i] = sectors_.NextWritableAddress(*sector);
775
776 DBG("Found space for entry in sector %u at address %u",
777 sectors_.Index(sector),
778 unsigned(write_addresses[i]));
779 }
780
781 return OkStatus();
782 }
783
784 // Finds a sector to use for writing a new entry to. Does automatic garbage
785 // collection if needed and allowed.
786 //
787 // OK: Sector found with needed space.
788 // RESOURCE_EXHAUSTED: No sector available with the needed space.
// Finds a sector to use for writing a new entry to. Does automatic garbage
// collection if needed and allowed.
//
//                  OK: Sector found with needed space.
//  RESOURCE_EXHAUSTED: No sector available with the needed space.
Status KeyValueStore::GetSectorForWrite(
    SectorDescriptor** sector,
    size_t entry_size,
    span<const Address> reserved_addresses) {
  Status result = sectors_.FindSpace(sector, entry_size, reserved_addresses);

  size_t gc_sector_count = 0;
  bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;

  // Do garbage collection as needed, so long as policy allows.
  while (result.IsResourceExhausted() && do_auto_gc) {
    if (options_.gc_on_write == GargbageCollectOnWrite::kOneSector) {
      // If GC config option is kOneSector clear the flag to not do any more
      // GC after this try.
      do_auto_gc = false;
    }
    // Garbage collect and then try again to find the best sector.
    Status gc_status = GarbageCollect(reserved_addresses);
    if (!gc_status.ok()) {
      if (gc_status.IsNotFound()) {
        // Not enough space, and no reclaimable bytes, this KVS is full!
        return Status::ResourceExhausted();
      }
      return gc_status;
    }

    result = sectors_.FindSpace(sector, entry_size, reserved_addresses);

    gc_sector_count++;
    // Allow total sectors + 2 number of GC cycles so that once reclaimable
    // bytes in all the sectors have been reclaimed can try and free up space by
    // moving entries for keys other than the one being worked on in to sectors
    // that have copies of the key trying to be written.
    if (gc_sector_count > (partition_.sector_count() + 2)) {
      // Safety valve: should be unreachable; prevents an infinite GC loop.
      ERR("Did more GC sectors than total sectors!!!!");
      return Status::ResourceExhausted();
    }
  }

  if (!result.ok()) {
    WRN("Unable to find sector to write %u B", unsigned(entry_size));
  }
  return result;
}
833
MarkSectorCorruptIfNotOk(Status status,SectorDescriptor * sector)834 Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
835 SectorDescriptor* sector) {
836 if (!status.ok()) {
837 DBG(" Sector %u corrupt", sectors_.Index(sector));
838 sector->mark_corrupt();
839 error_detected_ = true;
840 }
841 return status;
842 }
843
// Writes the entry (header, key, and value) to flash at the entry's address
// and updates the containing sector's accounting. A failed write or a failed
// post-write verification marks the sector corrupt and returns the failing
// status; accounting is only updated on full success.
Status KeyValueStore::AppendEntry(const Entry& entry,
                                  Key key,
                                  span<const byte> value) {
  const StatusWithSize result = entry.Write(key, value);

  SectorDescriptor& sector = sectors_.FromAddress(entry.address());

  if (!result.ok()) {
    ERR("Failed to write %u bytes at %#x. %u actually written",
        unsigned(entry.size()),
        unsigned(entry.address()),
        unsigned(result.size()));
    PW_TRY(MarkSectorCorruptIfNotOk(result.status(), &sector));
  }

  if (options_.verify_on_write) {
    // Verify the entry as it now exists in flash, not the RAM copy.
    PW_TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
  }

  // The written bytes are consumed from the sector's writable space and are
  // now live (valid) data.
  sector.RemoveWritableBytes(result.size());
  sector.AddValidBytes(result.size());
  return OkStatus();
}
867
// Copies an existing entry to new_address (used during GC/relocation and
// redundancy repair), optionally verifying the copy in flash, and updates the
// destination sector's accounting. Returns the copy result (status and bytes
// written); a failure marks the destination sector corrupt.
StatusWithSize KeyValueStore::CopyEntryToSector(Entry& entry,
                                                SectorDescriptor* new_sector,
                                                Address new_address) {
  const StatusWithSize result = entry.Copy(new_address);

  PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(result.status(), new_sector));

  if (options_.verify_on_write) {
    // Re-read the entry from its new location so verification covers the
    // freshly written flash contents.
    Entry new_entry;
    PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(
        Entry::Read(partition_, new_address, formats_, &new_entry),
        new_sector));
    // TODO(davidrogers): add test that catches doing the verify on the old
    // entry.
    PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(new_entry.VerifyChecksumInFlash(),
                                              new_sector));
  }
  // Entry was written successfully; update descriptor's address and the sector
  // descriptors to reflect the new entry.
  new_sector->RemoveWritableBytes(result.size());
  new_sector->AddValidBytes(result.size());

  return result;
}
892
// Moves the entry copy stored at `address` to a newly chosen sector and
// updates `address` in place to refer to the new location. Used during
// garbage collection to evacuate valid entries from a sector.
Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
                                    KeyValueStore::Address& address,
                                    span<const Address> reserved_addresses) {
  Entry entry;
  PW_TRY(ReadEntry(metadata, entry));

  // Find a new sector for the entry and write it to the new location. For
  // relocation the find should not be a sector already containing the key
  // but can be the always empty sector, since this is part of the GC process
  // that will result in a new empty sector. Also find a sector that does not
  // have reclaimable space (mostly for the full GC, where that would result in
  // an immediate extra relocation).
  SectorDescriptor* new_sector;

  PW_TRY(sectors_.FindSpaceDuringGarbageCollection(
      &new_sector, entry.size(), metadata.addresses(), reserved_addresses));

  Address new_address = sectors_.NextWritableAddress(*new_sector);
  PW_TRY_ASSIGN(const size_t result_size,
                CopyEntryToSector(entry, new_sector, new_address));
  // The old copy is now stale; remove its bytes from the source sector's
  // valid-byte count so the sector can later be reclaimed.
  sectors_.FromAddress(address).RemoveValidBytes(result_size);
  address = new_address;

  return OkStatus();
}
918
// Runs full KVS maintenance: repairs any detected errors, migrates entries
// to the primary entry format, and garbage collects sectors. Heavy
// maintenance additionally purges deleted keys when that feature is enabled
// at compile time.
//
// Returns the first error encountered; later steps still run so that as much
// maintenance as possible is performed.
Status KeyValueStore::FullMaintenanceHelper(MaintenanceType maintenance_type) {
  if (initialized_ == InitializationState::kNotInitialized) {
    return Status::FailedPrecondition();
  }

  // Full maintenance can be a potentially heavy operation, and should be
  // relatively infrequent, so log start/end at INFO level.
  INF("Beginning full maintenance");
  CheckForErrors();

  // Step 1: Repair errors
  if (error_detected_) {
    PW_TRY(Repair());
  }

  // Step 2: Make sure all the entries are on the primary format.
  StatusWithSize update_status = UpdateEntriesToPrimaryFormat();
  Status overall_status = update_status.status();

  if (!overall_status.ok()) {
    ERR("Failed to update all entries to the primary format");
  }

  SectorDescriptor* sector = sectors_.last_new();

  // Calculate number of bytes for the threshold.
  size_t threshold_bytes =
      (partition_.size_bytes() * kGcUsageThresholdPercentage) / 100;

  // Is bytes in use over the threshold.
  StorageStats stats = GetStorageStats();
  bool over_usage_threshold = stats.in_use_bytes > threshold_bytes;
  bool heavy = (maintenance_type == MaintenanceType::kHeavy);
  // Force GC of every collectable sector for heavy maintenance, when usage is
  // over the threshold, or when entries were just rewritten to the primary
  // format (which leaves stale copies behind).
  bool force_gc = heavy || over_usage_threshold || (update_status.size() > 0);

  auto do_garbage_collect_pass = [&]() {
    // TODO(drempel): look in to making an iterator method for cycling through
    // sectors starting from last_new_sector_.
    Status gc_status;
    // Walk all sectors circularly, starting just after the last new sector.
    for (size_t j = 0; j < sectors_.size(); j++) {
      sector += 1;
      if (sector == sectors_.end()) {
        sector = sectors_.begin();
      }

      if (sector->RecoverableBytes(partition_.sector_size_bytes()) > 0 &&
          (force_gc || sector->valid_bytes() == 0)) {
        gc_status = GarbageCollectSector(*sector, {});
        if (!gc_status.ok()) {
          ERR("Failed to garbage collect all sectors");
          break;
        }
      }
    }
    // Preserve the first error seen; a later success must not overwrite it.
    if (overall_status.ok()) {
      overall_status = gc_status;
    }
  };

  // Step 3: Do full garbage collect pass for all sectors. This will erase all
  // old/stale entries from flash and leave only current/valid entries.
  do_garbage_collect_pass();

#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
  // Step 4: (if heavy maintenance) garbage collect all the deleted keys.
  if (heavy) {
    // If enabled, remove deleted keys from the entry cache, including freeing
    // sector bytes used by those keys. This must only be done directly after a
    // full garbage collection, otherwise the current deleted entry could be
    // garbage collected before the older stale entry producing a window for an
    // invalid/corrupted KVS state if there was a power-fault, crash or other
    // interruption.
    overall_status.Update(RemoveDeletedKeyEntries());

    // Do another garbage collect pass that will fully remove the deleted keys
    // from flash. Garbage collect will only touch sectors that have something
    // to garbage collect, which in this case is only sectors containing deleted
    // keys.
    do_garbage_collect_pass();
  }
#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

  if (overall_status.ok()) {
    INF("Full maintenance complete");
  } else {
    ERR("Full maintenance finished with some errors");
  }
  return overall_status;
}
1008
PartialMaintenance()1009 Status KeyValueStore::PartialMaintenance() {
1010 if (initialized_ == InitializationState::kNotInitialized) {
1011 return Status::FailedPrecondition();
1012 }
1013
1014 CheckForErrors();
1015 // Do automatic repair, if KVS options allow for it.
1016 if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
1017 PW_TRY(Repair());
1018 }
1019 return GarbageCollect(span<const Address>());
1020 }
1021
GarbageCollect(span<const Address> reserved_addresses)1022 Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
1023 DBG("Garbage Collect a single sector");
1024 for ([[maybe_unused]] Address address : reserved_addresses) {
1025 DBG(" Avoid address %u", unsigned(address));
1026 }
1027
1028 // Step 1: Find the sector to garbage collect
1029 SectorDescriptor* sector_to_gc =
1030 sectors_.FindSectorToGarbageCollect(reserved_addresses);
1031
1032 if (sector_to_gc == nullptr) {
1033 // Nothing to GC.
1034 return Status::NotFound();
1035 }
1036
1037 // Step 2: Garbage collect the selected sector.
1038 return GarbageCollectSector(*sector_to_gc, reserved_addresses);
1039 }
1040
RelocateKeyAddressesInSector(SectorDescriptor & sector_to_gc,const EntryMetadata & metadata,span<const Address> reserved_addresses)1041 Status KeyValueStore::RelocateKeyAddressesInSector(
1042 SectorDescriptor& sector_to_gc,
1043 const EntryMetadata& metadata,
1044 span<const Address> reserved_addresses) {
1045 for (FlashPartition::Address& address : metadata.addresses()) {
1046 if (sectors_.AddressInSector(sector_to_gc, address)) {
1047 DBG(" Relocate entry for Key 0x%08" PRIx32 ", sector %u",
1048 metadata.hash(),
1049 sectors_.Index(sectors_.FromAddress(address)));
1050 PW_TRY(RelocateEntry(metadata, address, reserved_addresses));
1051 }
1052 }
1053
1054 return OkStatus();
1055 }
1056
// Garbage collects one sector: relocates all valid entries out of it, then
// erases it so its full capacity becomes writable again.
Status KeyValueStore::GarbageCollectSector(
    SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
  DBG(" Garbage Collect sector %u", sectors_.Index(sector_to_gc));

  // Step 1: Move any valid entries in the GC sector to other sectors
  if (sector_to_gc.valid_bytes() != 0) {
    for (EntryMetadata& metadata : entry_cache_) {
      PW_TRY(RelocateKeyAddressesInSector(
          sector_to_gc, metadata, reserved_addresses));
    }
  }

  // All valid bytes should have been relocated above; anything remaining
  // indicates a bookkeeping or relocation failure.
  if (sector_to_gc.valid_bytes() != 0) {
    ERR(" Failed to relocate valid entries from sector being garbage "
        "collected, %u valid bytes remain",
        unsigned(sector_to_gc.valid_bytes()));
    return Status::Internal();
  }

  // Step 2: Reinitialize the sector
  if (!sector_to_gc.Empty(partition_.sector_size_bytes())) {
    // The sector is flagged corrupt before erasing — presumably so a failed
    // Erase() below leaves it marked unusable; confirm against
    // SectorDescriptor semantics.
    sector_to_gc.mark_corrupt();
    internal_stats_.sector_erase_count++;
    PW_TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
    sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
  }

  DBG(" Garbage Collect sector %u complete", sectors_.Index(sector_to_gc));
  return OkStatus();
}
1087
UpdateEntriesToPrimaryFormat()1088 StatusWithSize KeyValueStore::UpdateEntriesToPrimaryFormat() {
1089 size_t entries_updated = 0;
1090 for (EntryMetadata& prior_metadata : entry_cache_) {
1091 Entry entry;
1092 PW_TRY_WITH_SIZE(ReadEntry(prior_metadata, entry));
1093 if (formats_.primary().magic == entry.magic()) {
1094 // Ignore entries that are already on the primary format.
1095 continue;
1096 }
1097
1098 DBG("Updating entry 0x%08x from old format [0x%08x] to new format "
1099 "[0x%08x]",
1100 unsigned(prior_metadata.hash()),
1101 unsigned(entry.magic()),
1102 unsigned(formats_.primary().magic));
1103
1104 entries_updated++;
1105
1106 last_transaction_id_ += 1;
1107 PW_TRY_WITH_SIZE(entry.Update(formats_.primary(), last_transaction_id_));
1108
1109 // List of addresses for sectors with space for this entry.
1110 Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();
1111
1112 // Find addresses to write the entry to. This may involve garbage collecting
1113 // one or more sectors.
1114 PW_TRY_WITH_SIZE(GetAddressesForWrite(reserved_addresses, entry.size()));
1115
1116 PW_TRY_WITH_SIZE(
1117 CopyEntryToSector(entry,
1118 §ors_.FromAddress(reserved_addresses[0]),
1119 reserved_addresses[0]));
1120
1121 // After writing the first entry successfully, update the key descriptors.
1122 // Once a single new the entry is written, the old entries are invalidated.
1123 EntryMetadata new_metadata = UpdateKeyDescriptor(
1124 entry, reserved_addresses[0], &prior_metadata, entry.size());
1125
1126 // Write the additional copies of the entry, if redundancy is greater
1127 // than 1.
1128 for (size_t i = 1; i < redundancy(); ++i) {
1129 PW_TRY_WITH_SIZE(
1130 CopyEntryToSector(entry,
1131 §ors_.FromAddress(reserved_addresses[i]),
1132 reserved_addresses[i]));
1133 new_metadata.AddNewAddress(reserved_addresses[i]);
1134 }
1135 }
1136
1137 return StatusWithSize(entries_updated);
1138 }
1139
1140 // Add any missing redundant entries/copies for a key.
AddRedundantEntries(EntryMetadata & metadata)1141 Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
1142 Entry entry;
1143 PW_TRY(ReadEntry(metadata, entry));
1144 PW_TRY(entry.VerifyChecksumInFlash());
1145
1146 while (metadata.addresses().size() < redundancy()) {
1147 SectorDescriptor* new_sector;
1148 PW_TRY(GetSectorForWrite(&new_sector, entry.size(), metadata.addresses()));
1149
1150 Address new_address = sectors_.NextWritableAddress(*new_sector);
1151 PW_TRY(CopyEntryToSector(entry, new_sector, new_address));
1152
1153 metadata.AddNewAddress(new_address);
1154 }
1155 return OkStatus();
1156 }
1157
// Garbage collects every sector marked corrupt, retrying once.
//
// Returns OK if all corrupt sectors were recovered; otherwise the first
// unrecovered error (RESOURCE_EXHAUSTED errors are retried on the second
// pass, since a later sector's recovery may have freed space).
Status KeyValueStore::RepairCorruptSectors() {
  // Try to GC each corrupt sector, even if previous sectors fail. If GC of a
  // sector failed on the first pass, then do a second pass, since a later
  // sector might have cleared up space or otherwise unblocked the earlier
  // failed sector.
  Status repair_status = OkStatus();

  size_t loop_count = 0;
  do {
    loop_count++;
    // Error of RESOURCE_EXHAUSTED indicates no space found for relocation.
    // Reset back to OK for the next pass.
    if (repair_status.IsResourceExhausted()) {
      repair_status = OkStatus();
    }

    DBG(" Pass %u", unsigned(loop_count));
    for (SectorDescriptor& sector : sectors_) {
      if (sector.corrupt()) {
        DBG(" Found sector %u with corruption", sectors_.Index(sector));
        Status sector_status = GarbageCollectSector(sector, {});
        if (sector_status.ok()) {
          internal_stats_.corrupt_sectors_recovered += 1;
        } else if (repair_status.ok() || repair_status.IsResourceExhausted()) {
          // Keep the first "real" error; RESOURCE_EXHAUSTED may be replaced by
          // a more specific failure from a later sector.
          repair_status = sector_status;
        }
      }
    }
    DBG(" Pass %u complete", unsigned(loop_count));
  } while (!repair_status.ok() && loop_count < 2);

  return repair_status;
}
1191
EnsureFreeSectorExists()1192 Status KeyValueStore::EnsureFreeSectorExists() {
1193 Status repair_status = OkStatus();
1194 bool empty_sector_found = false;
1195
1196 DBG(" Find empty sector");
1197 for (SectorDescriptor& sector : sectors_) {
1198 if (sector.Empty(partition_.sector_size_bytes())) {
1199 empty_sector_found = true;
1200 DBG(" Empty sector found");
1201 break;
1202 }
1203 }
1204 if (empty_sector_found == false) {
1205 DBG(" No empty sector found, attempting to GC a free sector");
1206 Status sector_status = GarbageCollect(span<const Address, 0>());
1207 if (repair_status.ok() && !sector_status.ok()) {
1208 DBG(" Unable to free an empty sector");
1209 repair_status = sector_status;
1210 }
1211 }
1212
1213 return repair_status;
1214 }
1215
EnsureEntryRedundancy()1216 Status KeyValueStore::EnsureEntryRedundancy() {
1217 Status repair_status = OkStatus();
1218
1219 if (redundancy() == 1) {
1220 DBG(" Redundancy not in use, nothting to check");
1221 return OkStatus();
1222 }
1223
1224 DBG(" Write any needed additional duplicate copies of keys to fulfill %u"
1225 " redundancy",
1226 unsigned(redundancy()));
1227 for (EntryMetadata& metadata : entry_cache_) {
1228 if (metadata.addresses().size() >= redundancy()) {
1229 continue;
1230 }
1231
1232 DBG(" Key with %u of %u copies found, adding missing copies",
1233 unsigned(metadata.addresses().size()),
1234 unsigned(redundancy()));
1235 Status fill_status = AddRedundantEntries(metadata);
1236 if (fill_status.ok()) {
1237 internal_stats_.missing_redundant_entries_recovered += 1;
1238 DBG(" Key missing copies added");
1239 } else {
1240 DBG(" Failed to add key missing copies");
1241 if (repair_status.ok()) {
1242 repair_status = fill_status;
1243 }
1244 }
1245 }
1246
1247 return repair_status;
1248 }
1249
FixErrors()1250 Status KeyValueStore::FixErrors() {
1251 DBG("Fixing KVS errors");
1252
1253 // Step 1: Garbage collect any sectors marked as corrupt.
1254 Status overall_status = RepairCorruptSectors();
1255
1256 // Step 2: Make sure there is at least 1 empty sector. This needs to be a
1257 // seperate check of sectors from step 1, because a found empty sector might
1258 // get written to by a later GC that fails and does not result in a free
1259 // sector.
1260 Status repair_status = EnsureFreeSectorExists();
1261 if (overall_status.ok()) {
1262 overall_status = repair_status;
1263 }
1264
1265 // Step 3: Make sure each stored key has the full number of redundant
1266 // entries.
1267 repair_status = EnsureEntryRedundancy();
1268 if (overall_status.ok()) {
1269 overall_status = repair_status;
1270 }
1271
1272 if (overall_status.ok()) {
1273 error_detected_ = false;
1274 initialized_ = InitializationState::kReady;
1275 }
1276 return overall_status;
1277 }
1278
Repair()1279 Status KeyValueStore::Repair() {
1280 // If errors have been detected, just reinit the KVS metadata. This does a
1281 // full deep error check and any needed repairs. Then repair any errors.
1282 INF("Starting KVS repair");
1283
1284 DBG("Reinitialize KVS metadata");
1285 InitializeMetadata()
1286 .IgnoreError(); // TODO(b/242598609): Handle Status properly
1287
1288 return FixErrors();
1289 }
1290
CreateEntry(Address address,Key key,span<const byte> value,EntryState state)1291 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
1292 Key key,
1293 span<const byte> value,
1294 EntryState state) {
1295 // Always bump the transaction ID when creating a new entry.
1296 //
1297 // Burning transaction IDs prevents inconsistencies between flash and memory
1298 // that which could happen if a write succeeds, but for some reason the read
1299 // and verify step fails. Here's how this would happen:
1300 //
1301 // 1. The entry is written but for some reason the flash reports failure OR
1302 // The write succeeds, but the read / verify operation fails.
1303 // 2. The transaction ID is NOT incremented, because of the failure
1304 // 3. (later) A new entry is written, re-using the transaction ID (oops)
1305 //
1306 // By always burning transaction IDs, the above problem can't happen.
1307 last_transaction_id_ += 1;
1308
1309 if (state == EntryState::kDeleted) {
1310 return Entry::Tombstone(
1311 partition_, address, formats_.primary(), key, last_transaction_id_);
1312 }
1313 return Entry::Valid(partition_,
1314 address,
1315 formats_.primary(),
1316 key,
1317 value,
1318 last_transaction_id_);
1319 }
1320
// Dumps the full KVS state — partition geometry, key descriptors, sector
// descriptors, and a bounded hex dump of raw sector data — to the debug log.
// Debug/diagnostic aid only; does not modify any state.
void KeyValueStore::LogDebugInfo() const {
  const size_t sector_size_bytes = partition_.sector_size_bytes();
  DBG("====================== KEY VALUE STORE DUMP =========================");
  DBG(" ");
  DBG("Flash partition:");
  DBG(" Sector count = %u", unsigned(partition_.sector_count()));
  DBG(" Sector max count = %u", unsigned(sectors_.max_size()));
  DBG(" Sectors in use = %u", unsigned(sectors_.size()));
  DBG(" Sector size = %u", unsigned(sector_size_bytes));
  DBG(" Total size = %u", unsigned(partition_.size_bytes()));
  DBG(" Alignment = %u", unsigned(partition_.alignment_bytes()));
  DBG(" ");
  DBG("Key descriptors:");
  DBG(" Entry count = %u", unsigned(entry_cache_.total_entries()));
  DBG(" Max entry count = %u", unsigned(entry_cache_.max_entries()));
  DBG(" ");
  DBG(" # hash version address address (hex)");
  size_t count = 0;
  for (const EntryMetadata& metadata : entry_cache_) {
    // NOTE(review): first_address() is logged twice — presumably once decimal
    // and once hex per the column header; confirm intent.
    DBG(" |%3zu: | %8zx |%8zu | %8zu | %8zx",
        count++,
        size_t(metadata.hash()),
        size_t(metadata.transaction_id()),
        size_t(metadata.first_address()),
        size_t(metadata.first_address()));
  }
  DBG(" ");

  DBG("Sector descriptors:");
  DBG(" # tail free valid has_space");
  for (const SectorDescriptor& sd : sectors_) {
    DBG(" |%3u: | %8zu |%8zu | %s",
        sectors_.Index(sd),
        size_t(sd.writable_bytes()),
        sd.valid_bytes(),
        sd.writable_bytes() ? "YES" : "");
  }
  DBG(" ");

  // TODO(keir): This should stop logging after some threshold.
  // size_t dumped_bytes = 0;
  DBG("Sector raw data:");
  for (size_t sector_id = 0; sector_id < sectors_.size(); ++sector_id) {
    // Read sector data. Yes, this will blow the stack on embedded.
    std::array<byte, 500> raw_sector_data;  // TODO!!!
    [[maybe_unused]] StatusWithSize sws =
        partition_.Read(sector_id * sector_size_bytes, raw_sector_data);
    DBG("Read: %u bytes", unsigned(sws.size()));

    // Hex-dump the sector contents, eight bytes per line.
    DBG(" base addr offs 0 1 2 3 4 5 6 7");
    for (size_t i = 0; i < sector_size_bytes; i += 8) {
      DBG(" %3zu %8zx %5zu | %02x %02x %02x %02x %02x %02x %02x %02x",
          sector_id,
          (sector_id * sector_size_bytes) + i,
          i,
          static_cast<unsigned int>(raw_sector_data[i + 0]),
          static_cast<unsigned int>(raw_sector_data[i + 1]),
          static_cast<unsigned int>(raw_sector_data[i + 2]),
          static_cast<unsigned int>(raw_sector_data[i + 3]),
          static_cast<unsigned int>(raw_sector_data[i + 4]),
          static_cast<unsigned int>(raw_sector_data[i + 5]),
          static_cast<unsigned int>(raw_sector_data[i + 6]),
          static_cast<unsigned int>(raw_sector_data[i + 7]));

      // TODO(keir): Fix exit condition.
      if (i > 128) {
        break;
      }
    }
    DBG(" ");
  }

  DBG("////////////////////// KEY VALUE STORE DUMP END /////////////////////");
}
1395
LogSectors() const1396 void KeyValueStore::LogSectors() const {
1397 DBG("Sector descriptors: count %u", unsigned(sectors_.size()));
1398 for (auto& sector : sectors_) {
1399 DBG(" - Sector %u: valid %u, recoverable %u, free %u",
1400 sectors_.Index(sector),
1401 unsigned(sector.valid_bytes()),
1402 unsigned(sector.RecoverableBytes(partition_.sector_size_bytes())),
1403 unsigned(sector.writable_bytes()));
1404 }
1405 }
1406
LogKeyDescriptor() const1407 void KeyValueStore::LogKeyDescriptor() const {
1408 DBG("Key descriptors: count %u", unsigned(entry_cache_.total_entries()));
1409 for (const EntryMetadata& metadata : entry_cache_) {
1410 DBG(" - Key: %s, hash %#x, transaction ID %u, first address %#x",
1411 metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
1412 unsigned(metadata.hash()),
1413 unsigned(metadata.transaction_id()),
1414 unsigned(metadata.first_address()));
1415 }
1416 }
1417
1418 } // namespace pw::kvs
1419