• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "icing/index/numeric/integer-index-storage.h"
16 
17 #include <algorithm>
18 #include <cstdint>
19 #include <functional>
20 #include <iterator>
21 #include <limits>
22 #include <memory>
23 #include <queue>
24 #include <string>
25 #include <string_view>
26 #include <utility>
27 #include <vector>
28 
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/file/file-backed-vector.h"
34 #include "icing/file/filesystem.h"
35 #include "icing/file/memory-mapped-file.h"
36 #include "icing/file/posting_list/flash-index-storage.h"
37 #include "icing/file/posting_list/posting-list-identifier.h"
38 #include "icing/index/hit/doc-hit-info.h"
39 #include "icing/index/iterator/doc-hit-info-iterator.h"
40 #include "icing/index/numeric/doc-hit-info-iterator-numeric.h"
41 #include "icing/index/numeric/integer-index-bucket-util.h"
42 #include "icing/index/numeric/integer-index-data.h"
43 #include "icing/index/numeric/numeric-index.h"
44 #include "icing/index/numeric/posting-list-integer-index-accessor.h"
45 #include "icing/index/numeric/posting-list-integer-index-serializer.h"
46 #include "icing/schema/section.h"
47 #include "icing/store/document-id.h"
48 #include "icing/util/crc32.h"
49 #include "icing/util/status-macros.h"
50 
51 namespace icing {
52 namespace lib {
53 
54 namespace {
55 
56 // Helper function to flush data between [it_start, it_end) into posting list(s)
57 // and return posting list id.
58 // Note: it will sort data between [it_start, it_end) by basic hit value, so the
59 // caller should be aware that the data order will be changed after calling this
60 // function.
FlushDataIntoPostingLists(FlashIndexStorage * flash_index_storage,PostingListIntegerIndexSerializer * posting_list_serializer,const std::vector<IntegerIndexData>::iterator & it_start,const std::vector<IntegerIndexData>::iterator & it_end)61 libtextclassifier3::StatusOr<PostingListIdentifier> FlushDataIntoPostingLists(
62     FlashIndexStorage* flash_index_storage,
63     PostingListIntegerIndexSerializer* posting_list_serializer,
64     const std::vector<IntegerIndexData>::iterator& it_start,
65     const std::vector<IntegerIndexData>::iterator& it_end) {
66   if (it_start == it_end) {
67     return PostingListIdentifier::kInvalid;
68   }
69 
70   ICING_ASSIGN_OR_RETURN(
71       std::unique_ptr<PostingListIntegerIndexAccessor> new_pl_accessor,
72       PostingListIntegerIndexAccessor::Create(flash_index_storage,
73                                               posting_list_serializer));
74 
75   std::sort(it_start, it_end);
76   for (auto it = it_end - 1; it >= it_start; --it) {
77     ICING_RETURN_IF_ERROR(new_pl_accessor->PrependData(*it));
78   }
79 
80   PostingListAccessor::FinalizeResult result =
81       std::move(*new_pl_accessor).Finalize();
82   if (!result.status.ok()) {
83     return result.status;
84   }
85   if (!result.id.is_valid()) {
86     return absl_ports::InternalError("Fail to flush data into posting list(s)");
87   }
88   return result.id;
89 }
90 
91 // The following 4 methods are helper functions to get the correct file path of
92 // metadata/sorted_buckets/unsorted_buckets/flash_index_storage, according to
93 // the given working directory.
GetMetadataFilePath(std::string_view working_path)94 std::string GetMetadataFilePath(std::string_view working_path) {
95   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
96                             ".m");
97 }
98 
GetSortedBucketsFilePath(std::string_view working_path)99 std::string GetSortedBucketsFilePath(std::string_view working_path) {
100   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
101                             ".s");
102 }
103 
GetUnsortedBucketsFilePath(std::string_view working_path)104 std::string GetUnsortedBucketsFilePath(std::string_view working_path) {
105   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
106                             ".u");
107 }
108 
GetFlashIndexStorageFilePath(std::string_view working_path)109 std::string GetFlashIndexStorageFilePath(std::string_view working_path) {
110   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
111                             ".f");
112 }
113 
114 }  // namespace
115 
// We add (BasicHits, key) into a bucket in DocumentId descending and SectionId
// ascending order. When doing range query, we may access buckets and want to
// return BasicHits to callers sorted by DocumentId. Therefore, this problem is
// actually "merge K sorted lists".
// To implement this algorithm via priority_queue, we create this wrapper class
// to store PostingListIntegerIndexAccessor for iterating through the posting
// list chain.
// - Non-relevant (i.e. not in range [key_lower, key_upper]) will be skipped.
// - Relevant BasicHits will be returned.
class BucketPostingListIterator {
 public:
  class Comparator {
   public:
    // REQUIRES: 2 BucketPostingListIterator* instances (lhs, rhs) should be
    //   valid, i.e. the preceding AdvanceAndFilter() succeeded.
    bool operator()(const BucketPostingListIterator* lhs,
                    const BucketPostingListIterator* rhs) const {
      // std::priority_queue is a max heap and we should return BasicHits in
      // DocumentId descending order.
      // - BucketPostingListIterator::operator< should have the same order as
      //   DocumentId.
      // - BasicHit encodes inverted document id and BasicHit::operator<
      //   compares the encoded raw value directly.
      // - Therefore, BucketPostingListIterator::operator< should compare
      //   BasicHit reversely.
      // - This will make priority_queue return buckets in DocumentId
      //   descending and SectionId ascending order.
      // - Whatever direction we sort SectionId by (or pop by priority_queue)
      //   doesn't matter because all hits for the same DocumentId will be
      //   merged into a single DocHitInfo.
      return rhs->GetCurrentBasicHit() < lhs->GetCurrentBasicHit();
    }
  };

  // Takes ownership of the accessor for the bucket's posting list chain. The
  // iterator starts with an empty cache, so the first AdvanceAndFilter() call
  // fetches the first batch of data.
  explicit BucketPostingListIterator(
      std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor)
      : pl_accessor_(std::move(pl_accessor)),
        should_retrieve_next_batch_(true) {}

  // Result of one AdvanceAndFilter() call. num_advance_calls and
  // num_blocks_inspected are returned per-call (rather than accumulated
  // internally) so that the caller can aggregate query stats across multiple
  // bucket iterators.
  struct AdvanceAndFilterResult {
    libtextclassifier3::Status status = libtextclassifier3::Status::OK;
    int32_t num_advance_calls = 0;
    int32_t num_blocks_inspected = 0;
  };
  // Advances to the next relevant data. The posting list of a bucket contains
  // keys within range [bucket.key_lower, bucket.key_upper], but some of them
  // may be out of [query_key_lower, query_key_upper], so when advancing we have
  // to filter out those non-relevant keys.
  //
  // Returns:
  //   AdvanceAndFilterResult. status will be:
  //   - OK on success
  //   - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
  //     data)
  //   - Any other PostingListIntegerIndexAccessor errors
  AdvanceAndFilterResult AdvanceAndFilter(int64_t query_key_lower,
                                          int64_t query_key_upper) {
    AdvanceAndFilterResult result;
    // Move curr_ until reaching a relevant data (i.e. key in range
    // [query_key_lower, query_key_upper])
    do {
      if (!should_retrieve_next_batch_) {
        // Still consuming the cached batch: step to the next element and note
        // whether the cache is now exhausted.
        ++curr_;
        should_retrieve_next_batch_ =
            curr_ >= cached_batch_integer_index_data_.cend();
      }
      if (should_retrieve_next_batch_) {
        auto status = GetNextDataBatch();
        if (!status.ok()) {
          result.status = std::move(status);
          return result;
        }
        ++result.num_blocks_inspected;
        should_retrieve_next_batch_ = false;
      }
      ++result.num_advance_calls;
    } while (curr_->key() < query_key_lower || curr_->key() > query_key_upper);

    return result;
  }

