1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "icing/join/join-processor.h"
16
17 #include <algorithm>
18 #include <deque>
19 #include <memory>
20 #include <optional>
21 #include <queue>
22 #include <string>
23 #include <string_view>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <utility>
27 #include <vector>
28
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/join/aggregation-scorer.h"
34 #include "icing/join/document-join-id-pair.h"
35 #include "icing/join/join-children-fetcher-impl-deprecated.h"
36 #include "icing/join/join-children-fetcher-impl-v3.h"
37 #include "icing/join/join-children-fetcher.h"
38 #include "icing/join/qualified-id-join-index.h"
39 #include "icing/join/qualified-id.h"
40 #include "icing/proto/schema.pb.h"
41 #include "icing/proto/scoring.pb.h"
42 #include "icing/proto/search.pb.h"
43 #include "icing/schema/joinable-property.h"
44 #include "icing/scoring/scored-document-hit.h"
45 #include "icing/store/document-filter-data.h"
46 #include "icing/store/document-id.h"
47 #include "icing/store/namespace-id-fingerprint.h"
48 #include "icing/util/logging.h"
49 #include "icing/util/status-macros.h"
50
51 namespace icing {
52 namespace lib {
53
54 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcher(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)55 JoinProcessor::GetChildrenFetcher(
56 const JoinSpecProto& join_spec,
57 std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
58 if (join_spec.parent_property_expression() != kQualifiedIdExpr) {
59 // TODO(b/256022027): So far we only support kQualifiedIdExpr for
60 // parent_property_expression, we could support more.
61 return absl_ports::UnimplementedError(absl_ports::StrCat(
62 "Parent property expression must be ", kQualifiedIdExpr));
63 }
64
65 switch (qualified_id_join_index_->version()) {
66 case QualifiedIdJoinIndex::Version::kV2:
67 return GetChildrenFetcherV2(join_spec,
68 std::move(child_scored_document_hits));
69 case QualifiedIdJoinIndex::Version::kV3:
70 return JoinChildrenFetcherImplV3::Create(
71 join_spec, schema_store_, doc_store_, qualified_id_join_index_,
72 current_time_ms_, std::move(child_scored_document_hits));
73 }
74 }
75
76 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcherV2(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)77 JoinProcessor::GetChildrenFetcherV2(
78 const JoinSpecProto& join_spec,
79 std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
80 // Step 1a: sort child ScoredDocumentHits in document id descending order.
81 std::sort(child_scored_document_hits.begin(),
82 child_scored_document_hits.end(),
83 [](const ScoredDocumentHit& lhs, const ScoredDocumentHit& rhs) {
84 return lhs.document_id() > rhs.document_id();
85 });
86
87 // Step 1b: group all child ScoredDocumentHits by the document's
88 // schema_type_id.
89 std::unordered_map<SchemaTypeId, std::vector<ScoredDocumentHit>>
90 schema_to_child_scored_doc_hits_map;
91 for (const ScoredDocumentHit& child_scored_document_hit :
92 child_scored_document_hits) {
93 std::optional<DocumentFilterData> child_doc_filter_data =
94 doc_store_->GetAliveDocumentFilterData(
95 child_scored_document_hit.document_id(), current_time_ms_);
96 if (!child_doc_filter_data) {
97 continue;
98 }
99
100 schema_to_child_scored_doc_hits_map[child_doc_filter_data->schema_type_id()]
101 .push_back(child_scored_document_hit);
102 }
103
104 // Step 1c: for each schema_type_id, lookup QualifiedIdJoinIndexImplV2 to
105 // fetch all child join data from posting list(s). Convert all
106 // child join data to referenced parent document ids and bucketize
107 // child ScoredDocumentHits by it.
108 std::unordered_map<DocumentId, std::vector<ScoredDocumentHit>>
109 parent_to_child_docs_map;
110 for (auto& [schema_type_id, grouped_child_scored_doc_hits] :
111 schema_to_child_scored_doc_hits_map) {
112 // Get joinable_property_id of this schema.
113 ICING_ASSIGN_OR_RETURN(
114 const JoinablePropertyMetadata* metadata,
115 schema_store_->GetJoinablePropertyMetadata(
116 schema_type_id, join_spec.child_property_expression()));
117 if (metadata == nullptr ||
118 metadata->value_type != JoinableConfig::ValueType::QUALIFIED_ID) {
119 // Currently we only support qualified id, so skip other types.
120 continue;
121 }
122
123 // Lookup QualifiedIdJoinIndexImplV2.
124 ICING_ASSIGN_OR_RETURN(
125 std::unique_ptr<QualifiedIdJoinIndex::JoinDataIteratorBase>
126 join_index_iter,
127 qualified_id_join_index_->GetIterator(
128 schema_type_id, /*joinable_property_id=*/metadata->id));
129
130 // - Join index contains all join data of schema_type_id and
131 // join_index_iter will return all of them in (child) document id
132 // descending order.
133 // - But we only need join data of child document ids which appear in
134 // grouped_child_scored_doc_hits. Also grouped_child_scored_doc_hits
135 // contain ScoredDocumentHits in (child) document id descending order.
136 // - Therefore, we advance 2 iterators to intersect them and get desired
137 // join data.
138 auto child_scored_doc_hits_iter = grouped_child_scored_doc_hits.cbegin();
139 while (join_index_iter->Advance().ok() &&
140 child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend()) {
141 // Advance child_scored_doc_hits_iter until it points to a
142 // ScoredDocumentHit with document id <= the one pointed by
143 // join_index_iter.
144 while (child_scored_doc_hits_iter !=
145 grouped_child_scored_doc_hits.cend() &&
146 child_scored_doc_hits_iter->document_id() >
147 join_index_iter->GetCurrent().document_id()) {
148 ++child_scored_doc_hits_iter;
149 }
150
151 if (child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend() &&
152 child_scored_doc_hits_iter->document_id() ==
153 join_index_iter->GetCurrent().document_id()) {
154 // We get a join data whose child document id exists in both join
155 // index and grouped_child_scored_doc_hits. Convert its join info to
156 // referenced parent document ids and bucketize ScoredDocumentHits by
157 // it (putting into parent_to_child_docs_map).
158 const NamespaceIdFingerprint& ref_doc_nsid_uri_fingerprint =
159 join_index_iter->GetCurrent().join_info();
160 libtextclassifier3::StatusOr<DocumentId> ref_parent_doc_id_or =
161 doc_store_->GetDocumentId(ref_doc_nsid_uri_fingerprint);
162 if (ref_parent_doc_id_or.ok()) {
163 parent_to_child_docs_map[std::move(ref_parent_doc_id_or).ValueOrDie()]
164 .push_back(*child_scored_doc_hits_iter);
165 }
166 }
167 }
168 }
169
170 // Step 1d: finally, sort each parent's joined child ScoredDocumentHits by
171 // score.
172 ScoredDocumentHitComparator score_comparator(
173 /*is_descending=*/join_spec.nested_spec().scoring_spec().order_by() ==
174 ScoringSpecProto::Order::DESC);
175 for (auto& [parent_doc_id, bucketized_child_scored_hits] :
176 parent_to_child_docs_map) {
177 std::sort(bucketized_child_scored_hits.begin(),
178 bucketized_child_scored_hits.end(), score_comparator);
179 }
180
181 return JoinChildrenFetcherImplDeprecated::Create(
182 join_spec, std::move(parent_to_child_docs_map));
183 }
184
185 libtextclassifier3::StatusOr<std::vector<JoinedScoredDocumentHit>>
Join(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && parent_scored_document_hits,const JoinChildrenFetcher & join_children_fetcher)186 JoinProcessor::Join(
187 const JoinSpecProto& join_spec,
188 std::vector<ScoredDocumentHit>&& parent_scored_document_hits,
189 const JoinChildrenFetcher& join_children_fetcher) {
190 std::unique_ptr<AggregationScorer> aggregation_scorer =
191 AggregationScorer::Create(join_spec);
192
193 std::vector<JoinedScoredDocumentHit> joined_scored_document_hits;
194 joined_scored_document_hits.reserve(parent_scored_document_hits.size());
195
196 // Step 2: iterate through all parent documentIds and construct
197 // JoinedScoredDocumentHit for each by looking up
198 // join_children_fetcher.
199 for (ScoredDocumentHit& parent : parent_scored_document_hits) {
200 ICING_ASSIGN_OR_RETURN(
201 std::vector<ScoredDocumentHit> children,
202 join_children_fetcher.GetChildren(parent.document_id()));
203
204 double final_score = aggregation_scorer->GetScore(parent, children);
205 joined_scored_document_hits.emplace_back(final_score, std::move(parent),
206 std::move(children));
207 }
208
209 return joined_scored_document_hits;
210 }
211
212 libtextclassifier3::StatusOr<std::unordered_set<DocumentId>>
GetPropagatedChildDocumentsToDelete(const std::unordered_set<DocumentId> & deleted_document_ids)213 JoinProcessor::GetPropagatedChildDocumentsToDelete(
214 const std::unordered_set<DocumentId>& deleted_document_ids) {
215 // Sanity check: join index should be V3.
216 if (qualified_id_join_index_->version() !=
217 QualifiedIdJoinIndex::Version::kV3) {
218 return absl_ports::UnimplementedError(
219 "QualifiedIdJoinIndex version must be V3 to support delete "
220 "propagation.");
221 }
222
223 // BFS traverse to find all child documents to propagate delete.
224 std::queue<DocumentId> que(
225 std::deque(deleted_document_ids.begin(), deleted_document_ids.end()));
226 std::unordered_set<DocumentId> child_documents_to_delete;
227 while (!que.empty()) {
228 DocumentId doc_id_to_expand = que.front();
229 que.pop();
230
231 ICING_ASSIGN_OR_RETURN(std::vector<DocumentJoinIdPair> child_join_id_pairs,
232 qualified_id_join_index_->Get(doc_id_to_expand));
233 for (const DocumentJoinIdPair& child_join_id_pair : child_join_id_pairs) {
234 if (child_documents_to_delete.find(child_join_id_pair.document_id()) !=
235 child_documents_to_delete.end() ||
236 deleted_document_ids.find(child_join_id_pair.document_id()) !=
237 deleted_document_ids.end()) {
238 // Already added into the set to delete or already deleted (happens only
239 // when there is a cycle back to the deleted or traversed document in
240 // the join relation). Skip it.
241 continue;
242 }
243
244 // Get DocumentFilterData of the child document to look up its schema type
245 // id.
246 // - Skip if the child document has been deleted, since delete propagation
247 // should've been done to all its children when deleting it previously.
248 // - Otherwise, we have to handle this child document and propagate delete
249 // to the grandchildren, even if it is expired.
250 std::optional<DocumentFilterData> child_filter_data =
251 doc_store_->GetNonDeletedDocumentFilterData(
252 child_join_id_pair.document_id());
253 if (!child_filter_data) {
254 // The child document has been deleted. Skip.
255 continue;
256 }
257
258 libtextclassifier3::StatusOr<const JoinablePropertyMetadata*>
259 metadata_or = schema_store_->GetJoinablePropertyMetadata(
260 child_filter_data->schema_type_id(),
261 child_join_id_pair.joinable_property_id());
262 if (!metadata_or.ok() || metadata_or.ValueOrDie() == nullptr) {
263 // This shouldn't happen because we've validated it during indexing and
264 // only put valid DocumentJoinIdPair into qualified id join index.
265 // Log and skip it.
266 ICING_LOG(ERROR) << "Failed to get metadata for schema type id "
267 << child_filter_data->schema_type_id()
268 << ", joinable property id "
269 << static_cast<int>(
270 child_join_id_pair.joinable_property_id());
271 continue;
272 }
273 const JoinablePropertyMetadata* metadata = metadata_or.ValueOrDie();
274
275 if (metadata->value_type == JoinableConfig::ValueType::QUALIFIED_ID &&
276 metadata->delete_propagation_type ==
277 JoinableConfig::DeletePropagationType::PROPAGATE_FROM) {
278 child_documents_to_delete.insert(child_join_id_pair.document_id());
279 que.push(child_join_id_pair.document_id());
280 }
281 }
282 }
283
284 return child_documents_to_delete;
285 }
286
287 } // namespace lib
288 } // namespace icing
289