// Copyright 2020 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#define PW_LOG_MODULE_NAME "KVS"
#define PW_LOG_LEVEL PW_KVS_LOG_LEVEL

#include "pw_kvs/key_value_store.h"

#include <algorithm>
#include <cinttypes>
#include <cstring>
#include <type_traits>

#include "pw_assert/check.h"
#include "pw_kvs_private/config.h"
#include "pw_log/shorter.h"
#include "pw_status/try.h"

namespace pw::kvs {
namespace {

using std::byte;

constexpr bool InvalidKey(Key key) {
  return key.empty() || (key.size() > internal::Entry::kMaxKeyLength);
}

}  // namespace

KeyValueStore::KeyValueStore(FlashPartition* partition,
                             span<const EntryFormat> formats,
                             const Options& options,
                             size_t redundancy,
                             Vector<SectorDescriptor>& sector_descriptor_list,
                             const SectorDescriptor** temp_sectors_to_skip,
                             Vector<KeyDescriptor>& key_descriptor_list,
                             Address* addresses)
    : partition_(*partition),
      formats_(formats),
      sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
      entry_cache_(key_descriptor_list, addresses, redundancy),
      options_(options),
      initialized_(InitializationState::kNotInitialized),
      error_detected_(false),
      internal_stats_({}),
      last_transaction_id_(0) {}
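
// Example (editor's sketch, not part of the upstream file): this constructor
// is normally invoked through the KeyValueStoreBuffer template, which
// supplies the sector/key descriptor vectors and redundancy storage it
// requires. The partition, sizes, and magic below are illustrative
// assumptions.
//
//   constexpr size_t kMaxEntries = 64;
//   constexpr size_t kMaxUsableSectors = 8;
//   constexpr pw::kvs::EntryFormat kFormat{.magic = 0xd253a8a9,
//                                          .checksum = nullptr};
//
//   // `flash_partition` is a pw::kvs::FlashPartition defined elsewhere.
//   pw::kvs::KeyValueStoreBuffer<kMaxEntries, kMaxUsableSectors> kvs(
//       &flash_partition, kFormat);
//   PW_CHECK_OK(kvs.Init());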

Status KeyValueStore::Init() {
  initialized_ = InitializationState::kNotInitialized;
  error_detected_ = false;
  last_transaction_id_ = 0;

  INF("Initializing key value store");
  if (partition_.sector_count() > sectors_.max_size()) {
    ERR("KVS init failed: kMaxUsableSectors (=%u) must be at least as "
        "large as the number of sectors in the flash partition (=%u)",
        unsigned(sectors_.max_size()),
        unsigned(partition_.sector_count()));
    return Status::FailedPrecondition();
  }

  if (partition_.sector_count() < 2) {
    ERR("KVS init failed: FlashPartition sector count (=%u) must be at least "
        "2. KVS requires at least 1 working sector + 1 free/reserved sector",
        unsigned(partition_.sector_count()));
    return Status::FailedPrecondition();
  }

  const size_t sector_size_bytes = partition_.sector_size_bytes();

  // TODO(davidrogers): investigate doing this as a static assert/compile-time
  // check.
  if (sector_size_bytes > SectorDescriptor::max_sector_size()) {
    ERR("KVS init failed: sector_size_bytes (=%u) is greater than maximum "
        "allowed sector size (=%u)",
        unsigned(sector_size_bytes),
        unsigned(SectorDescriptor::max_sector_size()));
    return Status::FailedPrecondition();
  }

  Status metadata_result = InitializeMetadata();

  if (!error_detected_) {
    initialized_ = InitializationState::kReady;
  } else {
    initialized_ = InitializationState::kNeedsMaintenance;

    if (options_.recovery != ErrorRecovery::kManual) {
      size_t pre_fix_redundancy_errors =
          internal_stats_.missing_redundant_entries_recovered;
      Status recovery_status = FixErrors();

      if (recovery_status.ok()) {
        if (metadata_result.IsOutOfRange()) {
          internal_stats_.missing_redundant_entries_recovered =
              pre_fix_redundancy_errors;
          INF("KVS init: Redundancy level successfully updated");
        } else {
          WRN("KVS init: Corruption detected and fully repaired");
        }
        initialized_ = InitializationState::kReady;
      } else if (recovery_status.IsResourceExhausted()) {
        WRN("KVS init: Unable to maintain required free sector");
      } else {
        WRN("KVS init: Corruption detected and unable to repair");
      }
    } else {
      WRN("KVS init: Corruption detected, no repair attempted due to options");
    }
  }

  INF("KeyValueStore init complete: active keys %u, deleted keys %u, sectors "
      "%u, logical sector size %u bytes",
      unsigned(size()),
      unsigned(entry_cache_.total_entries() - size()),
      unsigned(sectors_.size()),
      unsigned(partition_.sector_size_bytes()));

  // Report if any corruption was not repaired.
  if (error_detected_) {
    WRN("KVS init: Corruption found but not repaired, KVS unavailable until "
        "successful maintenance.");
    return Status::DataLoss();
  }

  return OkStatus();
}
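
// Example (editor's sketch): Init() returns DATA_LOSS when corruption was
// found but not repaired; the store stays unavailable for writes until
// maintenance succeeds. The retry policy below is an illustrative assumption
// using the public FullMaintenance() wrapper.
//
//   pw::Status status = kvs.Init();
//   if (status.IsDataLoss()) {
//     status = kvs.FullMaintenance();  // attempts repair, then GC
//   }
//   PW_CHECK_OK(status);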

Status KeyValueStore::InitializeMetadata() {
  const size_t sector_size_bytes = partition_.sector_size_bytes();

  sectors_.Reset();
  entry_cache_.Reset();

  DBG("First pass: Read all entries from all sectors");
  Address sector_address = 0;

  size_t total_corrupt_bytes = 0;
  size_t corrupt_entries = 0;
  bool empty_sector_found = false;
  size_t entry_copies_missing = 0;

  for (SectorDescriptor& sector : sectors_) {
    Address entry_address = sector_address;

    size_t sector_corrupt_bytes = 0;

    for (int num_entries_in_sector = 0; true; num_entries_in_sector++) {
      DBG("Load entry: sector=%u, entry#=%d, address=%u",
          unsigned(sector_address),
          num_entries_in_sector,
          unsigned(entry_address));

      if (!sectors_.AddressInSector(sector, entry_address)) {
        DBG("Fell off end of sector; moving to the next sector");
        break;
      }

      Address next_entry_address;
      Status status = LoadEntry(entry_address, &next_entry_address);
      if (status.IsNotFound()) {
        DBG("Hit un-written data in sector; moving to the next sector");
        break;
      } else if (!status.ok()) {
        // The entry could not be read, indicating likely data corruption within
        // the sector. Try to scan the remainder of the sector for other
        // entries.

        error_detected_ = true;
        corrupt_entries++;

        status = ScanForEntry(sector,
                              entry_address + Entry::kMinAlignmentBytes,
                              &next_entry_address);
        if (!status.ok()) {
          // No further entries in this sector. Mark the remaining bytes in the
          // sector as corrupt (since we can't reliably know the size of the
          // corrupt entry).
          sector_corrupt_bytes +=
              sector_size_bytes - (entry_address - sector_address);
          break;
        }

        sector_corrupt_bytes += next_entry_address - entry_address;
      }

      // Entry loaded successfully, so get ready to load the next one.
      entry_address = next_entry_address;

      // Update the number of writable bytes in this sector.
      sector.set_writable_bytes(sector_size_bytes -
                                (entry_address - sector_address));
    }

    if (sector_corrupt_bytes > 0) {
      // If the sector contains corrupt data, prevent any further entries from
      // being written to it by indicating that it has no space. This should
      // also make it a decent GC candidate. Valid keys in the sector are still
      // readable as normal.
      sector.mark_corrupt();
      error_detected_ = true;

      WRN("Sector %u contains %uB of corrupt data",
          sectors_.Index(sector),
          unsigned(sector_corrupt_bytes));
    }

    if (sector.Empty(sector_size_bytes)) {
      empty_sector_found = true;
    }
    sector_address += sector_size_bytes;
    total_corrupt_bytes += sector_corrupt_bytes;
  }

  DBG("Second pass: Count valid bytes in each sector");
  Address newest_key = 0;

  // For every valid entry, for each address, count the valid bytes in that
  // sector. If the address fails to read, remove the address and mark the
  // sector as corrupt. Track which entry has the newest transaction ID for
  // initializing last_new_sector_.
  for (EntryMetadata& metadata : entry_cache_) {
    if (metadata.addresses().size() < redundancy()) {
      DBG("Key 0x%08x missing copies, has %u, needs %u",
          unsigned(metadata.hash()),
          unsigned(metadata.addresses().size()),
          unsigned(redundancy()));
      entry_copies_missing++;
    }
    size_t index = 0;
    while (index < metadata.addresses().size()) {
      Address address = metadata.addresses()[index];
      Entry entry;

      Status read_result = Entry::Read(partition_, address, formats_, &entry);

      SectorDescriptor& sector = sectors_.FromAddress(address);

      if (read_result.ok()) {
        sector.AddValidBytes(entry.size());
        index++;
      } else {
        corrupt_entries++;
        total_corrupt_bytes += sector.writable_bytes();
        error_detected_ = true;
        sector.mark_corrupt();

        // Remove the bad address and stay at this index. The removal swaps
        // the removed address with the last address, so this index needs to
        // be rechecked with the new address.
        metadata.RemoveAddress(address);
      }
    }

    if (metadata.IsNewerThan(last_transaction_id_)) {
      last_transaction_id_ = metadata.transaction_id();
      newest_key = metadata.addresses().back();
    }
  }

  sectors_.set_last_new_sector(newest_key);

  if (!empty_sector_found) {
    DBG("No empty sector found");
    error_detected_ = true;
  }

  if (entry_copies_missing > 0) {
    bool other_errors = error_detected_;
    error_detected_ = true;

    if (!other_errors && entry_copies_missing == entry_cache_.total_entries()) {
      INF("KVS configuration changed to redundancy of %u total copies per key",
          unsigned(redundancy()));
      return Status::OutOfRange();
    }
  }

  if (error_detected_) {
    WRN("Corruption detected. Found %u corrupt bytes, %u corrupt entries, "
        "and %u keys missing redundant copies.",
        unsigned(total_corrupt_bytes),
        unsigned(corrupt_entries),
        unsigned(entry_copies_missing));
    return Status::FailedPrecondition();
  }
  return OkStatus();
}
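
// Editor's note (worked example): if a store written with redundancy 1 is
// later opened with redundancy() == 2 and no other errors are present, every
// entry reports a missing copy, so entry_copies_missing equals
// entry_cache_.total_entries() and this function returns OUT_OF_RANGE. Init()
// uses that signal to treat the subsequent FixErrors() pass as a redundancy
// upgrade rather than corruption repair.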

KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
  StorageStats stats{};
  const size_t sector_size = partition_.sector_size_bytes();
  bool found_empty_sector = false;
  stats.sector_erase_count = internal_stats_.sector_erase_count;
  stats.corrupt_sectors_recovered = internal_stats_.corrupt_sectors_recovered;
  stats.missing_redundant_entries_recovered =
      internal_stats_.missing_redundant_entries_recovered;

  for (const SectorDescriptor& sector : sectors_) {
    stats.in_use_bytes += sector.valid_bytes();
    stats.reclaimable_bytes += sector.RecoverableBytes(sector_size);

    if (!found_empty_sector && sector.Empty(sector_size)) {
      // The KVS tries to always keep an empty sector for GC, so don't count
      // the first empty sector seen as writable space. However, a free sector
      // cannot always be assumed to exist; if a GC operation fails, all sectors
      // may be partially written, in which case the space reported might be
      // inaccurate.
      found_empty_sector = true;
      continue;
    }

    stats.writable_bytes += sector.writable_bytes();
  }

  return stats;
}
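
// Example (editor's sketch): StorageStats can drive a simple maintenance
// heuristic. The byte threshold below is an arbitrary illustration, not a
// value from this module.
//
//   pw::kvs::KeyValueStore::StorageStats stats = kvs.GetStorageStats();
//   if (stats.writable_bytes < 1024 && stats.reclaimable_bytes > 0) {
//     // Low on writable space but stale entries exist: reclaim them via GC.
//     kvs.FullMaintenance().IgnoreError();
//   }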

// Check KVS for any error conditions. Primarily intended for test and
// internal use.
bool KeyValueStore::CheckForErrors() {
  // Check for corrupted sectors
  for (SectorDescriptor& sector : sectors_) {
    if (sector.corrupt()) {
      error_detected_ = true;
      return error_detected();
    }
  }

  // Check for missing redundancy.
  if (redundancy() > 1) {
    for (const EntryMetadata& metadata : entry_cache_) {
      if (metadata.addresses().size() < redundancy()) {
        error_detected_ = true;
        return error_detected();
      }
    }
  }

  return error_detected();
}

Status KeyValueStore::LoadEntry(Address entry_address,
                                Address* next_entry_address) {
  Entry entry;
  PW_TRY(Entry::Read(partition_, entry_address, formats_, &entry));

  // Read the key from flash & validate the entry (which reads the value).
  Entry::KeyBuffer key_buffer;
  PW_TRY_ASSIGN(size_t key_length, entry.ReadKey(key_buffer));
  const Key key(key_buffer.data(), key_length);

  PW_TRY(entry.VerifyChecksumInFlash());

  // A valid entry was found, so update the next entry address before doing any
  // of the checks that happen in AddNewOrUpdateExisting.
  *next_entry_address = entry.next_address();
  return entry_cache_.AddNewOrUpdateExisting(
      entry.descriptor(key), entry.address(), partition_.sector_size_bytes());
}

// Scans flash memory within a sector to find a KVS entry magic.
Status KeyValueStore::ScanForEntry(const SectorDescriptor& sector,
                                   Address start_address,
                                   Address* next_entry_address) {
  DBG("Scanning sector %u for entries starting from address %u",
      sectors_.Index(sector),
      unsigned(start_address));

  // Entries must start at addresses which are aligned on a multiple of
  // Entry::kMinAlignmentBytes. However, that multiple can vary between entries.
  // When scanning, we don't have an entry to tell us what the current alignment
  // is, so the minimum alignment is used to be exhaustive.
  for (Address address = AlignUp(start_address, Entry::kMinAlignmentBytes);
       sectors_.AddressInSector(sector, address);
       address += Entry::kMinAlignmentBytes) {
    uint32_t magic;
    StatusWithSize read_result =
        partition_.Read(address, as_writable_bytes(span(&magic, 1)));
    if (!read_result.ok()) {
      continue;
    }
    if (formats_.KnownMagic(magic)) {
      DBG("Found entry magic at address %u", unsigned(address));
      *next_entry_address = address;
      return OkStatus();
    }
  }

  return Status::NotFound();
}
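
// Worked example (editor's note): assume a hypothetical
// Entry::kMinAlignmentBytes of 16 and a corrupt entry detected at address
// 100. InitializeMetadata() calls this with start_address 100 + 16 = 116;
// AlignUp(116, 16) = 128, so the scan probes 128, 144, 160, ... reading a
// 4-byte magic at each step until a known magic is found or the sector ends.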

#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

Status KeyValueStore::RemoveDeletedKeyEntries() {
  for (internal::EntryCache::iterator it = entry_cache_.begin();
       it != entry_cache_.end();
       ++it) {
    EntryMetadata& entry_metadata = *it;

    // The iterator we are given back from RemoveEntry could also be deleted,
    // so loop until we find one that isn't deleted.
    while (entry_metadata.state() == EntryState::kDeleted) {
      // Read the original entry to get the size for sector accounting purposes.
      Entry entry;
      PW_TRY(ReadEntry(entry_metadata, entry));

      for (Address address : entry_metadata.addresses()) {
        sectors_.FromAddress(address).RemoveValidBytes(entry.size());
      }

      it = entry_cache_.RemoveEntry(it);

      if (it == entry_cache_.end()) {
        return OkStatus();  // new iterator is the end, bail
      }

      entry_metadata = *it;  // set entry_metadata to check for deletion again
    }
  }

  return OkStatus();
}

#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

StatusWithSize KeyValueStore::Get(Key key,
                                  span<byte> value_buffer,
                                  size_t offset_bytes) const {
  PW_TRY_WITH_SIZE(CheckReadOperation(key));

  EntryMetadata metadata;
  PW_TRY_WITH_SIZE(FindExisting(key, &metadata));

  return Get(key, metadata, value_buffer, offset_bytes);
}

Status KeyValueStore::PutBytes(Key key, span<const byte> value) {
  PW_TRY(CheckWriteOperation(key));
  DBG("Writing key/value; key length=%u, value length=%u",
      unsigned(key.size()),
      unsigned(value.size()));

  if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
    DBG("%u B value with %u B key cannot fit in one sector",
        unsigned(value.size()),
        unsigned(key.size()));
    return Status::InvalidArgument();
  }

  EntryMetadata metadata;
  Status status = FindEntry(key, &metadata);

  if (status.ok()) {
    // TODO(davidrogers): figure out how to support logging multiple addresses.
    DBG("Overwriting entry for key 0x%08x in %u sectors including %u",
        unsigned(metadata.hash()),
        unsigned(metadata.addresses().size()),
        sectors_.Index(metadata.first_address()));
    return WriteEntryForExistingKey(metadata, EntryState::kValid, key, value);
  }

  if (status.IsNotFound()) {
    return WriteEntryForNewKey(key, value);
  }

  return status;
}

Status KeyValueStore::Delete(Key key) {
  PW_TRY(CheckWriteOperation(key));

  EntryMetadata metadata;
  PW_TRY(FindExisting(key, &metadata));

  // TODO(davidrogers): figure out how to support logging multiple addresses.
  DBG("Writing tombstone for key 0x%08x in %u sectors including %u",
      unsigned(metadata.hash()),
      unsigned(metadata.addresses().size()),
      sectors_.Index(metadata.first_address()));
  return WriteEntryForExistingKey(metadata, EntryState::kDeleted, key, {});
}
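
// Example (editor's sketch): a typical round trip through the public API.
// The key name and counter type are illustrative; Put/Get/Delete are the
// templated wrappers over PutBytes()/FixedSizeGet() declared in the header.
//
//   uint32_t boot_count = 0;
//   kvs.Get("boot_count", &boot_count).IgnoreError();  // absent on first boot
//   boot_count += 1;
//   PW_CHECK_OK(kvs.Put("boot_count", boot_count));
//   // Deleting writes a tombstone; the key then reads as NOT_FOUND.
//   PW_CHECK_OK(kvs.Delete("boot_count"));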

void KeyValueStore::Item::ReadKey() {
  key_buffer_.fill('\0');

  Entry entry;
  if (kvs_.ReadEntry(*iterator_, entry).ok()) {
    entry.ReadKey(key_buffer_)
        .IgnoreError();  // TODO(b/242598609): Handle Status properly
  }
}

KeyValueStore::iterator& KeyValueStore::iterator::operator++() {
  // Skip to the next entry that is valid (not deleted).
  while (++item_.iterator_ != item_.kvs_.entry_cache_.end() &&
         item_.iterator_->state() != EntryState::kValid) {
  }
  return *this;
}

KeyValueStore::iterator KeyValueStore::begin() const {
  internal::EntryCache::const_iterator cache_iterator = entry_cache_.begin();
  // Skip over any deleted entries at the start of the descriptor list.
  while (cache_iterator != entry_cache_.end() &&
         cache_iterator->state() != EntryState::kValid) {
    ++cache_iterator;
  }
  return iterator(*this, cache_iterator);
}

StatusWithSize KeyValueStore::ValueSize(Key key) const {
  PW_TRY_WITH_SIZE(CheckReadOperation(key));

  EntryMetadata metadata;
  PW_TRY_WITH_SIZE(FindExisting(key, &metadata));

  return ValueSize(metadata);
}

Status KeyValueStore::ReadEntry(const EntryMetadata& metadata,
                                Entry& entry) const {
  // Try to read an entry
  Status read_result = Status::DataLoss();
  for (Address address : metadata.addresses()) {
    read_result = Entry::Read(partition_, address, formats_, &entry);
    if (read_result.ok()) {
      return read_result;
    }

    // Found a bad address. Set the sector as corrupt.
    error_detected_ = true;
    sectors_.FromAddress(address).mark_corrupt();
  }

  ERR("No valid entries for key. Data has been lost!");
  return read_result;
}

Status KeyValueStore::FindEntry(Key key, EntryMetadata* metadata_out) const {
  StatusWithSize find_result =
      entry_cache_.Find(partition_, sectors_, formats_, key, metadata_out);

  if (find_result.size() > 0u) {
    error_detected_ = true;
  }
  return find_result.status();
}

Status KeyValueStore::FindExisting(Key key, EntryMetadata* metadata_out) const {
  Status status = FindEntry(key, metadata_out);

  // If the key's hash collides with an existing key or if the key is deleted,
  // treat it as if it is not in the KVS.
  if (status.IsAlreadyExists() ||
      (status.ok() && metadata_out->state() == EntryState::kDeleted)) {
    return Status::NotFound();
  }
  return status;
}

StatusWithSize KeyValueStore::Get(Key key,
                                  const EntryMetadata& metadata,
                                  span<std::byte> value_buffer,
                                  size_t offset_bytes) const {
  Entry entry;

  PW_TRY_WITH_SIZE(ReadEntry(metadata, entry));

  StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
  if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
    Status verify_result =
        entry.VerifyChecksum(key, value_buffer.first(result.size()));
    if (!verify_result.ok()) {
      std::memset(value_buffer.data(), 0, result.size());
      return StatusWithSize(verify_result, 0);
    }

    return StatusWithSize(verify_result, result.size());
  }
  return result;
}

Status KeyValueStore::FixedSizeGet(Key key,
                                   void* value,
                                   size_t size_bytes) const {
  PW_TRY(CheckWriteOperation(key));

  EntryMetadata metadata;
  PW_TRY(FindExisting(key, &metadata));

  return FixedSizeGet(key, metadata, value, size_bytes);
}

Status KeyValueStore::FixedSizeGet(Key key,
                                   const EntryMetadata& metadata,
                                   void* value,
                                   size_t size_bytes) const {
  // Ensure that the size of the stored value matches the size of the type.
  // Otherwise, report error. This check avoids potential memory corruption.
  PW_TRY_ASSIGN(const size_t actual_size, ValueSize(metadata));

  if (actual_size != size_bytes) {
    DBG("Requested %u B read, but value is %u B",
        unsigned(size_bytes),
        unsigned(actual_size));
    return Status::InvalidArgument();
  }

  StatusWithSize result =
      Get(key, metadata, span(static_cast<byte*>(value), size_bytes), 0);

  return result.status();
}
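
// Example (editor's note): the size check above means fixed-size reads never
// truncate. Assuming "boot_count" was stored as a 4-byte value:
//
//   uint16_t small = 0;
//   pw::Status status = kvs.Get("boot_count", &small);
//   // status.IsInvalidArgument() == true; `small` is left untouched.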

StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
  Entry entry;
  PW_TRY_WITH_SIZE(ReadEntry(metadata, entry));

  return StatusWithSize(entry.value_size());
}

Status KeyValueStore::CheckWriteOperation(Key key) const {
  if (InvalidKey(key)) {
    return Status::InvalidArgument();
  }

  // For normal write operation the KVS must be fully ready.
  if (!initialized()) {
    return Status::FailedPrecondition();
  }
  return OkStatus();
}

Status KeyValueStore::CheckReadOperation(Key key) const {
  if (InvalidKey(key)) {
    return Status::InvalidArgument();
  }

  // Operations that are explicitly read-only can be done after Init() has
  // been called, even if the KVS is not fully ready (e.g. it needs
  // maintenance).
  if (initialized_ == InitializationState::kNotInitialized) {
    return Status::FailedPrecondition();
  }
  return OkStatus();
}

Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
                                               EntryState new_state,
                                               Key key,
                                               span<const byte> value) {
  // Read the original entry to get the size for sector accounting purposes.
  Entry entry;
  PW_TRY(ReadEntry(metadata, entry));

  return WriteEntry(key, value, new_state, &metadata, &entry);
}

Status KeyValueStore::WriteEntryForNewKey(Key key, span<const byte> value) {
  // If there is no room in the cache for a new entry, it is possible some cache
  // entries could be freed by removing deleted keys. If deleted key removal is
  // enabled and the KVS is configured to make all possible writes succeed,
  // attempt heavy maintenance now.
#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
  if (options_.gc_on_write == GargbageCollectOnWrite::kAsManySectorsNeeded &&
      entry_cache_.full()) {
    Status maintenance_status = HeavyMaintenance();
    if (!maintenance_status.ok()) {
      WRN("KVS Maintenance failed for write: %s", maintenance_status.str());
      return maintenance_status;
    }
  }
#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

  if (entry_cache_.full()) {
    WRN("KVS full: trying to store a new entry, but can't. Have %u entries",
        unsigned(entry_cache_.total_entries()));
    return Status::ResourceExhausted();
  }

  return WriteEntry(key, value, EntryState::kValid);
}

Status KeyValueStore::WriteEntry(Key key,
                                 span<const byte> value,
                                 EntryState new_state,
                                 EntryMetadata* prior_metadata,
                                 const Entry* prior_entry) {
  // If the new entry and the prior entry have matching value size, state, and
  // checksum, check if the values match. Directly compare the prior and new
  // values because the checksum cannot be depended on to establish equality;
  // it can only be depended on to establish inequality.
  if (prior_entry != nullptr && prior_entry->value_size() == value.size() &&
      prior_metadata->state() == new_state &&
      prior_entry->ValueMatches(value).ok()) {
    // The new value matches the prior value, so there is no need to write
    // anything. Just keep the existing entry.
    DBG("Write for key 0x%08x with matching value skipped",
        unsigned(prior_metadata->hash()));
    return OkStatus();
  }

  // List of addresses for sectors with space for this entry.
  Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();

  // Find addresses to write the entry to. This may involve garbage collecting
  // one or more sectors.
  const size_t entry_size = Entry::size(partition_, key, value);
  PW_TRY(GetAddressesForWrite(reserved_addresses, entry_size));

  // Write the entry at the first address that was found.
  Entry entry = CreateEntry(reserved_addresses[0], key, value, new_state);
  PW_TRY(AppendEntry(entry, key, value));

  // After writing the first entry successfully, update the key descriptors.
  // Once a single new entry is written, the old entries are invalidated.
  size_t prior_size = prior_entry != nullptr ? prior_entry->size() : 0;
  EntryMetadata new_metadata =
      CreateOrUpdateKeyDescriptor(entry, key, prior_metadata, prior_size);

  // Write the additional copies of the entry, if redundancy is greater than 1.
  for (size_t i = 1; i < redundancy(); ++i) {
    entry.set_address(reserved_addresses[i]);
    PW_TRY(AppendEntry(entry, key, value));
    new_metadata.AddNewAddress(reserved_addresses[i]);
  }
  return OkStatus();
}

KeyValueStore::EntryMetadata KeyValueStore::CreateOrUpdateKeyDescriptor(
    const Entry& entry,
    Key key,
    EntryMetadata* prior_metadata,
    size_t prior_size) {
  // If there is no prior descriptor, create a new one.
  if (prior_metadata == nullptr) {
    return entry_cache_.AddNew(entry.descriptor(key), entry.address());
  }

  return UpdateKeyDescriptor(
      entry, entry.address(), prior_metadata, prior_size);
}

KeyValueStore::EntryMetadata KeyValueStore::UpdateKeyDescriptor(
    const Entry& entry,
    Address new_address,
    EntryMetadata* prior_metadata,
    size_t prior_size) {
  // Remove valid bytes for the old entry and its copies, which are now stale.
  for (Address address : prior_metadata->addresses()) {
    sectors_.FromAddress(address).RemoveValidBytes(prior_size);
  }

  prior_metadata->Reset(entry.descriptor(prior_metadata->hash()), new_address);
  return *prior_metadata;
}

Status KeyValueStore::GetAddressesForWrite(Address* write_addresses,
                                           size_t write_size) {
  for (size_t i = 0; i < redundancy(); i++) {
    SectorDescriptor* sector;
    PW_TRY(GetSectorForWrite(&sector, write_size, span(write_addresses, i)));
    write_addresses[i] = sectors_.NextWritableAddress(*sector);

    DBG("Found space for entry in sector %u at address %u",
        sectors_.Index(sector),
        unsigned(write_addresses[i]));
  }

  return OkStatus();
}

// Finds a sector to use for writing a new entry to. Does automatic garbage
// collection if needed and allowed.
//
//                 OK: Sector found with needed space.
// RESOURCE_EXHAUSTED: No sector available with the needed space.
Status KeyValueStore::GetSectorForWrite(
    SectorDescriptor** sector,
    size_t entry_size,
    span<const Address> reserved_addresses) {
  Status result = sectors_.FindSpace(sector, entry_size, reserved_addresses);

  size_t gc_sector_count = 0;
  bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;

  // Do garbage collection as needed, so long as policy allows.
  while (result.IsResourceExhausted() && do_auto_gc) {
    if (options_.gc_on_write == GargbageCollectOnWrite::kOneSector) {
      // If the GC config option is kOneSector, clear the flag so that no
      // further GC is done after this attempt.
      do_auto_gc = false;
    }
    // Garbage collect and then try again to find the best sector.
    Status gc_status = GarbageCollect(reserved_addresses);
    if (!gc_status.ok()) {
      if (gc_status.IsNotFound()) {
        // Not enough space, and no reclaimable bytes, this KVS is full!
        return Status::ResourceExhausted();
      }
      return gc_status;
    }

    result = sectors_.FindSpace(sector, entry_size, reserved_addresses);

    gc_sector_count++;
    // Allow up to (total sectors + 2) GC cycles. Once the reclaimable bytes
    // in all sectors have been reclaimed, the extra cycles can free up space
    // by moving entries for other keys out of sectors that hold copies of the
    // key being written.
    if (gc_sector_count > (partition_.sector_count() + 2)) {
      ERR("Garbage collected more sectors than the partition contains!");
      return Status::ResourceExhausted();
    }
  }

  if (!result.ok()) {
    WRN("Unable to find sector to write %u B", unsigned(entry_size));
  }
  return result;
}

Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
                                               SectorDescriptor* sector) {
  if (!status.ok()) {
    DBG("  Sector %u corrupt", sectors_.Index(sector));
    sector->mark_corrupt();
    error_detected_ = true;
  }
  return status;
}

Status KeyValueStore::AppendEntry(const Entry& entry,
                                  Key key,
                                  span<const byte> value) {
  const StatusWithSize result = entry.Write(key, value);

  SectorDescriptor& sector = sectors_.FromAddress(entry.address());

  if (!result.ok()) {
    ERR("Failed to write %u bytes at %#x. %u actually written",
        unsigned(entry.size()),
        unsigned(entry.address()),
        unsigned(result.size()));
    PW_TRY(MarkSectorCorruptIfNotOk(result.status(), &sector));
  }

  if (options_.verify_on_write) {
    PW_TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
  }

  sector.RemoveWritableBytes(result.size());
  sector.AddValidBytes(result.size());
  return OkStatus();
}

StatusWithSize KeyValueStore::CopyEntryToSector(Entry& entry,
                                                SectorDescriptor* new_sector,
                                                Address new_address) {
  const StatusWithSize result = entry.Copy(new_address);

  PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(result.status(), new_sector));

  if (options_.verify_on_write) {
    Entry new_entry;
    PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(
        Entry::Read(partition_, new_address, formats_, &new_entry),
        new_sector));
    // TODO(davidrogers): add test that catches doing the verify on the old
    // entry.
    PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(new_entry.VerifyChecksumInFlash(),
                                              new_sector));
  }
  // Entry was written successfully; update descriptor's address and the sector
  // descriptors to reflect the new entry.
  new_sector->RemoveWritableBytes(result.size());
  new_sector->AddValidBytes(result.size());

  return result;
}

Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
                                    KeyValueStore::Address& address,
                                    span<const Address> reserved_addresses) {
  Entry entry;
  PW_TRY(ReadEntry(metadata, entry));

  // Find a new sector for the entry and write it to the new location. For
  // relocation the find should not be a sector already containing the key
  // but can be the always empty sector, since this is part of the GC process
  // that will result in a new empty sector. Also find a sector that does not
  // have reclaimable space (mostly for the full GC, where that would result in
  // an immediate extra relocation).
  SectorDescriptor* new_sector;

  PW_TRY(sectors_.FindSpaceDuringGarbageCollection(
      &new_sector, entry.size(), metadata.addresses(), reserved_addresses));

  Address new_address = sectors_.NextWritableAddress(*new_sector);
  PW_TRY_ASSIGN(const size_t result_size,
                CopyEntryToSector(entry, new_sector, new_address));
  sectors_.FromAddress(address).RemoveValidBytes(result_size);
  address = new_address;

  return OkStatus();
}

Status KeyValueStore::FullMaintenanceHelper(MaintenanceType maintenance_type) {
  if (initialized_ == InitializationState::kNotInitialized) {
    return Status::FailedPrecondition();
  }

  // Full maintenance can be a potentially heavy operation, and should be
  // relatively infrequent, so log start/end at INFO level.
  INF("Beginning full maintenance");
  CheckForErrors();

  // Step 1: Repair errors
  if (error_detected_) {
    PW_TRY(Repair());
  }

  // Step 2: Make sure all the entries are on the primary format.
  StatusWithSize update_status = UpdateEntriesToPrimaryFormat();
  Status overall_status = update_status.status();

  if (!overall_status.ok()) {
    ERR("Failed to update all entries to the primary format");
  }

  SectorDescriptor* sector = sectors_.last_new();

  // Calculate the number of bytes for the threshold.
  size_t threshold_bytes =
      (partition_.size_bytes() * kGcUsageThresholdPercentage) / 100;

  // Check whether the bytes in use exceed the threshold.
  StorageStats stats = GetStorageStats();
  bool over_usage_threshold = stats.in_use_bytes > threshold_bytes;
  bool heavy = (maintenance_type == MaintenanceType::kHeavy);
  bool force_gc = heavy || over_usage_threshold || (update_status.size() > 0);

  auto do_garbage_collect_pass = [&]() {
    // TODO(drempel): look into making an iterator method for cycling through
    // sectors starting from last_new_sector_.
    Status gc_status;
    for (size_t j = 0; j < sectors_.size(); j++) {
      sector += 1;
      if (sector == sectors_.end()) {
        sector = sectors_.begin();
      }

      if (sector->RecoverableBytes(partition_.sector_size_bytes()) > 0 &&
          (force_gc || sector->valid_bytes() == 0)) {
        gc_status = GarbageCollectSector(*sector, {});
        if (!gc_status.ok()) {
          ERR("Failed to garbage collect all sectors");
          break;
        }
      }
    }
    if (overall_status.ok()) {
      overall_status = gc_status;
    }
  };

  // Step 3: Do a full garbage collect pass over all sectors. This will erase
  // all old/stale entries from flash and leave only current/valid entries.
  do_garbage_collect_pass();

#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
  // Step 4: (if heavy maintenance) garbage collect all the deleted keys.
  if (heavy) {
    // If enabled, remove deleted keys from the entry cache, including freeing
    // sector bytes used by those keys. This must only be done directly after a
    // full garbage collection; otherwise the current deleted entry could be
    // garbage collected before the older stale entry, producing a window for
    // an invalid/corrupted KVS state if there were a power fault, crash, or
    // other interruption.
    overall_status.Update(RemoveDeletedKeyEntries());

    // Do another garbage collect pass to fully remove the deleted keys from
    // flash. Garbage collection only touches sectors that have something to
    // reclaim, which in this case is only sectors containing deleted keys.
    do_garbage_collect_pass();
  }
#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

  if (overall_status.ok()) {
    INF("Full maintenance complete");
  } else {
    ERR("Full maintenance finished with some errors");
  }
  return overall_status;
}
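
// Example (editor's sketch): routine vs. heavy maintenance, assuming the
// public FullMaintenance() and HeavyMaintenance() wrappers in the header
// forward here with kRegular and kHeavy respectively.
//
//   // Periodic housekeeping: repairs errors and GCs sectors over the usage
//   // threshold.
//   kvs.FullMaintenance().IgnoreError();
//
//   // Aggressive cleanup: additionally purges deleted keys when
//   // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE is enabled.
//   kvs.HeavyMaintenance().IgnoreError();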

Status KeyValueStore::PartialMaintenance() {
  if (initialized_ == InitializationState::kNotInitialized) {
    return Status::FailedPrecondition();
  }

  CheckForErrors();
  // Do automatic repair, if KVS options allow for it.
  if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
    PW_TRY(Repair());
  }
  return GarbageCollect(span<const Address>());
}

Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
  DBG("Garbage Collect a single sector");
  for ([[maybe_unused]] Address address : reserved_addresses) {
    DBG("   Avoid address %u", unsigned(address));
  }

  // Step 1: Find the sector to garbage collect
  SectorDescriptor* sector_to_gc =
      sectors_.FindSectorToGarbageCollect(reserved_addresses);

  if (sector_to_gc == nullptr) {
    // Nothing to GC.
    return Status::NotFound();
  }

  // Step 2: Garbage collect the selected sector.
  return GarbageCollectSector(*sector_to_gc, reserved_addresses);
}

Status KeyValueStore::RelocateKeyAddressesInSector(
    SectorDescriptor& sector_to_gc,
    const EntryMetadata& metadata,
    span<const Address> reserved_addresses) {
  for (FlashPartition::Address& address : metadata.addresses()) {
    if (sectors_.AddressInSector(sector_to_gc, address)) {
      DBG("  Relocate entry for Key 0x%08" PRIx32 ", sector %u",
          metadata.hash(),
          sectors_.Index(sectors_.FromAddress(address)));
      PW_TRY(RelocateEntry(metadata, address, reserved_addresses));
    }
  }

  return OkStatus();
}

Status KeyValueStore::GarbageCollectSector(
    SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
  DBG("  Garbage Collect sector %u", sectors_.Index(sector_to_gc));

  // Step 1: Move any valid entries in the GC sector to other sectors
  if (sector_to_gc.valid_bytes() != 0) {
    for (EntryMetadata& metadata : entry_cache_) {
      PW_TRY(RelocateKeyAddressesInSector(
          sector_to_gc, metadata, reserved_addresses));
    }
  }

  if (sector_to_gc.valid_bytes() != 0) {
    ERR("  Failed to relocate valid entries from sector being garbage "
        "collected, %u valid bytes remain",
        unsigned(sector_to_gc.valid_bytes()));
    return Status::Internal();
  }

  // Step 2: Reinitialize the sector
  if (!sector_to_gc.Empty(partition_.sector_size_bytes())) {
    sector_to_gc.mark_corrupt();
    internal_stats_.sector_erase_count++;
    PW_TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
    sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
  }

  DBG("  Garbage Collect sector %u complete", sectors_.Index(sector_to_gc));
  return OkStatus();
}

StatusWithSize KeyValueStore::UpdateEntriesToPrimaryFormat() {
  size_t entries_updated = 0;
  for (EntryMetadata& prior_metadata : entry_cache_) {
    Entry entry;
    PW_TRY_WITH_SIZE(ReadEntry(prior_metadata, entry));
    if (formats_.primary().magic == entry.magic()) {
      // Ignore entries that are already on the primary format.
      continue;
    }

    DBG("Updating entry 0x%08x from old format [0x%08x] to new format "
        "[0x%08x]",
        unsigned(prior_metadata.hash()),
        unsigned(entry.magic()),
        unsigned(formats_.primary().magic));

    entries_updated++;

    last_transaction_id_ += 1;
    PW_TRY_WITH_SIZE(entry.Update(formats_.primary(), last_transaction_id_));

    // List of addresses for sectors with space for this entry.
    Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();

    // Find addresses to write the entry to. This may involve garbage collecting
    // one or more sectors.
    PW_TRY_WITH_SIZE(GetAddressesForWrite(reserved_addresses, entry.size()));

    PW_TRY_WITH_SIZE(
        CopyEntryToSector(entry,
                          &sectors_.FromAddress(reserved_addresses[0]),
                          reserved_addresses[0]));

    // After writing the first entry successfully, update the key descriptors.
    // Once a single new entry is written, the old entries are invalidated.
    EntryMetadata new_metadata = UpdateKeyDescriptor(
        entry, reserved_addresses[0], &prior_metadata, entry.size());

    // Write the additional copies of the entry, if redundancy is greater
    // than 1.
    for (size_t i = 1; i < redundancy(); ++i) {
      PW_TRY_WITH_SIZE(
          CopyEntryToSector(entry,
                            &sectors_.FromAddress(reserved_addresses[i]),
                            reserved_addresses[i]));
      new_metadata.AddNewAddress(reserved_addresses[i]);
    }
  }

  return StatusWithSize(entries_updated);
}

// Add any missing redundant entries/copies for a key.
Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
  Entry entry;
  PW_TRY(ReadEntry(metadata, entry));
  PW_TRY(entry.VerifyChecksumInFlash());

  while (metadata.addresses().size() < redundancy()) {
    SectorDescriptor* new_sector;
    PW_TRY(GetSectorForWrite(&new_sector, entry.size(), metadata.addresses()));

    Address new_address = sectors_.NextWritableAddress(*new_sector);
    PW_TRY(CopyEntryToSector(entry, new_sector, new_address));

    metadata.AddNewAddress(new_address);
  }
  return OkStatus();
}

Status KeyValueStore::RepairCorruptSectors() {
  // Try to GC each corrupt sector, even if previous sectors fail. If GC of a
  // sector failed on the first pass, then do a second pass, since a later
  // sector might have cleared up space or otherwise unblocked the earlier
  // failed sector.
  Status repair_status = OkStatus();

  size_t loop_count = 0;
  do {
    loop_count++;
    // An error of RESOURCE_EXHAUSTED indicates no space was found for
    // relocation. Reset back to OK for the next pass.
    if (repair_status.IsResourceExhausted()) {
      repair_status = OkStatus();
    }

    DBG("   Pass %u", unsigned(loop_count));
    for (SectorDescriptor& sector : sectors_) {
      if (sector.corrupt()) {
        DBG("   Found sector %u with corruption", sectors_.Index(sector));
        Status sector_status = GarbageCollectSector(sector, {});
        if (sector_status.ok()) {
          internal_stats_.corrupt_sectors_recovered += 1;
        } else if (repair_status.ok() || repair_status.IsResourceExhausted()) {
          repair_status = sector_status;
        }
      }
    }
    DBG("   Pass %u complete", unsigned(loop_count));
  } while (!repair_status.ok() && loop_count < 2);

  return repair_status;
}

Status KeyValueStore::EnsureFreeSectorExists() {
  Status repair_status = OkStatus();
  bool empty_sector_found = false;

  DBG("   Find empty sector");
  for (SectorDescriptor& sector : sectors_) {
    if (sector.Empty(partition_.sector_size_bytes())) {
      empty_sector_found = true;
      DBG("   Empty sector found");
      break;
    }
  }
  if (!empty_sector_found) {
    DBG("   No empty sector found, attempting to GC a free sector");
    Status sector_status = GarbageCollect(span<const Address, 0>());
    if (repair_status.ok() && !sector_status.ok()) {
      DBG("   Unable to free an empty sector");
      repair_status = sector_status;
    }
  }

  return repair_status;
}

Status KeyValueStore::EnsureEntryRedundancy() {
  Status repair_status = OkStatus();

  if (redundancy() == 1) {
    DBG("   Redundancy not in use, nothing to check");
    return OkStatus();
  }

  DBG("   Write any needed additional duplicate copies of keys to fulfill %u"
      " redundancy",
      unsigned(redundancy()));
  for (EntryMetadata& metadata : entry_cache_) {
    if (metadata.addresses().size() >= redundancy()) {
      continue;
    }

    DBG("   Key with %u of %u copies found, adding missing copies",
        unsigned(metadata.addresses().size()),
        unsigned(redundancy()));
    Status fill_status = AddRedundantEntries(metadata);
    if (fill_status.ok()) {
      internal_stats_.missing_redundant_entries_recovered += 1;
      DBG("   Key missing copies added");
    } else {
      DBG("   Failed to add key missing copies");
      if (repair_status.ok()) {
        repair_status = fill_status;
      }
    }
  }

  return repair_status;
}

Status KeyValueStore::FixErrors() {
  DBG("Fixing KVS errors");

  // Step 1: Garbage collect any sectors marked as corrupt.
  Status overall_status = RepairCorruptSectors();

  // Step 2: Make sure there is at least 1 empty sector. This needs to be a
  // separate check of sectors from step 1, because a found empty sector might
  // get written to by a later GC that fails and does not result in a free
  // sector.
  Status repair_status = EnsureFreeSectorExists();
  if (overall_status.ok()) {
    overall_status = repair_status;
  }

  // Step 3: Make sure each stored key has the full number of redundant
  // entries.
  repair_status = EnsureEntryRedundancy();
  if (overall_status.ok()) {
    overall_status = repair_status;
  }

  if (overall_status.ok()) {
    error_detected_ = false;
    initialized_ = InitializationState::kReady;
  }
  return overall_status;
}

Status KeyValueStore::Repair() {
  // If errors have been detected, reinitialize the KVS metadata, which
  // performs a full deep error check. Then repair any errors that were found.
  INF("Starting KVS repair");

  DBG("Reinitialize KVS metadata");
  InitializeMetadata()
      .IgnoreError();  // TODO(b/242598609): Handle Status properly

  return FixErrors();
}

KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
                                                Key key,
                                                span<const byte> value,
                                                EntryState state) {
  // Always bump the transaction ID when creating a new entry.
  //
  // Burning transaction IDs prevents inconsistencies between flash and memory
  // that could happen if a write succeeds, but for some reason the read and
  // verify step fails. Here's how this would happen:
  //
  //   1. The entry is written but for some reason the flash reports failure OR
  //      the write succeeds, but the read / verify operation fails.
  //   2. The transaction ID is NOT incremented, because of the failure.
  //   3. (later) A new entry is written, re-using the transaction ID (oops).
  //
  // By always burning transaction IDs, the above problem can't happen.
  last_transaction_id_ += 1;

  if (state == EntryState::kDeleted) {
    return Entry::Tombstone(
        partition_, address, formats_.primary(), key, last_transaction_id_);
  }
  return Entry::Valid(partition_,
                      address,
                      formats_.primary(),
                      key,
                      value,
                      last_transaction_id_);
}

void KeyValueStore::LogDebugInfo() const {
  const size_t sector_size_bytes = partition_.sector_size_bytes();
  DBG("====================== KEY VALUE STORE DUMP =========================");
  DBG(" ");
  DBG("Flash partition:");
  DBG("  Sector count     = %u", unsigned(partition_.sector_count()));
  DBG("  Sector max count = %u", unsigned(sectors_.max_size()));
  DBG("  Sectors in use   = %u", unsigned(sectors_.size()));
  DBG("  Sector size      = %u", unsigned(sector_size_bytes));
  DBG("  Total size       = %u", unsigned(partition_.size_bytes()));
  DBG("  Alignment        = %u", unsigned(partition_.alignment_bytes()));
  DBG(" ");
  DBG("Key descriptors:");
  DBG("  Entry count     = %u", unsigned(entry_cache_.total_entries()));
  DBG("  Max entry count = %u", unsigned(entry_cache_.max_entries()));
  DBG(" ");
  DBG("      #     hash        version    address   address (hex)");
  size_t count = 0;
  for (const EntryMetadata& metadata : entry_cache_) {
    DBG("   |%3zu: | %8zx  |%8zu  | %8zu | %8zx",
        count++,
        size_t(metadata.hash()),
        size_t(metadata.transaction_id()),
        size_t(metadata.first_address()),
        size_t(metadata.first_address()));
  }
  DBG(" ");

  DBG("Sector descriptors:");
  DBG("      #     tail free  valid    has_space");
  for (const SectorDescriptor& sd : sectors_) {
    DBG("   |%3u: | %8zu  |%8zu  | %s",
        sectors_.Index(sd),
        size_t(sd.writable_bytes()),
        sd.valid_bytes(),
        sd.writable_bytes() ? "YES" : "");
  }
  DBG(" ");

  // TODO(keir): This should stop logging after some threshold.
  // size_t dumped_bytes = 0;
  DBG("Sector raw data:");
  for (size_t sector_id = 0; sector_id < sectors_.size(); ++sector_id) {
    // Read sector data. Yes, this will blow the stack on embedded.
    std::array<byte, 500> raw_sector_data;  // TODO!!!
    [[maybe_unused]] StatusWithSize sws =
        partition_.Read(sector_id * sector_size_bytes, raw_sector_data);
    DBG("Read: %u bytes", unsigned(sws.size()));

    DBG("  base    addr  offs   0  1  2  3  4  5  6  7");
    for (size_t i = 0; i < sector_size_bytes; i += 8) {
      DBG("  %3zu %8zx %5zu | %02x %02x %02x %02x %02x %02x %02x %02x",
          sector_id,
          (sector_id * sector_size_bytes) + i,
          i,
          static_cast<unsigned int>(raw_sector_data[i + 0]),
          static_cast<unsigned int>(raw_sector_data[i + 1]),
          static_cast<unsigned int>(raw_sector_data[i + 2]),
          static_cast<unsigned int>(raw_sector_data[i + 3]),
          static_cast<unsigned int>(raw_sector_data[i + 4]),
          static_cast<unsigned int>(raw_sector_data[i + 5]),
          static_cast<unsigned int>(raw_sector_data[i + 6]),
          static_cast<unsigned int>(raw_sector_data[i + 7]));

      // TODO(keir): Fix exit condition.
      if (i > 128) {
        break;
      }
    }
    DBG(" ");
  }

  DBG("////////////////////// KEY VALUE STORE DUMP END /////////////////////");
}

void KeyValueStore::LogSectors() const {
  DBG("Sector descriptors: count %u", unsigned(sectors_.size()));
  for (auto& sector : sectors_) {
    DBG("  - Sector %u: valid %u, recoverable %u, free %u",
        sectors_.Index(sector),
        unsigned(sector.valid_bytes()),
        unsigned(sector.RecoverableBytes(partition_.sector_size_bytes())),
        unsigned(sector.writable_bytes()));
  }
}

void KeyValueStore::LogKeyDescriptor() const {
  DBG("Key descriptors: count %u", unsigned(entry_cache_.total_entries()));
  for (const EntryMetadata& metadata : entry_cache_) {
    DBG("  - Key: %s, hash %#x, transaction ID %u, first address %#x",
        metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
        unsigned(metadata.hash()),
        unsigned(metadata.transaction_id()),
        unsigned(metadata.first_address()));
  }
}

}  // namespace pw::kvs