• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "icing/index/numeric/integer-index-storage.h"
16 
17 #include <algorithm>
18 #include <cstdint>
19 #include <functional>
20 #include <iterator>
21 #include <limits>
22 #include <memory>
23 #include <queue>
24 #include <string>
25 #include <string_view>
26 #include <utility>
27 #include <vector>
28 
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/file/file-backed-vector.h"
34 #include "icing/file/filesystem.h"
35 #include "icing/file/memory-mapped-file.h"
36 #include "icing/file/posting_list/flash-index-storage.h"
37 #include "icing/file/posting_list/posting-list-identifier.h"
38 #include "icing/index/hit/doc-hit-info.h"
39 #include "icing/index/iterator/doc-hit-info-iterator.h"
40 #include "icing/index/numeric/doc-hit-info-iterator-numeric.h"
41 #include "icing/index/numeric/integer-index-bucket-util.h"
42 #include "icing/index/numeric/integer-index-data.h"
43 #include "icing/index/numeric/numeric-index.h"
44 #include "icing/index/numeric/posting-list-integer-index-accessor.h"
45 #include "icing/index/numeric/posting-list-integer-index-serializer.h"
46 #include "icing/schema/section.h"
47 #include "icing/store/document-id.h"
48 #include "icing/util/status-macros.h"
49 
50 namespace icing {
51 namespace lib {
52 
53 namespace {
54 
55 // Helper function to flush data between [it_start, it_end) into posting list(s)
56 // and return posting list id.
57 // Note: it will sort data between [it_start, it_end) by basic hit value, so the
58 // caller should be aware that the data order will be changed after calling this
59 // function.
FlushDataIntoPostingLists(FlashIndexStorage * flash_index_storage,PostingListIntegerIndexSerializer * posting_list_serializer,const std::vector<IntegerIndexData>::iterator & it_start,const std::vector<IntegerIndexData>::iterator & it_end)60 libtextclassifier3::StatusOr<PostingListIdentifier> FlushDataIntoPostingLists(
61     FlashIndexStorage* flash_index_storage,
62     PostingListIntegerIndexSerializer* posting_list_serializer,
63     const std::vector<IntegerIndexData>::iterator& it_start,
64     const std::vector<IntegerIndexData>::iterator& it_end) {
65   if (it_start == it_end) {
66     return PostingListIdentifier::kInvalid;
67   }
68 
69   ICING_ASSIGN_OR_RETURN(
70       std::unique_ptr<PostingListIntegerIndexAccessor> new_pl_accessor,
71       PostingListIntegerIndexAccessor::Create(flash_index_storage,
72                                               posting_list_serializer));
73 
74   std::sort(it_start, it_end);
75   for (auto it = it_end - 1; it >= it_start; --it) {
76     ICING_RETURN_IF_ERROR(new_pl_accessor->PrependData(*it));
77   }
78 
79   PostingListAccessor::FinalizeResult result =
80       std::move(*new_pl_accessor).Finalize();
81   if (!result.status.ok()) {
82     return result.status;
83   }
84   if (!result.id.is_valid()) {
85     return absl_ports::InternalError("Fail to flush data into posting list(s)");
86   }
87   return result.id;
88 }
89 
90 // The following 4 methods are helper functions to get the correct file path of
91 // metadata/sorted_buckets/unsorted_buckets/flash_index_storage, according to
92 // the given working directory.
GetMetadataFilePath(std::string_view working_path)93 std::string GetMetadataFilePath(std::string_view working_path) {
94   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
95                             ".m");
96 }
97 
GetSortedBucketsFilePath(std::string_view working_path)98 std::string GetSortedBucketsFilePath(std::string_view working_path) {
99   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
100                             ".s");
101 }
102 
GetUnsortedBucketsFilePath(std::string_view working_path)103 std::string GetUnsortedBucketsFilePath(std::string_view working_path) {
104   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
105                             ".u");
106 }
107 
GetFlashIndexStorageFilePath(std::string_view working_path)108 std::string GetFlashIndexStorageFilePath(std::string_view working_path) {
109   return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
110                             ".f");
111 }
112 
113 }  // namespace
114 
115 // We add (BasicHits, key) into a bucket in DocumentId descending and SectionId
116 // ascending order. When doing range query, we may access buckets and want to
117 // return BasicHits to callers sorted by DocumentId. Therefore, this problem is
118 // actually "merge K sorted lists".
119 // To implement this algorithm via priority_queue, we create this wrapper class
120 // to store PostingListIntegerIndexAccessor for iterating through the posting
121 // list chain.
122 // - Non-relevant (i.e. not in range [key_lower, key_upper]) will be skipped.
123 // - Relevant BasicHits will be returned.
124 class BucketPostingListIterator {
125  public:
126   class Comparator {
127    public:
128     // REQUIRES: 2 BucketPostingListIterator* instances (lhs, rhs) should be
129     //   valid, i.e. the preceding AdvanceAndFilter() succeeded.
operator ()(const BucketPostingListIterator * lhs,const BucketPostingListIterator * rhs) const130     bool operator()(const BucketPostingListIterator* lhs,
131                     const BucketPostingListIterator* rhs) const {
132       // std::priority_queue is a max heap and we should return BasicHits in
133       // DocumentId descending order.
134       // - BucketPostingListIterator::operator< should have the same order as
135       //   DocumentId.
136       // - BasicHit encodes inverted document id and BasicHit::operator<
137       //   compares the encoded raw value directly.
138       // - Therefore, BucketPostingListIterator::operator< should compare
139       //   BasicHit reversely.
140       // - This will make priority_queue return buckets in DocumentId
141       //   descending and SectionId ascending order.
142       // - Whatever direction we sort SectionId by (or pop by priority_queue)
143       //   doesn't matter because all hits for the same DocumentId will be
144       //   merged into a single DocHitInfo.
145       return rhs->GetCurrentBasicHit() < lhs->GetCurrentBasicHit();
146     }
147   };
148 
BucketPostingListIterator(std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor)149   explicit BucketPostingListIterator(
150       std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor)
151       : pl_accessor_(std::move(pl_accessor)),
152         should_retrieve_next_batch_(true) {}
153 
154   // Advances to the next relevant data. The posting list of a bucket contains
155   // keys within range [bucket.key_lower, bucket.key_upper], but some of them
156   // may be out of [query_key_lower, query_key_upper], so when advancing we have
157   // to filter out those non-relevant keys.
158   //
159   // Returns:
160   //   - OK on success
161   //   - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
162   //     data)
163   //   - Any other PostingListIntegerIndexAccessor errors
AdvanceAndFilter(int64_t query_key_lower,int64_t query_key_upper)164   libtextclassifier3::Status AdvanceAndFilter(int64_t query_key_lower,
165                                               int64_t query_key_upper) {
166     // Move curr_ until reaching a relevant data (i.e. key in range
167     // [query_key_lower, query_key_upper])
168     do {
169       if (!should_retrieve_next_batch_) {
170         ++curr_;
171         should_retrieve_next_batch_ =
172             curr_ >= cached_batch_integer_index_data_.cend();
173       }
174       if (should_retrieve_next_batch_) {
175         ICING_RETURN_IF_ERROR(GetNextDataBatch());
176         should_retrieve_next_batch_ = false;
177       }
178     } while (curr_->key() < query_key_lower || curr_->key() > query_key_upper);
179 
180     return libtextclassifier3::Status::OK;
181   }
182 
GetCurrentBasicHit() const183   const BasicHit& GetCurrentBasicHit() const { return curr_->basic_hit(); }
184 
185  private:
186   // Gets next batch of data from the posting list chain, caches in
187   // cached_batch_integer_index_data_, and sets curr_ to the begin of the cache.
GetNextDataBatch()188   libtextclassifier3::Status GetNextDataBatch() {
189     auto cached_batch_integer_index_data_or = pl_accessor_->GetNextDataBatch();
190     if (!cached_batch_integer_index_data_or.ok()) {
191       ICING_LOG(WARNING)
192           << "Fail to get next batch data from posting list due to: "
193           << cached_batch_integer_index_data_or.status().error_message();
194       return std::move(cached_batch_integer_index_data_or).status();
195     }
196 
197     cached_batch_integer_index_data_ =
198         std::move(cached_batch_integer_index_data_or).ValueOrDie();
199     curr_ = cached_batch_integer_index_data_.cbegin();
200 
201     if (cached_batch_integer_index_data_.empty()) {
202       return absl_ports::ResourceExhaustedError("End of iterator");
203     }
204 
205     return libtextclassifier3::Status::OK;
206   }
207 
208   std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor_;
209   std::vector<IntegerIndexData> cached_batch_integer_index_data_;
210   std::vector<IntegerIndexData>::const_iterator curr_;
211   bool should_retrieve_next_batch_;
212 };
213 
214 // Wrapper class to iterate through IntegerIndexStorage to get relevant data.
215 // It uses multiple BucketPostingListIterator instances from different candidate
216 // buckets and merges all relevant BasicHits from these buckets by
217 // std::priority_queue in DocumentId descending order. Also different SectionIds
218 // of the same DocumentId will be merged into SectionIdMask and returned as a
219 // single DocHitInfo.
220 class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
221  public:
IntegerIndexStorageIterator(int64_t query_key_lower,int64_t query_key_upper,std::vector<std::unique_ptr<BucketPostingListIterator>> && bucket_pl_iters)222   explicit IntegerIndexStorageIterator(
223       int64_t query_key_lower, int64_t query_key_upper,
224       std::vector<std::unique_ptr<BucketPostingListIterator>>&& bucket_pl_iters)
225       : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper) {
226     std::vector<BucketPostingListIterator*> bucket_pl_iters_raw_ptrs;
227     for (std::unique_ptr<BucketPostingListIterator>& bucket_pl_itr :
228          bucket_pl_iters) {
229       // Before adding BucketPostingListIterator* into the priority queue, we
230       // have to advance the bucket iterator to the first valid data since the
231       // priority queue needs valid data to compare the order.
232       // Note: it is possible that the bucket iterator fails to advance for the
233       // first round, because data could be filtered out by [query_key_lower,
234       // query_key_upper]. In this case, just discard the iterator.
235       if (bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper)
236               .ok()) {
237         bucket_pl_iters_raw_ptrs.push_back(bucket_pl_itr.get());
238         bucket_pl_iters_.push_back(std::move(bucket_pl_itr));
239       }
240     }
241 
242     pq_ = std::priority_queue<BucketPostingListIterator*,
243                               std::vector<BucketPostingListIterator*>,
244                               BucketPostingListIterator::Comparator>(
245         comparator_, std::move(bucket_pl_iters_raw_ptrs));
246   }
247 
248   ~IntegerIndexStorageIterator() override = default;
249 
250   // Advances to the next DocHitInfo. Note: several BucketPostingListIterator
251   // instances may be advanced if they point to data with the same DocumentId.
252   //
253   // Returns:
254   //   - OK on success
255   //   - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
256   //     data)
257   //   - Any BucketPostingListIterator errors
258   libtextclassifier3::Status Advance() override;
259 
GetDocHitInfo() const260   DocHitInfo GetDocHitInfo() const override { return doc_hit_info_; }
261 
262  private:
263   BucketPostingListIterator::Comparator comparator_;
264 
265   // We have to fetch and pop the top BucketPostingListIterator from
266   // std::priority_queue to perform "merge K sorted lists algorithm".
267   // - Since std::priority_queue::pop() doesn't return the top element, we have
268   //   to call top() and pop() together.
269   // - std::move the top() element by const_cast is not an appropriate way
270   //   because it introduces transient unstable state for std::priority_queue.
271   // - We don't want to copy BucketPostingListIterator, either.
272   // - Therefore, add bucket_pl_iters_ for the ownership of all
273   //   BucketPostingListIterator instances and std::priority_queue uses the raw
274   //   pointer. So when calling top(), we can simply copy the raw pointer via
275   //   top() and avoid transient unstable state.
276   std::vector<std::unique_ptr<BucketPostingListIterator>> bucket_pl_iters_;
277   std::priority_queue<BucketPostingListIterator*,
278                       std::vector<BucketPostingListIterator*>,
279                       BucketPostingListIterator::Comparator>
280       pq_;
281 
282   DocHitInfo doc_hit_info_;
283 };
284 
Advance()285 libtextclassifier3::Status IntegerIndexStorageIterator::Advance() {
286   if (pq_.empty()) {
287     return absl_ports::ResourceExhaustedError("End of iterator");
288   }
289 
290   DocumentId document_id = pq_.top()->GetCurrentBasicHit().document_id();
291   doc_hit_info_ = DocHitInfo(document_id);
292   // Merge sections with same document_id into a single DocHitInfo
293   while (!pq_.empty() &&
294          pq_.top()->GetCurrentBasicHit().document_id() == document_id) {
295     BucketPostingListIterator* bucket_itr = pq_.top();
296     pq_.pop();
297 
298     libtextclassifier3::Status advance_status;
299     do {
300       doc_hit_info_.UpdateSection(
301           bucket_itr->GetCurrentBasicHit().section_id());
302       advance_status = bucket_itr->AdvanceAndFilter(key_lower_, key_upper_);
303     } while (advance_status.ok() &&
304              bucket_itr->GetCurrentBasicHit().document_id() == document_id);
305     if (advance_status.ok()) {
306       pq_.push(bucket_itr);
307     }
308   }
309 
310   return libtextclassifier3::Status::OK;
311 }
312 
IsValid() const313 bool IntegerIndexStorage::Options::IsValid() const {
314   if (!HasCustomInitBuckets()) {
315     return true;
316   }
317 
318   // Verify if the range of buckets are disjoint and the range union is
319   // [INT64_MIN, INT64_MAX].
320   std::vector<Bucket> buckets;
321   buckets.reserve(custom_init_sorted_buckets.size() +
322                   custom_init_unsorted_buckets.size());
323   buckets.insert(buckets.end(), custom_init_sorted_buckets.begin(),
324                  custom_init_sorted_buckets.end());
325   buckets.insert(buckets.end(), custom_init_unsorted_buckets.begin(),
326                  custom_init_unsorted_buckets.end());
327   if (buckets.empty()) {
328     return false;
329   }
330   std::sort(buckets.begin(), buckets.end());
331   int64_t prev_upper = std::numeric_limits<int64_t>::min();
332   for (int i = 0; i < buckets.size(); ++i) {
333     // key_lower should not be greater than key_upper and init bucket should
334     // have invalid posting list identifier.
335     if (buckets[i].key_lower() > buckets[i].key_upper() ||
336         buckets[i].posting_list_identifier().is_valid()) {
337       return false;
338     }
339 
340     // Previous upper bound should not be INT64_MAX since it is not the last
341     // bucket.
342     if (prev_upper == std::numeric_limits<int64_t>::max()) {
343       return false;
344     }
345 
346     int64_t expected_lower =
347         (i == 0 ? std::numeric_limits<int64_t>::min() : prev_upper + 1);
348     if (buckets[i].key_lower() != expected_lower) {
349       return false;
350     }
351 
352     prev_upper = buckets[i].key_upper();
353   }
354 
355   return prev_upper == std::numeric_limits<int64_t>::max();
356 }
357 
358 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
Create(const Filesystem & filesystem,std::string working_path,Options options,PostingListIntegerIndexSerializer * posting_list_serializer)359 IntegerIndexStorage::Create(
360     const Filesystem& filesystem, std::string working_path, Options options,
361     PostingListIntegerIndexSerializer* posting_list_serializer) {
362   if (!options.IsValid()) {
363     return absl_ports::InvalidArgumentError(
364         "Invalid IntegerIndexStorage options");
365   }
366 
367   if (!filesystem.FileExists(GetMetadataFilePath(working_path).c_str()) ||
368       !filesystem.FileExists(GetSortedBucketsFilePath(working_path).c_str()) ||
369       !filesystem.FileExists(
370           GetUnsortedBucketsFilePath(working_path).c_str()) ||
371       !filesystem.FileExists(
372           GetFlashIndexStorageFilePath(working_path).c_str())) {
373     // Discard working_path if any of them is missing, and reinitialize.
374     if (filesystem.DirectoryExists(working_path.c_str())) {
375       ICING_RETURN_IF_ERROR(Discard(filesystem, working_path));
376     }
377     return InitializeNewFiles(filesystem, std::move(working_path),
378                               std::move(options), posting_list_serializer);
379   }
380   return InitializeExistingFiles(filesystem, std::move(working_path),
381                                  std::move(options), posting_list_serializer);
382 }
383 
~IntegerIndexStorage()384 IntegerIndexStorage::~IntegerIndexStorage() {
385   if (!PersistToDisk().ok()) {
386     ICING_LOG(WARNING)
387         << "Failed to persist hash map to disk while destructing "
388         << working_path_;
389   }
390 }
391 
392 class IntegerIndexStorageComparator {
393  public:
operator ()(const IntegerIndexStorage::Bucket & lhs,int64_t rhs) const394   bool operator()(const IntegerIndexStorage::Bucket& lhs, int64_t rhs) const {
395     return lhs.key_upper() < rhs;
396   }
397 } kComparator;
398 
AddKeys(DocumentId document_id,SectionId section_id,std::vector<int64_t> && new_keys)399 libtextclassifier3::Status IntegerIndexStorage::AddKeys(
400     DocumentId document_id, SectionId section_id,
401     std::vector<int64_t>&& new_keys) {
402   if (new_keys.empty()) {
403     return libtextclassifier3::Status::OK;
404   }
405 
406   std::sort(new_keys.begin(), new_keys.end());
407 
408   // Dedupe
409   auto last = std::unique(new_keys.begin(), new_keys.end());
410   new_keys.erase(last, new_keys.end());
411 
412   // When adding keys into a bucket, we potentially split it into 2 new buckets
413   // and one of them will be added into the unsorted bucket array.
414   // When handling keys belonging to buckets in the unsorted bucket array, we
415   // don't have to (and must not) handle these newly split buckets. Therefore,
416   // collect all newly split buckets in another vector and append them into the
417   // unsorted bucket array after adding all keys.
418   std::vector<Bucket> new_buckets;
419 
420   // Binary search range of the sorted bucket array.
421   const Bucket* sorted_bucket_arr_begin = sorted_buckets_->array();
422   const Bucket* sorted_bucket_arr_end =
423       sorted_buckets_->array() + sorted_buckets_->num_elements();
424 
425   // Step 1: handle keys belonging to buckets in the sorted bucket array. Skip
426   //         keys belonging to the unsorted bucket array and deal with them in
427   //         the next step.
428   // - Iterate through new_keys by it_start.
429   // - Binary search (std::lower_bound comparing key with bucket.key_upper()) to
430   //   find the first bucket in the sorted bucket array with key_upper is not
431   //   smaller than (>=) the key.
432   // - Skip (and advance it_start) all keys smaller than the target bucket's
433   //   key_lower. It means these keys belong to buckets in the unsorted bucket
434   //   array and we will deal with them later.
435   // - Find it_end such that all keys within range [it_start, it_end) belong to
436   //   the target bucket.
437   // - Batch add keys within range [it_start, it_end) into the target bucket.
438   auto it_start = new_keys.cbegin();
439   while (it_start != new_keys.cend() &&
440          sorted_bucket_arr_begin < sorted_bucket_arr_end) {
441     // Use std::lower_bound to find the first bucket in the sorted bucket array
442     // with key_upper >= *it_start.
443     const Bucket* target_bucket = std::lower_bound(
444         sorted_bucket_arr_begin, sorted_bucket_arr_end, *it_start, kComparator);
445     if (target_bucket >= sorted_bucket_arr_end) {
446       // Keys in range [it_start, new_keys.cend()) are greater than all sorted
447       // buckets' key_upper, so we can end step 1. In fact, they belong to
448       // buckets in the unsorted bucket array and we will deal with them in
449       // step 2.
450       break;
451     }
452 
453     // Sequential instead of binary search to advance it_start and it_end for
454     // several reasons:
455     // - Eventually we have to iterate through all keys within range [it_start,
456     //   it_end) and add them into the posting list, so binary search doesn't
457     //   improve the overall time complexity.
458     // - Binary search may jump to far-away indices, which potentially
459     //   downgrades the cache performance.
460 
461     // After binary search, we've ensured *it_start <=
462     // target_bucket->key_upper(), but it is still possible that *it_start (and
463     // the next several keys) is still smaller than target_bucket->key_lower(),
464     // so we have to skip them. In fact, they belong to buckets in the unsorted
465     // bucket array.
466     //
467     // For example:
468     // - sorted bucket array: [(INT_MIN, 0), (1, 5), (100, 300), (301, 550)]
469     // - unsorted bucket array: [(550, INT_MAX), (6, 99)]
470     // - new_keys: [10, 20, 40, 102, 150, 200, 500, 600]
471     // std::lower_bound (target = 10) will get target_bucket = (100, 300), but
472     // we have to skip 10, 20, 40 because they are smaller than 100 (the
473     // bucket's key_lower). We should move it_start pointing to key 102.
474     while (it_start != new_keys.cend() &&
475            *it_start < target_bucket->key_lower()) {
476       ++it_start;
477     }
478 
479     // Locate it_end such that all keys within range [it_start, it_end) belong
480     // to target_bucket and all keys outside this range don't belong to
481     // target_bucket.
482     //
483     // For example (continue above), we should locate it_end to point to key
484     // 500.
485     auto it_end = it_start;
486     while (it_end != new_keys.cend() && *it_end <= target_bucket->key_upper()) {
487       ++it_end;
488     }
489 
490     // Now, keys within range [it_start, it_end) belong to target_bucket, so
491     // construct IntegerIndexData and add them into the bucket's posting list.
492     if (it_start != it_end) {
493       ICING_ASSIGN_OR_RETURN(
494           FileBackedVector<Bucket>::MutableView mutable_bucket,
495           sorted_buckets_->GetMutable(target_bucket -
496                                       sorted_buckets_->array()));
497       ICING_ASSIGN_OR_RETURN(
498           std::vector<Bucket> round_new_buckets,
499           AddKeysIntoBucketAndSplitIfNecessary(
500               document_id, section_id, it_start, it_end, mutable_bucket));
501       new_buckets.insert(new_buckets.end(), round_new_buckets.begin(),
502                          round_new_buckets.end());
503     }
504 
505     it_start = it_end;
506     sorted_bucket_arr_begin = target_bucket + 1;
507   }
508 
509   // Step 2: handle keys belonging to buckets in the unsorted bucket array. They
510   //         were skipped in step 1.
511   // For each bucket in the unsorted bucket array, find [it_start, it_end) such
512   // that all keys within this range belong to the bucket and add them.
513   // - Binary search (std::lower_bound comparing bucket.key_lower() with key) to
514   //   find it_start.
515   // - Sequential advance (start from it_start) to find it_end. Same reason as
516   //   above for choosing sequential advance instead of binary search.
517   // - Add keys within range [it_start, it_end) into the bucket.
518   for (int32_t i = 0; i < unsorted_buckets_->num_elements(); ++i) {
519     ICING_ASSIGN_OR_RETURN(FileBackedVector<Bucket>::MutableView mutable_bucket,
520                            unsorted_buckets_->GetMutable(i));
521     auto it_start = std::lower_bound(new_keys.cbegin(), new_keys.cend(),
522                                      mutable_bucket.Get().key_lower());
523     if (it_start == new_keys.cend()) {
524       continue;
525     }
526 
527     // Sequential advance instead of binary search to find the correct position
528     // of it_end for the same reasons mentioned above in step 1.
529     auto it_end = it_start;
530     while (it_end != new_keys.cend() &&
531            *it_end <= mutable_bucket.Get().key_upper()) {
532       ++it_end;
533     }
534 
535     // Now, key within range [it_start, it_end) belong to the bucket, so
536     // construct IntegerIndexData and add them into the bucket's posting list.
537     if (it_start != it_end) {
538       ICING_ASSIGN_OR_RETURN(
539           std::vector<Bucket> round_new_buckets,
540           AddKeysIntoBucketAndSplitIfNecessary(
541               document_id, section_id, it_start, it_end, mutable_bucket));
542       new_buckets.insert(new_buckets.end(), round_new_buckets.begin(),
543                          round_new_buckets.end());
544     }
545   }
546 
547   // Step 3: append new buckets into the unsorted bucket array.
548   if (!new_buckets.empty()) {
549     ICING_ASSIGN_OR_RETURN(
550         typename FileBackedVector<Bucket>::MutableArrayView mutable_new_arr,
551         unsorted_buckets_->Allocate(new_buckets.size()));
552     mutable_new_arr.SetArray(/*idx=*/0, new_buckets.data(), new_buckets.size());
553   }
554 
555   // Step 4: sort and merge the unsorted bucket array into the sorted bucket
556   //         array if the length of the unsorted bucket array exceeds the
557   //         threshold.
558   if (unsorted_buckets_->num_elements() > kUnsortedBucketsLengthThreshold) {
559     ICING_RETURN_IF_ERROR(SortBuckets());
560   }
561 
562   info().num_data += new_keys.size();
563 
564   return libtextclassifier3::Status::OK;
565 }
566 
567 libtextclassifier3::StatusOr<std::unique_ptr<DocHitInfoIterator>>
GetIterator(int64_t query_key_lower,int64_t query_key_upper) const568 IntegerIndexStorage::GetIterator(int64_t query_key_lower,
569                                  int64_t query_key_upper) const {
570   if (query_key_lower > query_key_upper) {
571     return absl_ports::InvalidArgumentError(
572         "key_lower should not be greater than key_upper");
573   }
574 
575   std::vector<std::unique_ptr<BucketPostingListIterator>> bucket_pl_iters;
576 
577   // Sorted bucket array
578   const Bucket* sorted_bucket_arr_begin = sorted_buckets_->array();
579   const Bucket* sorted_bucket_arr_end =
580       sorted_buckets_->array() + sorted_buckets_->num_elements();
581   for (const Bucket* bucket =
582            std::lower_bound(sorted_bucket_arr_begin, sorted_bucket_arr_end,
583                             query_key_lower, kComparator);
584        bucket < sorted_bucket_arr_end && bucket->key_lower() <= query_key_upper;
585        ++bucket) {
586     if (!bucket->posting_list_identifier().is_valid()) {
587       continue;
588     }
589 
590     ICING_ASSIGN_OR_RETURN(
591         std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
592         PostingListIntegerIndexAccessor::CreateFromExisting(
593             flash_index_storage_.get(), posting_list_serializer_,
594             bucket->posting_list_identifier()));
595     bucket_pl_iters.push_back(
596         std::make_unique<BucketPostingListIterator>(std::move(pl_accessor)));
597   }
598 
599   // Unsorted bucket array
600   for (int32_t i = 0; i < unsorted_buckets_->num_elements(); ++i) {
601     ICING_ASSIGN_OR_RETURN(const Bucket* bucket, unsorted_buckets_->Get(i));
602     if (query_key_upper < bucket->key_lower() ||
603         query_key_lower > bucket->key_upper() ||
604         !bucket->posting_list_identifier().is_valid()) {
605       // Skip bucket whose range doesn't overlap with [query_key_lower,
606       // query_key_upper] or posting_list_identifier is invalid.
607       continue;
608     }
609 
610     ICING_ASSIGN_OR_RETURN(
611         std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
612         PostingListIntegerIndexAccessor::CreateFromExisting(
613             flash_index_storage_.get(), posting_list_serializer_,
614             bucket->posting_list_identifier()));
615     bucket_pl_iters.push_back(
616         std::make_unique<BucketPostingListIterator>(std::move(pl_accessor)));
617   }
618 
619   return std::make_unique<DocHitInfoIteratorNumeric<int64_t>>(
620       std::make_unique<IntegerIndexStorageIterator>(
621           query_key_lower, query_key_upper, std::move(bucket_pl_iters)));
622 }
623 
TransferIndex(const std::vector<DocumentId> & document_id_old_to_new,IntegerIndexStorage * new_storage) const624 libtextclassifier3::Status IntegerIndexStorage::TransferIndex(
625     const std::vector<DocumentId>& document_id_old_to_new,
626     IntegerIndexStorage* new_storage) const {
627   // Discard all pre-existing buckets in new_storage since we will append newly
628   // merged buckets gradually into new_storage.
629   if (new_storage->sorted_buckets_->num_elements() > 0) {
630     ICING_RETURN_IF_ERROR(new_storage->sorted_buckets_->TruncateTo(0));
631   }
632   if (new_storage->unsorted_buckets_->num_elements() > 0) {
633     ICING_RETURN_IF_ERROR(new_storage->unsorted_buckets_->TruncateTo(0));
634   }
635 
636   // "Reference sort" the original storage buckets.
637   std::vector<std::reference_wrapper<const Bucket>> temp_buckets;
638   temp_buckets.reserve(sorted_buckets_->num_elements() +
639                        unsorted_buckets_->num_elements());
640   temp_buckets.insert(
641       temp_buckets.end(), sorted_buckets_->array(),
642       sorted_buckets_->array() + sorted_buckets_->num_elements());
643   temp_buckets.insert(
644       temp_buckets.end(), unsorted_buckets_->array(),
645       unsorted_buckets_->array() + unsorted_buckets_->num_elements());
646   std::sort(temp_buckets.begin(), temp_buckets.end(),
647             [](const std::reference_wrapper<const Bucket>& lhs,
648                const std::reference_wrapper<const Bucket>& rhs) -> bool {
649               return lhs.get() < rhs.get();
650             });
651 
652   int64_t curr_key_lower = std::numeric_limits<int64_t>::min();
653   int64_t curr_key_upper = std::numeric_limits<int64_t>::min();
654   std::vector<IntegerIndexData> accumulated_data;
655   for (const std::reference_wrapper<const Bucket>& bucket_ref : temp_buckets) {
656     // Read all data from the bucket.
657     std::vector<IntegerIndexData> new_data;
658     if (bucket_ref.get().posting_list_identifier().is_valid()) {
659       ICING_ASSIGN_OR_RETURN(
660           std::unique_ptr<PostingListIntegerIndexAccessor> old_pl_accessor,
661           PostingListIntegerIndexAccessor::CreateFromExisting(
662               flash_index_storage_.get(), posting_list_serializer_,
663               bucket_ref.get().posting_list_identifier()));
664 
665       ICING_ASSIGN_OR_RETURN(std::vector<IntegerIndexData> batch_old_data,
666                              old_pl_accessor->GetNextDataBatch());
667       while (!batch_old_data.empty()) {
668         for (const IntegerIndexData& old_data : batch_old_data) {
669           DocumentId new_document_id =
670               old_data.basic_hit().document_id() < document_id_old_to_new.size()
671                   ? document_id_old_to_new[old_data.basic_hit().document_id()]
672                   : kInvalidDocumentId;
673           // Transfer the document id of the hit if the document is not deleted
674           // or outdated.
675           if (new_document_id != kInvalidDocumentId) {
676             new_data.push_back(
677                 IntegerIndexData(old_data.basic_hit().section_id(),
678                                  new_document_id, old_data.key()));
679           }
680         }
681         ICING_ASSIGN_OR_RETURN(batch_old_data,
682                                old_pl_accessor->GetNextDataBatch());
683       }
684     }
685 
686     // Decide whether:
687     // - Flush accumulated_data and create a new bucket for them.
688     // - OR merge new_data into accumulated_data and go to the next round.
689     if (!accumulated_data.empty() && accumulated_data.size() + new_data.size() >
690                                          kNumDataThresholdForBucketMerge) {
691       // TODO(b/259743562): [Optimization 3] adjust upper bound to fit more data
692       // from new_data to accumulated_data.
693       ICING_RETURN_IF_ERROR(FlushDataIntoNewSortedBucket(
694           curr_key_lower, curr_key_upper, std::move(accumulated_data),
695           new_storage));
696 
697       curr_key_lower = bucket_ref.get().key_lower();
698       accumulated_data = std::move(new_data);
699     } else {
700       // We can just append to accumulated data because
701       // FlushDataIntoNewSortedBucket will take care of sorting the contents.
702       std::move(new_data.begin(), new_data.end(),
703                 std::back_inserter(accumulated_data));
704     }
705     curr_key_upper = bucket_ref.get().key_upper();
706   }
707 
708   // Add the last round of bucket.
709   ICING_RETURN_IF_ERROR(
710       FlushDataIntoNewSortedBucket(curr_key_lower, curr_key_upper,
711                                    std::move(accumulated_data), new_storage));
712 
713   return libtextclassifier3::Status::OK;
714 }
715 
716 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
InitializeNewFiles(const Filesystem & filesystem,std::string && working_path,Options && options,PostingListIntegerIndexSerializer * posting_list_serializer)717 IntegerIndexStorage::InitializeNewFiles(
718     const Filesystem& filesystem, std::string&& working_path, Options&& options,
719     PostingListIntegerIndexSerializer* posting_list_serializer) {
720   // IntegerIndexStorage uses working_path as working directory path.
721   // Create working directory.
722   if (!filesystem.CreateDirectory(working_path.c_str())) {
723     return absl_ports::InternalError(
724         absl_ports::StrCat("Failed to create directory: ", working_path));
725   }
726 
727   // Initialize sorted_buckets
728   int32_t pre_mapping_mmap_size = sizeof(Bucket) * (1 << 10);
729   ICING_ASSIGN_OR_RETURN(
730       std::unique_ptr<FileBackedVector<Bucket>> sorted_buckets,
731       FileBackedVector<Bucket>::Create(
732           filesystem, GetSortedBucketsFilePath(working_path),
733           MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
734           FileBackedVector<Bucket>::kMaxFileSize,
735           options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
736 
737   // Initialize unsorted_buckets
738   pre_mapping_mmap_size = sizeof(Bucket) * kUnsortedBucketsLengthThreshold;
739   ICING_ASSIGN_OR_RETURN(
740       std::unique_ptr<FileBackedVector<Bucket>> unsorted_buckets,
741       FileBackedVector<Bucket>::Create(
742           filesystem, GetUnsortedBucketsFilePath(working_path),
743           MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
744           FileBackedVector<Bucket>::kMaxFileSize,
745           options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
746 
747   // Initialize flash_index_storage
748   ICING_ASSIGN_OR_RETURN(
749       FlashIndexStorage flash_index_storage,
750       FlashIndexStorage::Create(GetFlashIndexStorageFilePath(working_path),
751                                 &filesystem, posting_list_serializer));
752 
753   if (options.HasCustomInitBuckets()) {
754     // Insert custom init buckets.
755     std::sort(options.custom_init_sorted_buckets.begin(),
756               options.custom_init_sorted_buckets.end());
757     ICING_ASSIGN_OR_RETURN(
758         typename FileBackedVector<Bucket>::MutableArrayView
759             mutable_new_sorted_bucket_arr,
760         sorted_buckets->Allocate(options.custom_init_sorted_buckets.size()));
761     mutable_new_sorted_bucket_arr.SetArray(
762         /*idx=*/0, options.custom_init_sorted_buckets.data(),
763         options.custom_init_sorted_buckets.size());
764 
765     ICING_ASSIGN_OR_RETURN(typename FileBackedVector<Bucket>::MutableArrayView
766                                mutable_new_unsorted_bucket_arr,
767                            unsorted_buckets->Allocate(
768                                options.custom_init_unsorted_buckets.size()));
769     mutable_new_unsorted_bucket_arr.SetArray(
770         /*idx=*/0, options.custom_init_unsorted_buckets.data(),
771         options.custom_init_unsorted_buckets.size());
772 
773     // After inserting buckets, we can clear vectors since there is no need to
774     // cache them.
775     options.custom_init_sorted_buckets.clear();
776     options.custom_init_unsorted_buckets.clear();
777   } else {
778     // Insert one bucket with range [INT64_MIN, INT64_MAX].
779     ICING_RETURN_IF_ERROR(sorted_buckets->Append(Bucket(
780         /*key_lower=*/std::numeric_limits<int64_t>::min(),
781         /*key_upper=*/std::numeric_limits<int64_t>::max())));
782   }
783   ICING_RETURN_IF_ERROR(sorted_buckets->PersistToDisk());
784 
785   // Initialize metadata file. Create MemoryMappedFile with pre-mapping, and
786   // call GrowAndRemapIfNecessary to grow the underlying file.
787   ICING_ASSIGN_OR_RETURN(
788       MemoryMappedFile metadata_mmapped_file,
789       MemoryMappedFile::Create(filesystem, GetMetadataFilePath(working_path),
790                                MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
791                                /*max_file_size=*/kMetadataFileSize,
792                                /*pre_mapping_file_offset=*/0,
793                                /*pre_mapping_mmap_size=*/kMetadataFileSize));
794   ICING_RETURN_IF_ERROR(metadata_mmapped_file.GrowAndRemapIfNecessary(
795       /*file_offset=*/0, /*mmap_size=*/kMetadataFileSize));
796 
797   // Create instance.
798   auto new_integer_index_storage =
799       std::unique_ptr<IntegerIndexStorage>(new IntegerIndexStorage(
800           filesystem, std::move(working_path), std::move(options),
801           posting_list_serializer,
802           std::make_unique<MemoryMappedFile>(std::move(metadata_mmapped_file)),
803           std::move(sorted_buckets), std::move(unsorted_buckets),
804           std::make_unique<FlashIndexStorage>(std::move(flash_index_storage))));
805   // Initialize info content by writing mapped memory directly.
806   Info& info_ref = new_integer_index_storage->info();
807   info_ref.magic = Info::kMagic;
808   info_ref.num_data = 0;
809   // Initialize new PersistentStorage. The initial checksums will be computed
810   // and set via InitializeNewStorage.
811   ICING_RETURN_IF_ERROR(new_integer_index_storage->InitializeNewStorage());
812 
813   return new_integer_index_storage;
814 }
815 
816 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
InitializeExistingFiles(const Filesystem & filesystem,std::string && working_path,Options && options,PostingListIntegerIndexSerializer * posting_list_serializer)817 IntegerIndexStorage::InitializeExistingFiles(
818     const Filesystem& filesystem, std::string&& working_path, Options&& options,
819     PostingListIntegerIndexSerializer* posting_list_serializer) {
820   // Mmap the content of the crcs and info.
821   ICING_ASSIGN_OR_RETURN(
822       MemoryMappedFile metadata_mmapped_file,
823       MemoryMappedFile::Create(filesystem, GetMetadataFilePath(working_path),
824                                MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
825                                /*max_file_size=*/kMetadataFileSize,
826                                /*pre_mapping_file_offset=*/0,
827                                /*pre_mapping_mmap_size=*/kMetadataFileSize));
828   if (metadata_mmapped_file.available_size() != kMetadataFileSize) {
829     return absl_ports::FailedPreconditionError("Incorrect metadata file size");
830   }
831 
832   // Initialize sorted_buckets
833   int32_t pre_mapping_mmap_size = sizeof(Bucket) * (1 << 10);
834   ICING_ASSIGN_OR_RETURN(
835       std::unique_ptr<FileBackedVector<Bucket>> sorted_buckets,
836       FileBackedVector<Bucket>::Create(
837           filesystem, GetSortedBucketsFilePath(working_path),
838           MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
839           FileBackedVector<Bucket>::kMaxFileSize,
840           options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
841 
842   // Initialize unsorted_buckets
843   pre_mapping_mmap_size = sizeof(Bucket) * kUnsortedBucketsLengthThreshold;
844   ICING_ASSIGN_OR_RETURN(
845       std::unique_ptr<FileBackedVector<Bucket>> unsorted_buckets,
846       FileBackedVector<Bucket>::Create(
847           filesystem, GetUnsortedBucketsFilePath(working_path),
848           MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
849           FileBackedVector<Bucket>::kMaxFileSize,
850           options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
851 
852   // Initialize flash_index_storage
853   ICING_ASSIGN_OR_RETURN(
854       FlashIndexStorage flash_index_storage,
855       FlashIndexStorage::Create(GetFlashIndexStorageFilePath(working_path),
856                                 &filesystem, posting_list_serializer));
857 
858   // Create instance.
859   auto integer_index_storage =
860       std::unique_ptr<IntegerIndexStorage>(new IntegerIndexStorage(
861           filesystem, std::move(working_path), std::move(options),
862           posting_list_serializer,
863           std::make_unique<MemoryMappedFile>(std::move(metadata_mmapped_file)),
864           std::move(sorted_buckets), std::move(unsorted_buckets),
865           std::make_unique<FlashIndexStorage>(std::move(flash_index_storage))));
866   // Initialize existing PersistentStorage. Checksums will be validated.
867   ICING_RETURN_IF_ERROR(integer_index_storage->InitializeExistingStorage());
868 
869   // Validate other values of info and options.
870   // Magic should be consistent with the codebase.
871   if (integer_index_storage->info().magic != Info::kMagic) {
872     return absl_ports::FailedPreconditionError("Incorrect magic value");
873   }
874 
875   return integer_index_storage;
876 }
877 
878 /* static */ libtextclassifier3::Status
FlushDataIntoNewSortedBucket(int64_t key_lower,int64_t key_upper,std::vector<IntegerIndexData> && data,IntegerIndexStorage * storage)879 IntegerIndexStorage::FlushDataIntoNewSortedBucket(
880     int64_t key_lower, int64_t key_upper, std::vector<IntegerIndexData>&& data,
881     IntegerIndexStorage* storage) {
882   if (data.empty()) {
883     return storage->sorted_buckets_->Append(
884         Bucket(key_lower, key_upper, PostingListIdentifier::kInvalid));
885   }
886 
887   ICING_ASSIGN_OR_RETURN(
888       PostingListIdentifier pl_id,
889       FlushDataIntoPostingLists(storage->flash_index_storage_.get(),
890                                 storage->posting_list_serializer_, data.begin(),
891                                 data.end()));
892 
893   storage->info().num_data += data.size();
894   return storage->sorted_buckets_->Append(Bucket(key_lower, key_upper, pl_id));
895 }
896 
PersistStoragesToDisk()897 libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() {
898   ICING_RETURN_IF_ERROR(sorted_buckets_->PersistToDisk());
899   ICING_RETURN_IF_ERROR(unsorted_buckets_->PersistToDisk());
900   if (!flash_index_storage_->PersistToDisk()) {
901     return absl_ports::InternalError(
902         "Fail to persist FlashIndexStorage to disk");
903   }
904   return libtextclassifier3::Status::OK;
905 }
906 
PersistMetadataToDisk()907 libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk() {
908   // Changes should have been applied to the underlying file when using
909   // MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC, but call msync() as an
910   // extra safety step to ensure they are written out.
911   return metadata_mmapped_file_->PersistToDisk();
912 }
913 
ComputeInfoChecksum()914 libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::ComputeInfoChecksum() {
915   return info().ComputeChecksum();
916 }
917 
918 libtextclassifier3::StatusOr<Crc32>
ComputeStoragesChecksum()919 IntegerIndexStorage::ComputeStoragesChecksum() {
920   // Compute crcs
921   ICING_ASSIGN_OR_RETURN(Crc32 sorted_buckets_crc,
922                          sorted_buckets_->ComputeChecksum());
923   ICING_ASSIGN_OR_RETURN(Crc32 unsorted_buckets_crc,
924                          unsorted_buckets_->ComputeChecksum());
925 
926   // TODO(b/259744228): implement and include flash_index_storage checksum
927   return Crc32(sorted_buckets_crc.Get() ^ unsorted_buckets_crc.Get());
928 }
929 
930 libtextclassifier3::StatusOr<std::vector<IntegerIndexStorage::Bucket>>
AddKeysIntoBucketAndSplitIfNecessary(DocumentId document_id,SectionId section_id,const std::vector<int64_t>::const_iterator & it_start,const std::vector<int64_t>::const_iterator & it_end,FileBackedVector<Bucket>::MutableView & mutable_bucket)931 IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary(
932     DocumentId document_id, SectionId section_id,
933     const std::vector<int64_t>::const_iterator& it_start,
934     const std::vector<int64_t>::const_iterator& it_end,
935     FileBackedVector<Bucket>::MutableView& mutable_bucket) {
936   std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor;
937   if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
938     ICING_ASSIGN_OR_RETURN(
939         pl_accessor, PostingListIntegerIndexAccessor::CreateFromExisting(
940                          flash_index_storage_.get(), posting_list_serializer_,
941                          mutable_bucket.Get().posting_list_identifier()));
942   } else {
943     ICING_ASSIGN_OR_RETURN(
944         pl_accessor, PostingListIntegerIndexAccessor::Create(
945                          flash_index_storage_.get(), posting_list_serializer_));
946   }
947 
948   for (auto it = it_start; it != it_end; ++it) {
949     if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() &&
950         pl_accessor->WantsSplit()) {
951       // If the bucket needs split (max size and full) and is splittable, then
952       // we perform bucket splitting.
953 
954       // 1. Finalize the current posting list accessor.
955       PostingListAccessor::FinalizeResult result =
956           std::move(*pl_accessor).Finalize();
957       if (!result.status.ok()) {
958         return result.status;
959       }
960 
961       // 2. Create another posting list accessor instance. Read all data and
962       //    free all posting lists.
963       ICING_ASSIGN_OR_RETURN(
964           pl_accessor,
965           PostingListIntegerIndexAccessor::CreateFromExisting(
966               flash_index_storage_.get(), posting_list_serializer_, result.id));
967       ICING_ASSIGN_OR_RETURN(std::vector<IntegerIndexData> all_data,
968                              pl_accessor->GetAllDataAndFree());
969 
970       // 3. Append all remaining new data.
971       all_data.reserve(all_data.size() + std::distance(it, it_end));
972       for (; it != it_end; ++it) {
973         all_data.push_back(IntegerIndexData(section_id, document_id, *it));
974       }
975 
976       // 4. Run bucket splitting algorithm to decide new buckets and dispatch
977       //    data.
978       std::vector<integer_index_bucket_util::DataRangeAndBucketInfo>
979           new_bucket_infos = integer_index_bucket_util::Split(
980               all_data, mutable_bucket.Get().key_lower(),
981               mutable_bucket.Get().key_upper(),
982               kNumDataThresholdForBucketSplit);
983       if (new_bucket_infos.empty()) {
984         ICING_LOG(WARNING)
985             << "No buckets after splitting. This should not happen.";
986         return absl_ports::InternalError("Split error");
987       }
988 
989       // 5. Flush data.
990       std::vector<Bucket> new_buckets;
991       for (int i = 0; i < new_bucket_infos.size(); ++i) {
992         ICING_ASSIGN_OR_RETURN(
993             PostingListIdentifier pl_id,
994             FlushDataIntoPostingLists(
995                 flash_index_storage_.get(), posting_list_serializer_,
996                 new_bucket_infos[i].start, new_bucket_infos[i].end));
997         if (i == 0) {
998           // Reuse mutable_bucket
999           mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower);
1000           mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper);
1001           mutable_bucket.Get().set_posting_list_identifier(pl_id);
1002         } else {
1003           new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower,
1004                                        new_bucket_infos[i].key_upper, pl_id));
1005         }
1006       }
1007 
1008       return new_buckets;
1009     }
1010 
1011     ICING_RETURN_IF_ERROR(pl_accessor->PrependData(
1012         IntegerIndexData(section_id, document_id, *it)));
1013   }
1014 
1015   PostingListAccessor::FinalizeResult result =
1016       std::move(*pl_accessor).Finalize();
1017   if (!result.status.ok()) {
1018     return result.status;
1019   }
1020   if (!result.id.is_valid()) {
1021     return absl_ports::InternalError("Fail to flush data into posting list(s)");
1022   }
1023 
1024   mutable_bucket.Get().set_posting_list_identifier(result.id);
1025 
1026   return std::vector<Bucket>();
1027 }
1028 
SortBuckets()1029 libtextclassifier3::Status IntegerIndexStorage::SortBuckets() {
1030   if (unsorted_buckets_->num_elements() == 0) {
1031     return libtextclassifier3::Status::OK;
1032   }
1033 
1034   int32_t sorted_len = sorted_buckets_->num_elements();
1035   int32_t unsorted_len = unsorted_buckets_->num_elements();
1036   if (sorted_len > FileBackedVector<Bucket>::kMaxNumElements - unsorted_len) {
1037     return absl_ports::OutOfRangeError(
1038         "Sorted buckets length exceeds the limit after merging");
1039   }
1040 
1041   ICING_RETURN_IF_ERROR(sorted_buckets_->Allocate(unsorted_len));
1042 
1043   // Sort unsorted_buckets_.
1044   ICING_RETURN_IF_ERROR(
1045       unsorted_buckets_->Sort(/*begin_idx=*/0, /*end_idx=*/unsorted_len));
1046 
1047   // Merge unsorted_buckets_ into sorted_buckets_ and clear unsorted_buckets_.
1048   // Note that we could have used std::sort + std::inplace_merge, but it is more
1049   // complicated to deal with FileBackedVector SetDirty logic, so implement our
1050   // own merging with FileBackedVector methods.
1051   //
1052   // Merge buckets from back. This could save some iterations and avoid setting
1053   // dirty for unchanged elements of the original sorted segments.
1054   // For example, we can avoid setting dirty for elements [1, 2, 3, 5] for the
1055   // following sorted/unsorted data:
1056   // - sorted: [1, 2, 3, 5, 8, 13, _, _, _, _)]
1057   // - unsorted: [6, 10, 14, 15]
1058   int32_t sorted_write_idx = sorted_len + unsorted_len - 1;
1059   int32_t sorted_curr_idx = sorted_len - 1;
1060   int32_t unsorted_curr_idx = unsorted_len - 1;
1061   while (unsorted_curr_idx >= 0) {
1062     if (sorted_curr_idx >= 0 && unsorted_buckets_->array()[unsorted_curr_idx] <
1063                                     sorted_buckets_->array()[sorted_curr_idx]) {
1064       ICING_RETURN_IF_ERROR(sorted_buckets_->Set(
1065           sorted_write_idx, sorted_buckets_->array()[sorted_curr_idx]));
1066       --sorted_curr_idx;
1067 
1068     } else {
1069       ICING_RETURN_IF_ERROR(sorted_buckets_->Set(
1070           sorted_write_idx, unsorted_buckets_->array()[unsorted_curr_idx]));
1071       --unsorted_curr_idx;
1072     }
1073     --sorted_write_idx;
1074   }
1075 
1076   ICING_RETURN_IF_ERROR(unsorted_buckets_->TruncateTo(0));
1077 
1078   return libtextclassifier3::Status::OK;
1079 }
1080 
1081 }  // namespace lib
1082 }  // namespace icing
1083