• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "icing/join/join-processor.h"
16 
17 #include <algorithm>
18 #include <deque>
19 #include <memory>
20 #include <optional>
21 #include <queue>
22 #include <string>
23 #include <string_view>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <utility>
27 #include <vector>
28 
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/join/aggregation-scorer.h"
34 #include "icing/join/document-join-id-pair.h"
35 #include "icing/join/join-children-fetcher-impl-deprecated.h"
36 #include "icing/join/join-children-fetcher-impl-v3.h"
37 #include "icing/join/join-children-fetcher.h"
38 #include "icing/join/qualified-id-join-index.h"
39 #include "icing/join/qualified-id.h"
40 #include "icing/proto/schema.pb.h"
41 #include "icing/proto/scoring.pb.h"
42 #include "icing/proto/search.pb.h"
43 #include "icing/schema/joinable-property.h"
44 #include "icing/scoring/scored-document-hit.h"
45 #include "icing/store/document-filter-data.h"
46 #include "icing/store/document-id.h"
47 #include "icing/store/namespace-id-fingerprint.h"
48 #include "icing/util/logging.h"
49 #include "icing/util/status-macros.h"
50 
51 namespace icing {
52 namespace lib {
53 
54 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcher(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)55 JoinProcessor::GetChildrenFetcher(
56     const JoinSpecProto& join_spec,
57     std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
58   if (join_spec.parent_property_expression() != kQualifiedIdExpr) {
59     // TODO(b/256022027): So far we only support kQualifiedIdExpr for
60     // parent_property_expression, we could support more.
61     return absl_ports::UnimplementedError(absl_ports::StrCat(
62         "Parent property expression must be ", kQualifiedIdExpr));
63   }
64 
65   switch (qualified_id_join_index_->version()) {
66     case QualifiedIdJoinIndex::Version::kV2:
67       return GetChildrenFetcherV2(join_spec,
68                                   std::move(child_scored_document_hits));
69     case QualifiedIdJoinIndex::Version::kV3:
70       return JoinChildrenFetcherImplV3::Create(
71           join_spec, schema_store_, doc_store_, qualified_id_join_index_,
72           current_time_ms_, std::move(child_scored_document_hits));
73   }
74 }
75 
76 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcherV2(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)77 JoinProcessor::GetChildrenFetcherV2(
78     const JoinSpecProto& join_spec,
79     std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
80   // Step 1a: sort child ScoredDocumentHits in document id descending order.
81   std::sort(child_scored_document_hits.begin(),
82             child_scored_document_hits.end(),
83             [](const ScoredDocumentHit& lhs, const ScoredDocumentHit& rhs) {
84               return lhs.document_id() > rhs.document_id();
85             });
86 
87   // Step 1b: group all child ScoredDocumentHits by the document's
88   //          schema_type_id.
89   std::unordered_map<SchemaTypeId, std::vector<ScoredDocumentHit>>
90       schema_to_child_scored_doc_hits_map;
91   for (const ScoredDocumentHit& child_scored_document_hit :
92        child_scored_document_hits) {
93     std::optional<DocumentFilterData> child_doc_filter_data =
94         doc_store_->GetAliveDocumentFilterData(
95             child_scored_document_hit.document_id(), current_time_ms_);
96     if (!child_doc_filter_data) {
97       continue;
98     }
99 
100     schema_to_child_scored_doc_hits_map[child_doc_filter_data->schema_type_id()]
101         .push_back(child_scored_document_hit);
102   }
103 
104   // Step 1c: for each schema_type_id, lookup QualifiedIdJoinIndexImplV2 to
105   //          fetch all child join data from posting list(s). Convert all
106   //          child join data to referenced parent document ids and bucketize
107   //          child ScoredDocumentHits by it.
108   std::unordered_map<DocumentId, std::vector<ScoredDocumentHit>>
109       parent_to_child_docs_map;
110   for (auto& [schema_type_id, grouped_child_scored_doc_hits] :
111        schema_to_child_scored_doc_hits_map) {
112     // Get joinable_property_id of this schema.
113     ICING_ASSIGN_OR_RETURN(
114         const JoinablePropertyMetadata* metadata,
115         schema_store_->GetJoinablePropertyMetadata(
116             schema_type_id, join_spec.child_property_expression()));
117     if (metadata == nullptr ||
118         metadata->value_type != JoinableConfig::ValueType::QUALIFIED_ID) {
119       // Currently we only support qualified id, so skip other types.
120       continue;
121     }
122 
123     // Lookup QualifiedIdJoinIndexImplV2.
124     ICING_ASSIGN_OR_RETURN(
125         std::unique_ptr<QualifiedIdJoinIndex::JoinDataIteratorBase>
126             join_index_iter,
127         qualified_id_join_index_->GetIterator(
128             schema_type_id, /*joinable_property_id=*/metadata->id));
129 
130     // - Join index contains all join data of schema_type_id and
131     //   join_index_iter will return all of them in (child) document id
132     //   descending order.
133     // - But we only need join data of child document ids which appear in
134     //   grouped_child_scored_doc_hits. Also grouped_child_scored_doc_hits
135     //   contain ScoredDocumentHits in (child) document id descending order.
136     // - Therefore, we advance 2 iterators to intersect them and get desired
137     //   join data.
138     auto child_scored_doc_hits_iter = grouped_child_scored_doc_hits.cbegin();
139     while (join_index_iter->Advance().ok() &&
140            child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend()) {
141       // Advance child_scored_doc_hits_iter until it points to a
142       // ScoredDocumentHit with document id <= the one pointed by
143       // join_index_iter.
144       while (child_scored_doc_hits_iter !=
145                  grouped_child_scored_doc_hits.cend() &&
146              child_scored_doc_hits_iter->document_id() >
147                  join_index_iter->GetCurrent().document_id()) {
148         ++child_scored_doc_hits_iter;
149       }
150 
151       if (child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend() &&
152           child_scored_doc_hits_iter->document_id() ==
153               join_index_iter->GetCurrent().document_id()) {
154         // We get a join data whose child document id exists in both join
155         // index and grouped_child_scored_doc_hits. Convert its join info to
156         // referenced parent document ids and bucketize ScoredDocumentHits by
157         // it (putting into parent_to_child_docs_map).
158         const NamespaceIdFingerprint& ref_doc_nsid_uri_fingerprint =
159             join_index_iter->GetCurrent().join_info();
160         libtextclassifier3::StatusOr<DocumentId> ref_parent_doc_id_or =
161             doc_store_->GetDocumentId(ref_doc_nsid_uri_fingerprint);
162         if (ref_parent_doc_id_or.ok()) {
163           parent_to_child_docs_map[std::move(ref_parent_doc_id_or).ValueOrDie()]
164               .push_back(*child_scored_doc_hits_iter);
165         }
166       }
167     }
168   }
169 
170   // Step 1d: finally, sort each parent's joined child ScoredDocumentHits by
171   //          score.
172   ScoredDocumentHitComparator score_comparator(
173       /*is_descending=*/join_spec.nested_spec().scoring_spec().order_by() ==
174       ScoringSpecProto::Order::DESC);
175   for (auto& [parent_doc_id, bucketized_child_scored_hits] :
176        parent_to_child_docs_map) {
177     std::sort(bucketized_child_scored_hits.begin(),
178               bucketized_child_scored_hits.end(), score_comparator);
179   }
180 
181   return JoinChildrenFetcherImplDeprecated::Create(
182       join_spec, std::move(parent_to_child_docs_map));
183 }
184 
185 libtextclassifier3::StatusOr<std::vector<JoinedScoredDocumentHit>>
Join(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && parent_scored_document_hits,const JoinChildrenFetcher & join_children_fetcher)186 JoinProcessor::Join(
187     const JoinSpecProto& join_spec,
188     std::vector<ScoredDocumentHit>&& parent_scored_document_hits,
189     const JoinChildrenFetcher& join_children_fetcher) {
190   std::unique_ptr<AggregationScorer> aggregation_scorer =
191       AggregationScorer::Create(join_spec);
192 
193   std::vector<JoinedScoredDocumentHit> joined_scored_document_hits;
194   joined_scored_document_hits.reserve(parent_scored_document_hits.size());
195 
196   // Step 2: iterate through all parent documentIds and construct
197   //         JoinedScoredDocumentHit for each by looking up
198   //         join_children_fetcher.
199   for (ScoredDocumentHit& parent : parent_scored_document_hits) {
200     ICING_ASSIGN_OR_RETURN(
201         std::vector<ScoredDocumentHit> children,
202         join_children_fetcher.GetChildren(parent.document_id()));
203 
204     double final_score = aggregation_scorer->GetScore(parent, children);
205     joined_scored_document_hits.emplace_back(final_score, std::move(parent),
206                                              std::move(children));
207   }
208 
209   return joined_scored_document_hits;
210 }
211 
212 libtextclassifier3::StatusOr<std::unordered_set<DocumentId>>
GetPropagatedChildDocumentsToDelete(const std::unordered_set<DocumentId> & deleted_document_ids)213 JoinProcessor::GetPropagatedChildDocumentsToDelete(
214     const std::unordered_set<DocumentId>& deleted_document_ids) {
215   // Sanity check: join index should be V3.
216   if (qualified_id_join_index_->version() !=
217       QualifiedIdJoinIndex::Version::kV3) {
218     return absl_ports::UnimplementedError(
219         "QualifiedIdJoinIndex version must be V3 to support delete "
220         "propagation.");
221   }
222 
223   // BFS traverse to find all child documents to propagate delete.
224   std::queue<DocumentId> que(
225       std::deque(deleted_document_ids.begin(), deleted_document_ids.end()));
226   std::unordered_set<DocumentId> child_documents_to_delete;
227   while (!que.empty()) {
228     DocumentId doc_id_to_expand = que.front();
229     que.pop();
230 
231     ICING_ASSIGN_OR_RETURN(std::vector<DocumentJoinIdPair> child_join_id_pairs,
232                            qualified_id_join_index_->Get(doc_id_to_expand));
233     for (const DocumentJoinIdPair& child_join_id_pair : child_join_id_pairs) {
234       if (child_documents_to_delete.find(child_join_id_pair.document_id()) !=
235               child_documents_to_delete.end() ||
236           deleted_document_ids.find(child_join_id_pair.document_id()) !=
237               deleted_document_ids.end()) {
238         // Already added into the set to delete or already deleted (happens only
239         // when there is a cycle back to the deleted or traversed document in
240         // the join relation). Skip it.
241         continue;
242       }
243 
244       // Get DocumentFilterData of the child document to look up its schema type
245       // id.
246       // - Skip if the child document has been deleted, since delete propagation
247       //   should've been done to all its children when deleting it previously.
248       // - Otherwise, we have to handle this child document and propagate delete
249       //   to the grandchildren, even if it is expired.
250       std::optional<DocumentFilterData> child_filter_data =
251           doc_store_->GetNonDeletedDocumentFilterData(
252               child_join_id_pair.document_id());
253       if (!child_filter_data) {
254         // The child document has been deleted. Skip.
255         continue;
256       }
257 
258       libtextclassifier3::StatusOr<const JoinablePropertyMetadata*>
259           metadata_or = schema_store_->GetJoinablePropertyMetadata(
260               child_filter_data->schema_type_id(),
261               child_join_id_pair.joinable_property_id());
262       if (!metadata_or.ok() || metadata_or.ValueOrDie() == nullptr) {
263         // This shouldn't happen because we've validated it during indexing and
264         // only put valid DocumentJoinIdPair into qualified id join index.
265         // Log and skip it.
266         ICING_LOG(ERROR) << "Failed to get metadata for schema type id "
267                          << child_filter_data->schema_type_id()
268                          << ", joinable property id "
269                          << static_cast<int>(
270                                 child_join_id_pair.joinable_property_id());
271         continue;
272       }
273       const JoinablePropertyMetadata* metadata = metadata_or.ValueOrDie();
274 
275       if (metadata->value_type == JoinableConfig::ValueType::QUALIFIED_ID &&
276           metadata->delete_propagation_type ==
277               JoinableConfig::DeletePropagationType::PROPAGATE_FROM) {
278         child_documents_to_delete.insert(child_join_id_pair.document_id());
279         que.push(child_join_id_pair.document_id());
280       }
281     }
282   }
283 
284   return child_documents_to_delete;
285 }
286 
287 }  // namespace lib
288 }  // namespace icing
289