  // REQUIRES: the preceding AdvanceAndFilter() succeeded, so that curr_ points
  //   at valid cached data.
  const BasicHit& GetCurrentBasicHit() const { return curr_->basic_hit(); }

 private:
  // Gets next batch of data from the posting list chain, caches in
  // cached_batch_integer_index_data_, and sets curr_ to the begin of the cache.
  //
  // Returns:
  //   - OK on success
  //   - RESOURCE_EXHAUSTED_ERROR if the accessor returns an empty batch, i.e.
  //     the end of the posting list chain was reached
  //   - Any other PostingListIntegerIndexAccessor errors
  libtextclassifier3::Status GetNextDataBatch() {
    auto cached_batch_integer_index_data_or = pl_accessor_->GetNextDataBatch();
    if (!cached_batch_integer_index_data_or.ok()) {
      ICING_LOG(WARNING)
          << "Fail to get next batch data from posting list due to: "
          << cached_batch_integer_index_data_or.status().error_message();
      return std::move(cached_batch_integer_index_data_or).status();
    }

    cached_batch_integer_index_data_ =
        std::move(cached_batch_integer_index_data_or).ValueOrDie();
    curr_ = cached_batch_integer_index_data_.cbegin();

    if (cached_batch_integer_index_data_.empty()) {
      return absl_ports::ResourceExhaustedError("End of iterator");
    }

    return libtextclassifier3::Status::OK;
  }

  // Accessor used to read the bucket's posting list chain batch by batch.
  std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor_;
  // Most recently fetched batch of data from the posting list chain.
  std::vector<IntegerIndexData> cached_batch_integer_index_data_;
  // Cursor into cached_batch_integer_index_data_.
  std::vector<IntegerIndexData>::const_iterator curr_;
  // True when the cache is exhausted (or not yet filled), i.e. the next
  // AdvanceAndFilter() must fetch another batch before reading data.
  bool should_retrieve_next_batch_;
};
227 
// Wrapper class to iterate through IntegerIndexStorage to get relevant data.
// It uses multiple BucketPostingListIterator instances from different candidate
// buckets and merges all relevant BasicHits from these buckets by
// std::priority_queue in DocumentId descending order. Also different SectionIds
// of the same DocumentId will be merged into SectionIdMask and returned as a
// single DocHitInfo.
class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
 public:
  // Takes ownership of the given bucket iterators. Each one is advanced to its
  // first relevant datum before being added to the priority queue; iterators
  // with no relevant data are discarded.
  explicit IntegerIndexStorageIterator(
      int64_t query_key_lower, int64_t query_key_upper,
      std::vector<std::unique_ptr<BucketPostingListIterator>>&& bucket_pl_iters)
      : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper),
        num_advance_calls_(0),
        num_blocks_inspected_(0) {
    std::vector<BucketPostingListIterator*> bucket_pl_iters_raw_ptrs;
    for (std::unique_ptr<BucketPostingListIterator>& bucket_pl_itr :
         bucket_pl_iters) {
      // Before adding BucketPostingListIterator* into the priority queue, we
      // have to advance the bucket iterator to the first valid data since the
      // priority queue needs valid data to compare the order.
      // Note: it is possible that the bucket iterator fails to advance for the
      // first round, because data could be filtered out by [query_key_lower,
      // query_key_upper]. In this case, just discard the iterator.
      BucketPostingListIterator::AdvanceAndFilterResult
          advance_and_filter_result =
              bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper);
      if (advance_and_filter_result.status.ok()) {
        bucket_pl_iters_raw_ptrs.push_back(bucket_pl_itr.get());
        bucket_pl_iters_.push_back(std::move(bucket_pl_itr));
      }
      // Accumulate stats even for discarded iterators: their advance work was
      // still performed.
      num_advance_calls_ += advance_and_filter_result.num_advance_calls;
      num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected;
    }

    // Heapify all valid bucket iterators at once via the range constructor.
    pq_ = std::priority_queue<BucketPostingListIterator*,
                              std::vector<BucketPostingListIterator*>,
                              BucketPostingListIterator::Comparator>(
        comparator_, std::move(bucket_pl_iters_raw_ptrs));
  }

  ~IntegerIndexStorageIterator() override = default;

  // Advances to the next DocHitInfo. Note: several BucketPostingListIterator
  // instances may be advanced if they point to data with the same DocumentId.
  //
  // Returns:
  //   - OK on success
  //   - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
  //     data)
  //   - Any BucketPostingListIterator errors
  libtextclassifier3::Status Advance() override;

  // Returns the DocHitInfo produced by the last successful Advance() call.
  DocHitInfo GetDocHitInfo() const override { return doc_hit_info_; }

  int32_t GetNumAdvanceCalls() const override { return num_advance_calls_; }

  int32_t GetNumBlocksInspected() const override {
    return num_blocks_inspected_;
  }

 private:
  BucketPostingListIterator::Comparator comparator_;

  // We have to fetch and pop the top BucketPostingListIterator from
  // std::priority_queue to perform "merge K sorted lists algorithm".
  // - Since std::priority_queue::pop() doesn't return the top element, we have
  //   to call top() and pop() together.
  // - std::move the top() element by const_cast is not an appropriate way
  //   because it introduces transient unstable state for std::priority_queue.
  // - We don't want to copy BucketPostingListIterator, either.
  // - Therefore, add bucket_pl_iters_ for the ownership of all
  //   BucketPostingListIterator instances and std::priority_queue uses the raw
  //   pointer. So when calling top(), we can simply copy the raw pointer via
  //   top() and avoid transient unstable state.
  std::vector<std::unique_ptr<BucketPostingListIterator>> bucket_pl_iters_;
  std::priority_queue<BucketPostingListIterator*,
                      std::vector<BucketPostingListIterator*>,
                      BucketPostingListIterator::Comparator>
      pq_;

  // DocHitInfo produced by the last successful Advance() call.
  DocHitInfo doc_hit_info_;

  // Query stats exposed via GetNumAdvanceCalls() / GetNumBlocksInspected().
  int32_t num_advance_calls_;
  int32_t num_blocks_inspected_;
};
313 
// Pops the iterator(s) whose current hit has the highest remaining DocumentId,
// merges all of their sections for that document into doc_hit_info_, and
// re-pushes each iterator that still has relevant data.
libtextclassifier3::Status IntegerIndexStorageIterator::Advance() {
  if (pq_.empty()) {
    return absl_ports::ResourceExhaustedError("End of iterator");
  }

  // The priority queue is ordered by DocumentId descending, so top() holds the
  // next DocumentId to return.
  DocumentId document_id = pq_.top()->GetCurrentBasicHit().document_id();
  doc_hit_info_ = DocHitInfo(document_id);
  // Merge sections with same document_id into a single DocHitInfo
  while (!pq_.empty() &&
         pq_.top()->GetCurrentBasicHit().document_id() == document_id) {
    BucketPostingListIterator* bucket_itr = pq_.top();
    pq_.pop();

    libtextclassifier3::Status advance_status;
    do {
      // Record the current hit's section, then advance. A single bucket
      // iterator may hold several hits for the same document (different
      // sections), so keep consuming until the DocumentId changes.
      doc_hit_info_.UpdateSection(
          bucket_itr->GetCurrentBasicHit().section_id());
      BucketPostingListIterator::AdvanceAndFilterResult
          advance_and_filter_result =
              bucket_itr->AdvanceAndFilter(key_lower_, key_upper_);
      advance_status = std::move(advance_and_filter_result.status);
      num_advance_calls_ += advance_and_filter_result.num_advance_calls;
      num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected;
    } while (advance_status.ok() &&
             bucket_itr->GetCurrentBasicHit().document_id() == document_id);
    // Only re-push iterators that still have relevant data; exhausted (or
    // errored) iterators are dropped from the queue.
    if (advance_status.ok()) {
      pq_.push(bucket_itr);
    }
  }

  return libtextclassifier3::Status::OK;
}
346 
IsValid() const347 bool IntegerIndexStorage::Options::IsValid() const {
348   if (num_data_threshold_for_bucket_split <=
349       kMinNumDataThresholdForBucketSplit) {
350     return false;
351   }
352 
353   if (!HasCustomInitBuckets()) {
354     return true;
355   }
356 
357   // Verify if the range of buckets are disjoint and the range union is
358   // [INT64_MIN, INT64_MAX].
359   std::vector<Bucket> buckets;
360   buckets.reserve(custom_init_sorted_buckets.size() +
361                   custom_init_unsorted_buckets.size());
362   buckets.insert(buckets.end(), custom_init_sorted_buckets.begin(),
363                  custom_init_sorted_buckets.end());
364   buckets.insert(buckets.end(), custom_init_unsorted_buckets.begin(),
365                  custom_init_unsorted_buckets.end());
366   if (buckets.empty()) {
367     return false;
368   }
369   std::sort(buckets.begin(), buckets.end());
370   int64_t prev_upper = std::numeric_limits<int64_t>::min();
371   for (int i = 0; i < buckets.size(); ++i) {
372     // key_lower should not be greater than key_upper and init bucket should
373     // have invalid posting list identifier.
374     if (buckets[i].key_lower() > buckets[i].key_upper() ||
375         buckets[i].posting_list_identifier().is_valid()) {
376       return false;
377     }
378 
379     // Previous upper bound should not be INT64_MAX since it is not the last
380     // bucket.
381     if (prev_upper == std::numeric_limits<int64_t>::max()) {
382       return false;
383     }
384 
385     int64_t expected_lower =
386         (i == 0 ? std::numeric_limits<int64_t>::min() : prev_upper + 1);
387     if (buckets[i].key_lower() != expected_lower) {
388       return false;
389     }
390 
391     prev_upper = buckets[i].key_upper();
392   }
393 
394   return prev_upper == std::numeric_limits<int64_t>::max();
395 }
396 
397 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
Create(const Filesystem & filesystem,std::string working_path,Options options,PostingListIntegerIndexSerializer * posting_list_serializer)398 IntegerIndexStorage::Create(
399     const Filesystem& filesystem, std::string working_path, Options options,
400     PostingListIntegerIndexSerializer* posting_list_serializer) {
401   if (!options.IsValid()) {
402     return absl_ports::InvalidArgumentError(
403         "Invalid IntegerIndexStorage options");
404   }
405 
406   if (!filesystem.FileExists(GetMetadataFilePath(working_path).c_str()) ||
407       !filesystem.FileExists(GetSortedBucketsFilePath(working_path).c_str()) ||
408       !filesystem.FileExists(
409           GetUnsortedBucketsFilePath(working_path).c_str()) ||
410       !filesystem.FileExists(
411           GetFlashIndexStorageFilePath(working_path).c_str())) {
412     // Discard working_path if any of them is missing, and reinitialize.
413     if (filesystem.DirectoryExists(working_path.c_str())) {
414       ICING_RETURN_IF_ERROR(Discard(filesystem, working_path));
415     }
416     return InitializeNewFiles(filesystem, std::move(working_path),
417                               std::move(options), posting_list_serializer);
418   }
419   return InitializeExistingFiles(filesystem, std::move(working_path),
420                                  std::move(options), posting_list_serializer);
421 }
422 
~IntegerIndexStorage()423 IntegerIndexStorage::~IntegerIndexStorage() {
424   if (!PersistToDisk().ok()) {
425     ICING_LOG(WARNING)
426         << "Failed to persist hash map to disk while destructing "
427         << working_path_;
428   }
429 }
430 
// Comparator used with std::lower_bound over bucket arrays: orders a Bucket
// (lhs) against a raw key (rhs) by the bucket's key_upper, so lower_bound
// returns the first bucket whose key_upper >= key.
class IntegerIndexStorageComparator {
 public:
  bool operator()(const IntegerIndexStorage::Bucket& lhs, int64_t rhs) const {
    return lhs.key_upper() < rhs;
  }
} kComparator;
437 
// Adds the given keys, associated with (document_id, section_id), into this
// storage. Keys are sorted and deduped first, then routed to the bucket that
// owns each key's range; buckets may split as they fill up.
//
// Returns:
//   - OK on success
//   - RESOURCE_EXHAUSTED_ERROR if the total # of data would exceed INT32_MAX
//   - Any FileBackedVector or posting list errors
libtextclassifier3::Status IntegerIndexStorage::AddKeys(
    DocumentId document_id, SectionId section_id,
    std::vector<int64_t>&& new_keys) {
  if (new_keys.empty()) {
    return libtextclassifier3::Status::OK;
  }

  SetDirty();

  std::sort(new_keys.begin(), new_keys.end());

  // Dedupe
  auto last = std::unique(new_keys.begin(), new_keys.end());
  new_keys.erase(last, new_keys.end());

  // Reject the batch if adding it would overflow the int32 data counter.
  if (static_cast<int32_t>(new_keys.size()) >
      std::numeric_limits<int32_t>::max() - info().num_data) {
    return absl_ports::ResourceExhaustedError(
        "# of keys in this integer index storage exceed the limit");
  }

  // When adding keys into a bucket, we potentially split it into 2 new buckets
  // and one of them will be added into the unsorted bucket array.
  // When handling keys belonging to buckets in the unsorted bucket array, we
  // don't have to (and must not) handle these newly split buckets. Therefore,
  // collect all newly split buckets in another vector and append them into the
  // unsorted bucket array after adding all keys.
  std::vector<Bucket> new_buckets;

  // Binary search range of the sorted bucket array.
  const Bucket* sorted_bucket_arr_begin = sorted_buckets_->array();
  const Bucket* sorted_bucket_arr_end =
      sorted_buckets_->array() + sorted_buckets_->num_elements();

  // Step 1: handle keys belonging to buckets in the sorted bucket array. Skip
  //         keys belonging to the unsorted bucket array and deal with them in
  //         the next step.
  // - Iterate through new_keys by it_start.
  // - Binary search (std::lower_bound comparing key with bucket.key_upper()) to
  //   find the first bucket in the sorted bucket array with key_upper is not
  //   smaller than (>=) the key.
  // - Skip (and advance it_start) all keys smaller than the target bucket's
  //   key_lower. It means these keys belong to buckets in the unsorted bucket
  //   array and we will deal with them later.
  // - Find it_end such that all keys within range [it_start, it_end) belong to
  //   the target bucket.
  // - Batch add keys within range [it_start, it_end) into the target bucket.
  auto it_start = new_keys.cbegin();
  while (it_start != new_keys.cend() &&
         sorted_bucket_arr_begin < sorted_bucket_arr_end) {
    // Use std::lower_bound to find the first bucket in the sorted bucket array
    // with key_upper >= *it_start.
    const Bucket* target_bucket = std::lower_bound(
        sorted_bucket_arr_begin, sorted_bucket_arr_end, *it_start, kComparator);
    if (target_bucket >= sorted_bucket_arr_end) {
      // Keys in range [it_start, new_keys.cend()) are greater than all sorted
      // buckets' key_upper, so we can end step 1. In fact, they belong to
      // buckets in the unsorted bucket array and we will deal with them in
      // step 2.
      break;
    }

    // Sequential instead of binary search to advance it_start and it_end for
    // several reasons:
    // - Eventually we have to iterate through all keys within range [it_start,
    //   it_end) and add them into the posting list, so binary search doesn't
    //   improve the overall time complexity.
    // - Binary search may jump to far-away indices, which potentially
    //   downgrades the cache performance.

    // After binary search, we've ensured *it_start <=
    // target_bucket->key_upper(), but it is still possible that *it_start (and
    // the next several keys) is still smaller than target_bucket->key_lower(),
    // so we have to skip them. In fact, they belong to buckets in the unsorted
    // bucket array.
    //
    // For example:
    // - sorted bucket array: [(INT_MIN, 0), (1, 5), (100, 300), (301, 550)]
    // - unsorted bucket array: [(550, INT_MAX), (6, 99)]
    // - new_keys: [10, 20, 40, 102, 150, 200, 500, 600]
    // std::lower_bound (target = 10) will get target_bucket = (100, 300), but
    // we have to skip 10, 20, 40 because they are smaller than 100 (the
    // bucket's key_lower). We should move it_start pointing to key 102.
    while (it_start != new_keys.cend() &&
           *it_start < target_bucket->key_lower()) {
      ++it_start;
    }

    // Locate it_end such that all keys within range [it_start, it_end) belong
    // to target_bucket and all keys outside this range don't belong to
    // target_bucket.
    //
    // For example (continue above), we should locate it_end to point to key
    // 500.
    auto it_end = it_start;
    while (it_end != new_keys.cend() && *it_end <= target_bucket->key_upper()) {
      ++it_end;
    }

    // Now, keys within range [it_start, it_end) belong to target_bucket, so
    // construct IntegerIndexData and add them into the bucket's posting list.
    if (it_start != it_end) {
      ICING_ASSIGN_OR_RETURN(
          FileBackedVector<Bucket>::MutableView mutable_bucket,
          sorted_buckets_->GetMutable(target_bucket -
                                      sorted_buckets_->array()));
      ICING_ASSIGN_OR_RETURN(
          std::vector<Bucket> round_new_buckets,
          AddKeysIntoBucketAndSplitIfNecessary(
              document_id, section_id, it_start, it_end, mutable_bucket));
      new_buckets.insert(new_buckets.end(), round_new_buckets.begin(),
                         round_new_buckets.end());
    }

    it_start = it_end;
    sorted_bucket_arr_begin = target_bucket + 1;
  }

  // Step 2: handle keys belonging to buckets in the unsorted bucket array. They
  //         were skipped in step 1.
  // For each bucket in the unsorted bucket array, find [it_start, it_end) such
  // that all keys within this range belong to the bucket and add them.
  // - Binary search (std::lower_bound comparing bucket.key_lower() with key) to
  //   find it_start.
  // - Sequential advance (start from it_start) to find it_end. Same reason as
  //   above for choosing sequential advance instead of binary search.
  // - Add keys within range [it_start, it_end) into the bucket.
  for (int32_t i = 0; i < unsorted_buckets_->num_elements(); ++i) {
    ICING_ASSIGN_OR_RETURN(FileBackedVector<Bucket>::MutableView mutable_bucket,
                           unsorted_buckets_->GetMutable(i));
    auto it_start = std::lower_bound(new_keys.cbegin(), new_keys.cend(),
                                     mutable_bucket.Get().key_lower());
    if (it_start == new_keys.cend()) {
      continue;
    }

    // Sequential advance instead of binary search to find the correct position
    // of it_end for the same reasons mentioned above in step 1.
    auto it_end = it_start;
    while (it_end != new_keys.cend() &&
           *it_end <= mutable_bucket.Get().key_upper()) {
      ++it_end;
    }

    // Now, key within range [it_start, it_end) belong to the bucket, so
    // construct IntegerIndexData and add them into the bucket's posting list.
    if (it_start != it_end) {
      ICING_ASSIGN_OR_RETURN(
          std::vector<Bucket> round_new_buckets,
          AddKeysIntoBucketAndSplitIfNecessary(
              document_id, section_id, it_start, it_end, mutable_bucket));
      new_buckets.insert(new_buckets.end(), round_new_buckets.begin(),
                         round_new_buckets.end());
    }
  }

  // Step 3: append new buckets into the unsorted bucket array.
  if (!new_buckets.empty()) {
    ICING_ASSIGN_OR_RETURN(
        typename FileBackedVector<Bucket>::MutableArrayView mutable_new_arr,
        unsorted_buckets_->Allocate(new_buckets.size()));
    mutable_new_arr.SetArray(/*idx=*/0, new_buckets.data(), new_buckets.size());
  }

  // Step 4: sort and merge the unsorted bucket array into the sorted bucket
  //         array if the length of the unsorted bucket array exceeds the
  //         threshold.
  if (unsorted_buckets_->num_elements() > kUnsortedBucketsLengthThreshold) {
    ICING_RETURN_IF_ERROR(SortBuckets());
  }

  info().num_data += new_keys.size();

  return libtextclassifier3::Status::OK;
}
613 
// Returns an iterator over all data with keys in range [query_key_lower,
// query_key_upper], collecting one BucketPostingListIterator per candidate
// bucket (from both the sorted and unsorted bucket arrays) and wrapping them
// in an IntegerIndexStorageIterator.
//
// Returns:
//   - A DocHitInfoIteratorNumeric<int64_t> instance on success
//   - INVALID_ARGUMENT_ERROR if query_key_lower > query_key_upper
//   - Any FileBackedVector or PostingListIntegerIndexAccessor errors
libtextclassifier3::StatusOr<std::unique_ptr<DocHitInfoIterator>>
IntegerIndexStorage::GetIterator(int64_t query_key_lower,
                                 int64_t query_key_upper) const {
  if (query_key_lower > query_key_upper) {
    return absl_ports::InvalidArgumentError(
        "key_lower should not be greater than key_upper");
  }

  std::vector<std::unique_ptr<BucketPostingListIterator>> bucket_pl_iters;

  // Sorted bucket array: binary search for the first bucket with key_upper >=
  // query_key_lower, then walk forward while the bucket range still overlaps
  // the query range.
  const Bucket* sorted_bucket_arr_begin = sorted_buckets_->array();
  const Bucket* sorted_bucket_arr_end =
      sorted_buckets_->array() + sorted_buckets_->num_elements();
  for (const Bucket* bucket =
           std::lower_bound(sorted_bucket_arr_begin, sorted_bucket_arr_end,
                            query_key_lower, kComparator);
       bucket < sorted_bucket_arr_end && bucket->key_lower() <= query_key_upper;
       ++bucket) {
    // An invalid posting list identifier means the bucket holds no data.
    if (!bucket->posting_list_identifier().is_valid()) {
      continue;
    }

    ICING_ASSIGN_OR_RETURN(
        std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
        PostingListIntegerIndexAccessor::CreateFromExisting(
            flash_index_storage_.get(), posting_list_serializer_,
            bucket->posting_list_identifier()));
    bucket_pl_iters.push_back(
        std::make_unique<BucketPostingListIterator>(std::move(pl_accessor)));
  }

  // Unsorted bucket array: no order to exploit, so scan linearly.
  for (int32_t i = 0; i < unsorted_buckets_->num_elements(); ++i) {
    ICING_ASSIGN_OR_RETURN(const Bucket* bucket, unsorted_buckets_->Get(i));
    if (query_key_upper < bucket->key_lower() ||
        query_key_lower > bucket->key_upper() ||
        !bucket->posting_list_identifier().is_valid()) {
      // Skip bucket whose range doesn't overlap with [query_key_lower,
      // query_key_upper] or posting_list_identifier is invalid.
      continue;
    }

    ICING_ASSIGN_OR_RETURN(
        std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
        PostingListIntegerIndexAccessor::CreateFromExisting(
            flash_index_storage_.get(), posting_list_serializer_,
            bucket->posting_list_identifier()));
    bucket_pl_iters.push_back(
        std::make_unique<BucketPostingListIterator>(std::move(pl_accessor)));
  }

  return std::make_unique<DocHitInfoIteratorNumeric<int64_t>>(
      std::make_unique<IntegerIndexStorageIterator>(
          query_key_lower, query_key_upper, std::move(bucket_pl_iters)));
}
670 
// Transfers all integer index data from this storage into new_storage,
// remapping each hit's document id through document_id_old_to_new and dropping
// hits whose documents map to kInvalidDocumentId (deleted/outdated). Adjacent
// small buckets are merged into larger sorted buckets in new_storage.
//
// Returns:
//   - OK on success
//   - INTERNAL_ERROR if a stored document id is out of range (index
//     corruption), or any error propagated from the underlying storages
libtextclassifier3::Status IntegerIndexStorage::TransferIndex(
    const std::vector<DocumentId>& document_id_old_to_new,
    IntegerIndexStorage* new_storage) const {
  // Discard all pre-existing buckets in new_storage since we will append newly
  // merged buckets gradually into new_storage.
  if (new_storage->sorted_buckets_->num_elements() > 0) {
    ICING_RETURN_IF_ERROR(new_storage->sorted_buckets_->TruncateTo(0));
  }
  if (new_storage->unsorted_buckets_->num_elements() > 0) {
    ICING_RETURN_IF_ERROR(new_storage->unsorted_buckets_->TruncateTo(0));
  }

  // "Reference sort" the original storage buckets: sort lightweight reference
  // wrappers (ordered by Bucket::operator<) instead of mutating the on-disk
  // bucket arrays of this (const) storage.
  std::vector<std::reference_wrapper<const Bucket>> temp_buckets;
  temp_buckets.reserve(sorted_buckets_->num_elements() +
                       unsorted_buckets_->num_elements());
  temp_buckets.insert(
      temp_buckets.end(), sorted_buckets_->array(),
      sorted_buckets_->array() + sorted_buckets_->num_elements());
  temp_buckets.insert(
      temp_buckets.end(), unsorted_buckets_->array(),
      unsorted_buckets_->array() + unsorted_buckets_->num_elements());
  std::sort(temp_buckets.begin(), temp_buckets.end(),
            [](const std::reference_wrapper<const Bucket>& lhs,
               const std::reference_wrapper<const Bucket>& rhs) -> bool {
              return lhs.get() < rhs.get();
            });

  // Accumulate data from consecutive buckets until exceeding this threshold,
  // then flush them as one new sorted bucket in new_storage.
  const int32_t num_data_threshold_for_bucket_merge =
      kNumDataThresholdRatioForBucketMerge *
      new_storage->options_.num_data_threshold_for_bucket_split;
  int64_t curr_key_lower = std::numeric_limits<int64_t>::min();
  int64_t curr_key_upper = std::numeric_limits<int64_t>::min();
  std::vector<IntegerIndexData> accumulated_data;
  for (const std::reference_wrapper<const Bucket>& bucket_ref : temp_buckets) {
    // Read all data from the bucket.
    std::vector<IntegerIndexData> new_data;
    if (bucket_ref.get().posting_list_identifier().is_valid()) {
      ICING_ASSIGN_OR_RETURN(
          std::unique_ptr<PostingListIntegerIndexAccessor> old_pl_accessor,
          PostingListIntegerIndexAccessor::CreateFromExisting(
              flash_index_storage_.get(), posting_list_serializer_,
              bucket_ref.get().posting_list_identifier()));

      ICING_ASSIGN_OR_RETURN(std::vector<IntegerIndexData> batch_old_data,
                             old_pl_accessor->GetNextDataBatch());
      while (!batch_old_data.empty()) {
        for (const IntegerIndexData& old_data : batch_old_data) {
          DocumentId old_document_id = old_data.basic_hit().document_id();
          if (old_document_id < 0 ||
              old_document_id >= document_id_old_to_new.size()) {
            // If it happens, then the posting list is corrupted. Return error
            // and let the caller rebuild everything.
            return absl_ports::InternalError(
                "Integer index hit document id is out of range. The index may "
                "have been corrupted.");
          }

          DocumentId new_document_id = document_id_old_to_new[old_document_id];
          // Transfer the document id of the hit if the document is not deleted
          // or outdated.
          if (new_document_id != kInvalidDocumentId) {
            new_data.push_back(
                IntegerIndexData(old_data.basic_hit().section_id(),
                                 new_document_id, old_data.key()));
          }
        }
        ICING_ASSIGN_OR_RETURN(batch_old_data,
                               old_pl_accessor->GetNextDataBatch());
      }
    }

    // Decide whether:
    // - Flush accumulated_data and create a new bucket for them.
    // - OR merge new_data into accumulated_data and go to the next round.
    if (!accumulated_data.empty() && accumulated_data.size() + new_data.size() >
                                         num_data_threshold_for_bucket_merge) {
      // TODO(b/259743562): [Optimization 3] adjust upper bound to fit more data
      // from new_data to accumulated_data.
      ICING_RETURN_IF_ERROR(FlushDataIntoNewSortedBucket(
          curr_key_lower, curr_key_upper, std::move(accumulated_data),
          new_storage));

      // Start a fresh accumulation range at this bucket's lower bound.
      curr_key_lower = bucket_ref.get().key_lower();
      accumulated_data = std::move(new_data);
    } else {
      // We can just append to accumulated data because
      // FlushDataIntoNewSortedBucket will take care of sorting the contents.
      std::move(new_data.begin(), new_data.end(),
                std::back_inserter(accumulated_data));
    }
    // Extend the current accumulation range to cover this bucket.
    curr_key_upper = bucket_ref.get().key_upper();
  }

  // Add the last round of bucket. Even if accumulated_data is empty, an
  // (empty) bucket is still appended to cover the remaining key range.
  ICING_RETURN_IF_ERROR(
      FlushDataIntoNewSortedBucket(curr_key_lower, curr_key_upper,
                                   std::move(accumulated_data), new_storage));

  return libtextclassifier3::Status::OK;
}
772 
// Creates a brand-new IntegerIndexStorage under working_path: creates the
// working directory, the sorted/unsorted bucket FileBackedVectors, the flash
// posting list storage, and the metadata file, then seeds the initial
// bucket(s) and computes initial checksums via InitializeNewStorage.
//
// Returns:
//   - A new IntegerIndexStorage instance on success
//   - INTERNAL_ERROR if the working directory cannot be created
//   - Any error propagated from the underlying file-backed components
/* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
IntegerIndexStorage::InitializeNewFiles(
    const Filesystem& filesystem, std::string&& working_path, Options&& options,
    PostingListIntegerIndexSerializer* posting_list_serializer) {
  // IntegerIndexStorage uses working_path as working directory path.
  // Create working directory.
  if (!filesystem.CreateDirectory(working_path.c_str())) {
    return absl_ports::InternalError(
        absl_ports::StrCat("Failed to create directory: ", working_path));
  }

  // Initialize sorted_buckets. When pre_mapping_fbv is enabled, pre-map room
  // for 1024 buckets up front; otherwise let the mapping grow on demand.
  int32_t pre_mapping_mmap_size = sizeof(Bucket) * (1 << 10);
  ICING_ASSIGN_OR_RETURN(
      std::unique_ptr<FileBackedVector<Bucket>> sorted_buckets,
      FileBackedVector<Bucket>::Create(
          filesystem, GetSortedBucketsFilePath(working_path),
          MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
          FileBackedVector<Bucket>::kMaxFileSize,
          options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));

  // Initialize unsorted_buckets. Its pre-mapped capacity only needs to cover
  // kUnsortedBucketsLengthThreshold elements.
  pre_mapping_mmap_size = sizeof(Bucket) * kUnsortedBucketsLengthThreshold;
  ICING_ASSIGN_OR_RETURN(
      std::unique_ptr<FileBackedVector<Bucket>> unsorted_buckets,
      FileBackedVector<Bucket>::Create(
          filesystem, GetUnsortedBucketsFilePath(working_path),
          MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
          FileBackedVector<Bucket>::kMaxFileSize,
          options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));

  // Initialize flash_index_storage
  ICING_ASSIGN_OR_RETURN(
      FlashIndexStorage flash_index_storage,
      FlashIndexStorage::Create(GetFlashIndexStorageFilePath(working_path),
                                &filesystem, posting_list_serializer));

  if (options.HasCustomInitBuckets()) {
    // Insert custom init buckets. The sorted set must actually be in sorted
    // order before being bulk-copied into sorted_buckets.
    std::sort(options.custom_init_sorted_buckets.begin(),
              options.custom_init_sorted_buckets.end());
    ICING_ASSIGN_OR_RETURN(
        typename FileBackedVector<Bucket>::MutableArrayView
            mutable_new_sorted_bucket_arr,
        sorted_buckets->Allocate(options.custom_init_sorted_buckets.size()));
    mutable_new_sorted_bucket_arr.SetArray(
        /*idx=*/0, options.custom_init_sorted_buckets.data(),
        options.custom_init_sorted_buckets.size());

    ICING_ASSIGN_OR_RETURN(typename FileBackedVector<Bucket>::MutableArrayView
                               mutable_new_unsorted_bucket_arr,
                           unsorted_buckets->Allocate(
                               options.custom_init_unsorted_buckets.size()));
    mutable_new_unsorted_bucket_arr.SetArray(
        /*idx=*/0, options.custom_init_unsorted_buckets.data(),
        options.custom_init_unsorted_buckets.size());

    // After inserting buckets, we can clear vectors since there is no need to
    // cache them.
    options.custom_init_sorted_buckets.clear();
    options.custom_init_unsorted_buckets.clear();
  } else {
    // Insert one bucket with range [INT64_MIN, INT64_MAX].
    ICING_RETURN_IF_ERROR(sorted_buckets->Append(Bucket(
        /*key_lower=*/std::numeric_limits<int64_t>::min(),
        /*key_upper=*/std::numeric_limits<int64_t>::max())));
  }
  ICING_RETURN_IF_ERROR(sorted_buckets->PersistToDisk());

  // Initialize metadata file. Create MemoryMappedFile with pre-mapping, and
  // call GrowAndRemapIfNecessary to grow the underlying file.
  ICING_ASSIGN_OR_RETURN(
      MemoryMappedFile metadata_mmapped_file,
      MemoryMappedFile::Create(filesystem, GetMetadataFilePath(working_path),
                               MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
                               /*max_file_size=*/kMetadataFileSize,
                               /*pre_mapping_file_offset=*/0,
                               /*pre_mapping_mmap_size=*/kMetadataFileSize));
  ICING_RETURN_IF_ERROR(metadata_mmapped_file.GrowAndRemapIfNecessary(
      /*file_offset=*/0, /*mmap_size=*/kMetadataFileSize));

  // Create instance. The constructor is private, so we cannot use
  // std::make_unique here.
  auto new_integer_index_storage =
      std::unique_ptr<IntegerIndexStorage>(new IntegerIndexStorage(
          filesystem, std::move(working_path), std::move(options),
          posting_list_serializer,
          std::make_unique<MemoryMappedFile>(std::move(metadata_mmapped_file)),
          std::move(sorted_buckets), std::move(unsorted_buckets),
          std::make_unique<FlashIndexStorage>(std::move(flash_index_storage))));
  // Initialize info content by writing mapped memory directly.
  Info& info_ref = new_integer_index_storage->info();
  info_ref.magic = Info::kMagic;
  info_ref.num_data = 0;

  // Initialize new PersistentStorage. The initial checksums will be computed
  // and set via InitializeNewStorage.
  ICING_RETURN_IF_ERROR(new_integer_index_storage->InitializeNewStorage());

  return new_integer_index_storage;
}
873 
// Opens an existing IntegerIndexStorage from working_path: maps the metadata
// file, opens the bucket FileBackedVectors and flash posting list storage,
// validates checksums via InitializeExistingStorage, and verifies the magic.
//
// Returns:
//   - The opened IntegerIndexStorage instance on success
//   - FAILED_PRECONDITION_ERROR if the metadata file size or magic value is
//     wrong (i.e. the files are corrupted or from an incompatible version)
//   - Any error propagated from the underlying file-backed components
/* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
IntegerIndexStorage::InitializeExistingFiles(
    const Filesystem& filesystem, std::string&& working_path, Options&& options,
    PostingListIntegerIndexSerializer* posting_list_serializer) {
  // Mmap the content of the crcs and info.
  ICING_ASSIGN_OR_RETURN(
      MemoryMappedFile metadata_mmapped_file,
      MemoryMappedFile::Create(filesystem, GetMetadataFilePath(working_path),
                               MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
                               /*max_file_size=*/kMetadataFileSize,
                               /*pre_mapping_file_offset=*/0,
                               /*pre_mapping_mmap_size=*/kMetadataFileSize));
  if (metadata_mmapped_file.available_size() != kMetadataFileSize) {
    return absl_ports::FailedPreconditionError("Incorrect metadata file size");
  }

  // Initialize sorted_buckets. Pre-mapping sizes mirror InitializeNewFiles.
  int32_t pre_mapping_mmap_size = sizeof(Bucket) * (1 << 10);
  ICING_ASSIGN_OR_RETURN(
      std::unique_ptr<FileBackedVector<Bucket>> sorted_buckets,
      FileBackedVector<Bucket>::Create(
          filesystem, GetSortedBucketsFilePath(working_path),
          MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
          FileBackedVector<Bucket>::kMaxFileSize,
          options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));

  // Initialize unsorted_buckets
  pre_mapping_mmap_size = sizeof(Bucket) * kUnsortedBucketsLengthThreshold;
  ICING_ASSIGN_OR_RETURN(
      std::unique_ptr<FileBackedVector<Bucket>> unsorted_buckets,
      FileBackedVector<Bucket>::Create(
          filesystem, GetUnsortedBucketsFilePath(working_path),
          MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
          FileBackedVector<Bucket>::kMaxFileSize,
          options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));

  // Initialize flash_index_storage
  ICING_ASSIGN_OR_RETURN(
      FlashIndexStorage flash_index_storage,
      FlashIndexStorage::Create(GetFlashIndexStorageFilePath(working_path),
                                &filesystem, posting_list_serializer));

  // Create instance. The constructor is private, so we cannot use
  // std::make_unique here.
  auto integer_index_storage =
      std::unique_ptr<IntegerIndexStorage>(new IntegerIndexStorage(
          filesystem, std::move(working_path), std::move(options),
          posting_list_serializer,
          std::make_unique<MemoryMappedFile>(std::move(metadata_mmapped_file)),
          std::move(sorted_buckets), std::move(unsorted_buckets),
          std::make_unique<FlashIndexStorage>(std::move(flash_index_storage))));

  // Initialize existing PersistentStorage. Checksums will be validated.
  ICING_RETURN_IF_ERROR(integer_index_storage->InitializeExistingStorage());

  // Validate other values of info and options.
  // Magic should be consistent with the codebase.
  if (integer_index_storage->info().magic != Info::kMagic) {
    return absl_ports::FailedPreconditionError("Incorrect magic value");
  }

  return integer_index_storage;
}
936 
937 /* static */ libtextclassifier3::Status
FlushDataIntoNewSortedBucket(int64_t key_lower,int64_t key_upper,std::vector<IntegerIndexData> && data,IntegerIndexStorage * storage)938 IntegerIndexStorage::FlushDataIntoNewSortedBucket(
939     int64_t key_lower, int64_t key_upper, std::vector<IntegerIndexData>&& data,
940     IntegerIndexStorage* storage) {
941   storage->SetDirty();
942 
943   if (data.empty()) {
944     return storage->sorted_buckets_->Append(Bucket(
945         key_lower, key_upper, PostingListIdentifier::kInvalid, /*num_data=*/0));
946   }
947 
948   ICING_ASSIGN_OR_RETURN(
949       PostingListIdentifier pl_id,
950       FlushDataIntoPostingLists(storage->flash_index_storage_.get(),
951                                 storage->posting_list_serializer_, data.begin(),
952                                 data.end()));
953 
954   storage->info().num_data += data.size();
955   return storage->sorted_buckets_->Append(
956       Bucket(key_lower, key_upper, pl_id, data.size()));
957 }
958 
PersistStoragesToDisk()959 libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() {
960   if (is_initialized_ && !is_storage_dirty()) {
961     return libtextclassifier3::Status::OK;
962   }
963 
964   ICING_RETURN_IF_ERROR(sorted_buckets_->PersistToDisk());
965   ICING_RETURN_IF_ERROR(unsorted_buckets_->PersistToDisk());
966   if (!flash_index_storage_->PersistToDisk()) {
967     return absl_ports::InternalError(
968         "Fail to persist FlashIndexStorage to disk");
969   }
970   is_storage_dirty_ = false;
971   return libtextclassifier3::Status::OK;
972 }
973 
PersistMetadataToDisk()974 libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk() {
975   // We can skip persisting metadata to disk only if both info and storage are
976   // clean.
977   if (is_initialized_ && !is_info_dirty() && !is_storage_dirty()) {
978     return libtextclassifier3::Status::OK;
979   }
980 
981   // Changes should have been applied to the underlying file when using
982   // MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC, but call msync() as an
983   // extra safety step to ensure they are written out.
984   ICING_RETURN_IF_ERROR(metadata_mmapped_file_->PersistToDisk());
985   is_info_dirty_ = false;
986   return libtextclassifier3::Status::OK;
987 }
988 
989 libtextclassifier3::StatusOr<Crc32>
UpdateStoragesChecksum()990 IntegerIndexStorage::UpdateStoragesChecksum() {
991   if (is_initialized_ && !is_storage_dirty()) {
992     return Crc32(crcs().component_crcs.storages_crc);
993   }
994 
995   // Compute crcs
996   ICING_ASSIGN_OR_RETURN(Crc32 sorted_buckets_crc,
997                          sorted_buckets_->UpdateChecksum());
998   ICING_ASSIGN_OR_RETURN(Crc32 unsorted_buckets_crc,
999                          unsorted_buckets_->UpdateChecksum());
1000 
1001   // TODO(b/259744228): implement and include flash_index_storage checksum
1002   return Crc32(sorted_buckets_crc.Get() ^ unsorted_buckets_crc.Get());
1003 }
1004 
GetInfoChecksum() const1005 libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::GetInfoChecksum()
1006     const {
1007   if (is_initialized_ && !is_info_dirty()) {
1008     return Crc32(crcs().component_crcs.info_crc);
1009   }
1010 
1011   return info().GetChecksum();
1012 }
1013 
GetStoragesChecksum() const1014 libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::GetStoragesChecksum()
1015     const {
1016   if (is_initialized_ && !is_storage_dirty()) {
1017     return Crc32(crcs().component_crcs.storages_crc);
1018   }
1019 
1020   // Compute crcs
1021   Crc32 sorted_buckets_crc = sorted_buckets_->GetChecksum();
1022   Crc32 unsorted_buckets_crc = unsorted_buckets_->GetChecksum();
1023   // TODO(b/259744228): implement and include flash_index_storage checksum
1024   return Crc32(sorted_buckets_crc.Get() ^ unsorted_buckets_crc.Get());
1025 }
1026 
// Adds keys in [it_start, it_end) for (document_id, section_id) into the
// bucket referenced by mutable_bucket. If adding them would push the bucket
// over options_.num_data_threshold_for_bucket_split and its key range is
// still splittable (key_lower < key_upper), the bucket is split: all existing
// data are read back and freed, merged with the new data, re-partitioned via
// integer_index_bucket_util::Split, and flushed into new posting lists. The
// first post-split bucket reuses mutable_bucket in place.
//
// Returns:
//   - The extra buckets created by a split (empty vector when no split
//     happened) on success
//   - INTERNAL_ERROR if splitting yields no buckets or flushing fails
//   - Any error propagated from posting list operations
libtextclassifier3::StatusOr<std::vector<IntegerIndexStorage::Bucket>>
IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary(
    DocumentId document_id, SectionId section_id,
    const std::vector<int64_t>::const_iterator& it_start,
    const std::vector<int64_t>::const_iterator& it_end,
    FileBackedVector<Bucket>::MutableView& mutable_bucket) {
  int32_t num_data_in_bucket = mutable_bucket.Get().num_data();
  int32_t num_new_data = std::distance(it_start, it_end);
  if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() &&
      num_new_data + num_data_in_bucket >
          options_.num_data_threshold_for_bucket_split) {
    // Split bucket.

    // 1. Read all data and free all posting lists.
    std::vector<IntegerIndexData> all_data;
    if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
      ICING_ASSIGN_OR_RETURN(
          std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
          PostingListIntegerIndexAccessor::CreateFromExisting(
              flash_index_storage_.get(), posting_list_serializer_,
              mutable_bucket.Get().posting_list_identifier()));
      ICING_ASSIGN_OR_RETURN(all_data, pl_accessor->GetAllDataAndFree());
    }

    // 2. Append all new data.
    all_data.reserve(all_data.size() + num_new_data);
    for (auto it = it_start; it != it_end; ++it) {
      all_data.push_back(IntegerIndexData(section_id, document_id, *it));
    }

    // 3. Run bucket splitting algorithm to decide new buckets and dispatch
    //    data.
    //    - # of data in a full bucket =
    //      options_.num_data_threshold_for_bucket_split.
    //    - Bucket splitting logic will be invoked if adding new data
    //      (num_new_data >= 1) into a full bucket.
    //    - In order to achieve good (amortized) time complexity, we want # of
    //      data in new buckets to be around half_of_threshold (i.e.
    //      options_.num_data_threshold_for_bucket_split / 2).
    //    - Using half_of_threshold as the cutoff threshold will cause splitting
    //      buckets with [half_of_threshold, half_of_threshold, num_new_data]
    //      data, which is not ideal because num_new_data is usually small.
    //    - Thus, we pick (half_of_threshold + kNumDataAfterSplitAdjustment) as
    //      the cutoff threshold to avoid over-splitting. It can tolerate
    //      num_new_data up to (2 * kNumDataAfterSplitAdjustment) and
    //      split only 2 buckets (instead of 3) with
    //      [half_of_threshold + kNumDataAfterSplitAdjustment,
    //       half_of_threshold + (kNumDataAfterSplitAdjustment - num_new_data)].
    int32_t cutoff_threshold =
        options_.num_data_threshold_for_bucket_split / 2 +
        kNumDataAfterSplitAdjustment;
    std::vector<integer_index_bucket_util::DataRangeAndBucketInfo>
        new_bucket_infos = integer_index_bucket_util::Split(
            all_data, mutable_bucket.Get().key_lower(),
            mutable_bucket.Get().key_upper(), cutoff_threshold);
    if (new_bucket_infos.empty()) {
      ICING_LOG(WARNING)
          << "No buckets after splitting. This should not happen.";
      return absl_ports::InternalError("Split error");
    }

    // 4. Flush data and create new buckets.
    std::vector<Bucket> new_buckets;
    for (int i = 0; i < new_bucket_infos.size(); ++i) {
      int32_t num_data_in_new_bucket =
          std::distance(new_bucket_infos[i].start, new_bucket_infos[i].end);
      ICING_ASSIGN_OR_RETURN(
          PostingListIdentifier pl_id,
          FlushDataIntoPostingLists(
              flash_index_storage_.get(), posting_list_serializer_,
              new_bucket_infos[i].start, new_bucket_infos[i].end));
      if (i == 0) {
        // Reuse mutable_bucket for the first new bucket so the caller's view
        // stays valid; only the extra buckets are returned.
        mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower);
        mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper);
        mutable_bucket.Get().set_posting_list_identifier(pl_id);
        mutable_bucket.Get().set_num_data(num_data_in_new_bucket);
      } else {
        new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower,
                                     new_bucket_infos[i].key_upper, pl_id,
                                     num_data_in_new_bucket));
      }
    }

    return new_buckets;
  }

  // Otherwise, we don't need to split bucket. Just simply add all new data into
  // the bucket.
  std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor;
  if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
    ICING_ASSIGN_OR_RETURN(
        pl_accessor, PostingListIntegerIndexAccessor::CreateFromExisting(
                         flash_index_storage_.get(), posting_list_serializer_,
                         mutable_bucket.Get().posting_list_identifier()));
  } else {
    // The bucket has no posting list yet; create a new chain for it.
    ICING_ASSIGN_OR_RETURN(
        pl_accessor, PostingListIntegerIndexAccessor::Create(
                         flash_index_storage_.get(), posting_list_serializer_));
  }

  for (auto it = it_start; it != it_end; ++it) {
    ICING_RETURN_IF_ERROR(pl_accessor->PrependData(
        IntegerIndexData(section_id, document_id, *it)));
  }

  // Finalize may move the last posting list or allocate a larger one, so the
  // bucket's head posting list id must be refreshed from the result.
  PostingListAccessor::FinalizeResult result =
      std::move(*pl_accessor).Finalize();
  if (!result.status.ok()) {
    return result.status;
  }
  if (!result.id.is_valid()) {
    return absl_ports::InternalError("Fail to flush data into posting list(s)");
  }

  mutable_bucket.Get().set_posting_list_identifier(result.id);
  // We've already verified num_new_data won't exceed the limit of the entire
  // storage, so it is safe to add to the counter of the bucket.
  mutable_bucket.Get().set_num_data(num_data_in_bucket + num_new_data);

  return std::vector<Bucket>();
}
1149 
// Merges unsorted_buckets_ into sorted_buckets_ so that sorted_buckets_
// contains all buckets in sorted order, then truncates unsorted_buckets_.
//
// Returns:
//   - OK on success (also when there is nothing to sort)
//   - OUT_OF_RANGE_ERROR if the merged length would exceed
//     FileBackedVector<Bucket>::kMaxNumElements
//   - Any error propagated from FileBackedVector operations
libtextclassifier3::Status IntegerIndexStorage::SortBuckets() {
  if (unsorted_buckets_->num_elements() == 0) {
    return libtextclassifier3::Status::OK;
  }

  int32_t sorted_len = sorted_buckets_->num_elements();
  int32_t unsorted_len = unsorted_buckets_->num_elements();
  // Overflow-safe form of (sorted_len + unsorted_len > kMaxNumElements).
  if (sorted_len > FileBackedVector<Bucket>::kMaxNumElements - unsorted_len) {
    return absl_ports::OutOfRangeError(
        "Sorted buckets length exceeds the limit after merging");
  }

  // Grow sorted_buckets_ up front so the merge can write into the tail.
  ICING_RETURN_IF_ERROR(sorted_buckets_->Allocate(unsorted_len));

  // Sort unsorted_buckets_.
  ICING_RETURN_IF_ERROR(
      unsorted_buckets_->Sort(/*begin_idx=*/0, /*end_idx=*/unsorted_len));

  // Merge unsorted_buckets_ into sorted_buckets_ and clear unsorted_buckets_.
  // Note that we could have used std::sort + std::inplace_merge, but it is more
  // complicated to deal with FileBackedVector SetDirty logic, so implement our
  // own merging with FileBackedVector methods.
  //
  // Merge buckets from back. This could save some iterations and avoid setting
  // dirty for unchanged elements of the original sorted segments.
  // For example, we can avoid setting dirty for elements [1, 2, 3, 5] for the
  // following sorted/unsorted data:
  // - sorted: [1, 2, 3, 5, 8, 13, _, _, _, _)]
  // - unsorted: [6, 10, 14, 15]
  int32_t sorted_write_idx = sorted_len + unsorted_len - 1;
  int32_t sorted_curr_idx = sorted_len - 1;
  int32_t unsorted_curr_idx = unsorted_len - 1;
  while (unsorted_curr_idx >= 0) {
    // Take the larger of the two candidates; remaining sorted elements below
    // sorted_curr_idx are already in their final position once the unsorted
    // side is exhausted.
    if (sorted_curr_idx >= 0 && unsorted_buckets_->array()[unsorted_curr_idx] <
                                    sorted_buckets_->array()[sorted_curr_idx]) {
      ICING_RETURN_IF_ERROR(sorted_buckets_->Set(
          sorted_write_idx, sorted_buckets_->array()[sorted_curr_idx]));
      --sorted_curr_idx;

    } else {
      ICING_RETURN_IF_ERROR(sorted_buckets_->Set(
          sorted_write_idx, unsorted_buckets_->array()[unsorted_curr_idx]));
      --unsorted_curr_idx;
    }
    --sorted_write_idx;
  }

  // All buckets now live in sorted_buckets_; drop the unsorted array.
  ICING_RETURN_IF_ERROR(unsorted_buckets_->TruncateTo(0));

  return libtextclassifier3::Status::OK;
}
1201 
1202 }  // namespace lib
1203 }  // namespace icing
1204