1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/perfetto_sql/intrinsics/operators/span_join_operator.h"
18
#include <sqlite3.h>
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>
31
32 #include "perfetto/base/compiler.h"
33 #include "perfetto/base/logging.h"
34 #include "perfetto/base/status.h"
35 #include "perfetto/ext/base/string_splitter.h"
36 #include "perfetto/ext/base/string_utils.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
39 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
40 #include "src/trace_processor/sqlite/module_lifecycle_manager.h"
41 #include "src/trace_processor/sqlite/sql_source.h"
42 #include "src/trace_processor/sqlite/sqlite_utils.h"
43 #include "src/trace_processor/tp_metatrace.h"
44 #include "src/trace_processor/util/status_macros.h"
45
46 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
47
48 namespace perfetto::trace_processor {
49
50 namespace {
51
// Name of the timestamp column. Every child table of a span join must expose
// exactly one column with this name (see TableDefinition::Create()).
constexpr char kTsColumnName[] = "ts";
// Name of the duration column. It only has to be present in at least one of
// the two child tables (checked in SpanJoinOperatorModule::Create()).
constexpr char kDurColumnName[] = "dur";
54
IsRequiredColumn(const std::string & name)55 bool IsRequiredColumn(const std::string& name) {
56 return name == kTsColumnName;
57 }
58
IsSpecialColumn(const std::string & name,const std::optional<std::string> & partition_col)59 bool IsSpecialColumn(const std::string& name,
60 const std::optional<std::string>& partition_col) {
61 return name == kTsColumnName || name == kDurColumnName ||
62 name == partition_col;
63 }
64
HasDuplicateColumns(const std::vector<std::pair<SqlValue::Type,std::string>> & t1,const std::vector<std::pair<SqlValue::Type,std::string>> & t2,const std::optional<std::string> & partition_col)65 std::optional<std::string> HasDuplicateColumns(
66 const std::vector<std::pair<SqlValue::Type, std::string>>& t1,
67 const std::vector<std::pair<SqlValue::Type, std::string>>& t2,
68 const std::optional<std::string>& partition_col) {
69 std::unordered_set<std::string> seen_names;
70 for (const auto& col : t1) {
71 if (IsSpecialColumn(col.second, partition_col)) {
72 continue;
73 }
74 if (seen_names.count(col.second) > 0) {
75 return col.second;
76 }
77 seen_names.insert(col.second);
78 }
79 for (const auto& col : t2) {
80 if (IsSpecialColumn(col.second, partition_col)) {
81 continue;
82 }
83 if (seen_names.count(col.second) > 0) {
84 return col.second;
85 }
86 seen_names.insert(col.second);
87 }
88 return std::nullopt;
89 }
90
OpToString(int op)91 std::string OpToString(int op) {
92 switch (op) {
93 case SQLITE_INDEX_CONSTRAINT_EQ:
94 return "=";
95 case SQLITE_INDEX_CONSTRAINT_NE:
96 return "!=";
97 case SQLITE_INDEX_CONSTRAINT_GE:
98 return ">=";
99 case SQLITE_INDEX_CONSTRAINT_GT:
100 return ">";
101 case SQLITE_INDEX_CONSTRAINT_LE:
102 return "<=";
103 case SQLITE_INDEX_CONSTRAINT_LT:
104 return "<";
105 case SQLITE_INDEX_CONSTRAINT_LIKE:
106 return " like ";
107 case SQLITE_INDEX_CONSTRAINT_GLOB:
108 return " glob ";
109 case SQLITE_INDEX_CONSTRAINT_ISNULL:
110 // The "null" will be added below in EscapedSqliteValueAsString.
111 return " is ";
112 case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
113 // The "null" will be added below in EscapedSqliteValueAsString.
114 return " is not ";
115 default:
116 PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
117 }
118 }
119
EscapedSqliteValueAsString(sqlite3_value * value)120 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
121 switch (sqlite3_value_type(value)) {
122 case SQLITE_INTEGER:
123 return std::to_string(sqlite3_value_int64(value));
124 case SQLITE_FLOAT:
125 return std::to_string(sqlite3_value_double(value));
126 case SQLITE_TEXT: {
127 // If str itself contains a single quote, we need to escape it with
128 // another single quote.
129 const char* str =
130 reinterpret_cast<const char*>(sqlite3_value_text(value));
131 return "'" + base::ReplaceAll(str, "'", "''") + "'";
132 }
133 case SQLITE_NULL:
134 return " null";
135 default:
136 PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
137 }
138 }
139
140 } // namespace
141
PopulateColumnLocatorMap(uint32_t offset)142 void SpanJoinOperatorModule::State::PopulateColumnLocatorMap(uint32_t offset) {
143 for (uint32_t i = 0; i < t1_defn.columns().size(); ++i) {
144 if (i == t1_defn.ts_idx() || i == t1_defn.dur_idx() ||
145 i == t1_defn.partition_idx()) {
146 continue;
147 }
148 ColumnLocator* locator = &global_index_to_column_locator[offset++];
149 locator->defn = &t1_defn;
150 locator->col_index = i;
151 }
152 for (uint32_t i = 0; i < t2_defn.columns().size(); ++i) {
153 if (i == t2_defn.ts_idx() || i == t2_defn.dur_idx() ||
154 i == t2_defn.partition_idx()) {
155 continue;
156 }
157 ColumnLocator* locator = &global_index_to_column_locator[offset++];
158 locator->defn = &t2_defn;
159 locator->col_index = i;
160 }
161 }
162
BestIndexStrForDefinition(const sqlite3_index_info * info,const TableDefinition & defn)163 std::string SpanJoinOperatorModule::State::BestIndexStrForDefinition(
164 const sqlite3_index_info* info,
165 const TableDefinition& defn) {
166 uint32_t count = 0;
167 std::string constraints;
168 for (int i = 0; i < info->nConstraint; i++) {
169 const auto& c = info->aConstraint[i];
170 if (!c.usable) {
171 continue;
172 }
173
174 auto col_name = GetNameForGlobalColumnIndex(defn, c.iColumn);
175 if (col_name.empty()) {
176 continue;
177 }
178
179 // Le constraints can be passed straight to the child tables as they won't
180 // affect the span join computation. Similarily, source_geq constraints
181 // explicitly request that they are passed as geq constraints to the source
182 // tables.
183 if (col_name == kTsColumnName && !sqlite::utils::IsOpLe(c.op) &&
184 c.op != kSourceGeqOpCode) {
185 continue;
186 }
187
188 // Allow SQLite handle any constraints on duration apart from source_geq
189 // constraints.
190 if (col_name == kDurColumnName && c.op != kSourceGeqOpCode) {
191 continue;
192 }
193
194 // If we're emitting shadow slices, don't propogate any constraints
195 // on this table as this will break the shadow slice computation.
196 if (defn.ShouldEmitPresentPartitionShadow()) {
197 continue;
198 }
199
200 PERFETTO_DCHECK(info->aConstraintUsage[i].argvIndex > 0);
201 std::string argvIndex =
202 std::to_string(info->aConstraintUsage[i].argvIndex - 1);
203 std::string op = OpToString(
204 c.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE : c.op);
205 constraints += "," + argvIndex + "," + "`" + col_name + "`" + op;
206 count++;
207 }
208 return std::to_string(count) + constraints;
209 }
210
Create(PerfettoSqlEngine * engine,const TableDescriptor & desc,EmitShadowType emit_shadow_type,TableDefinition * defn)211 base::Status SpanJoinOperatorModule::TableDefinition::Create(
212 PerfettoSqlEngine* engine,
213 const TableDescriptor& desc,
214 EmitShadowType emit_shadow_type,
215 TableDefinition* defn) {
216 if (desc.partition_col == kTsColumnName ||
217 desc.partition_col == kDurColumnName) {
218 return base::ErrStatus(
219 "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
220 desc.name.c_str());
221 }
222
223 std::vector<std::pair<SqlValue::Type, std::string>> cols;
224 RETURN_IF_ERROR(sqlite::utils::GetColumnsForTable(
225 engine->sqlite_engine()->db(), desc.name, cols));
226
227 uint32_t required_columns_found = 0;
228 uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
229 std::optional<uint32_t> dur_idx;
230 uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
231 for (uint32_t i = 0; i < cols.size(); i++) {
232 auto col = cols[i];
233 if (IsRequiredColumn(col.second)) {
234 ++required_columns_found;
235 }
236 if (base::Contains(col.second, ",")) {
237 return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ','",
238 col.second.c_str());
239 }
240 if (base::Contains(col.second, ':')) {
241 return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ':'",
242 col.second.c_str());
243 }
244
245 if (col.second == kTsColumnName) {
246 ts_idx = i;
247 } else if (col.second == kDurColumnName) {
248 dur_idx = i;
249 } else if (col.second == desc.partition_col) {
250 partition_idx = i;
251 }
252 }
253 if (required_columns_found != 1) {
254 return base::ErrStatus("SPAN_JOIN: Missing ts column in table %s",
255 desc.name.c_str());
256 }
257 if (desc.IsPartitioned() && partition_idx >= cols.size()) {
258 return base::ErrStatus(
259 "SPAN_JOIN: Missing partition column '%s' in table '%s'",
260 desc.partition_col.c_str(), desc.name.c_str());
261 }
262
263 PERFETTO_DCHECK(ts_idx < cols.size());
264
265 *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
266 emit_shadow_type, ts_idx, dur_idx, partition_idx);
267 return base::OkStatus();
268 }
269
270 std::string
CreateVtabCreateTableSection() const271 SpanJoinOperatorModule::TableDefinition::CreateVtabCreateTableSection() const {
272 std::string cols;
273 for (const auto& col : columns()) {
274 if (IsSpecialColumn(col.second, partition_col())) {
275 continue;
276 }
277 if (col.first == SqlValue::Type::kNull) {
278 cols += col.second + ",";
279 } else {
280 cols += col.second + " " +
281 sqlite::utils::SqlValueTypeToSqliteTypeName(col.first) + ",";
282 }
283 }
284 return cols;
285 }
286
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)287 std::string SpanJoinOperatorModule::State::GetNameForGlobalColumnIndex(
288 const TableDefinition& defn,
289 int global_column) {
290 auto col_idx = static_cast<size_t>(global_column);
291 if (col_idx == Column::kTimestamp) {
292 return kTsColumnName;
293 }
294 if (col_idx == Column::kDuration) {
295 return kDurColumnName;
296 }
297 if (col_idx == Column::kPartition &&
298 partitioning != PartitioningType::kNoPartitioning) {
299 return defn.partition_col();
300 }
301
302 const auto& locator = global_index_to_column_locator[col_idx];
303 if (locator.defn != &defn) {
304 return "";
305 }
306 return defn.columns()[locator.col_index].second;
307 }
308
Query(SpanJoinOperatorModule::State * state,const TableDefinition * definition)309 SpanJoinOperatorModule::Query::Query(SpanJoinOperatorModule::State* state,
310 const TableDefinition* definition)
311 : defn_(definition), in_state_(state) {
312 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
313 defn_->partition_idx() < defn_->columns().size());
314 }
315
316 SpanJoinOperatorModule::Query::~Query() = default;
317
Initialize(std::string sql_query,InitialEofBehavior eof_behavior)318 base::Status SpanJoinOperatorModule::Query::Initialize(
319 std::string sql_query,
320 InitialEofBehavior eof_behavior) {
321 *this = Query(in_state_, definition());
322 sql_query_ = std::move(sql_query);
323 base::Status status = Rewind();
324 if (!status.ok())
325 return status;
326 if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
327 IsEof()) {
328 state_ = State::kMissingPartitionShadow;
329 }
330 return status;
331 }
332
Next()333 base::Status SpanJoinOperatorModule::Query::Next() {
334 RETURN_IF_ERROR(NextSliceState());
335 return FindNextValidSlice();
336 }
337
IsValidSlice()338 bool SpanJoinOperatorModule::Query::IsValidSlice() {
339 // Disallow any single partition shadow slices if the definition doesn't allow
340 // them.
341 if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
342 return false;
343
344 // Disallow any missing partition shadow slices if the definition doesn't
345 // allow them.
346 if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
347 return false;
348
349 // Disallow any "empty" shadows; these are shadows which either have the same
350 // start and end time or missing-partition shadows which have the same start
351 // and end partition.
352 if (IsEmptyShadow())
353 return false;
354
355 return true;
356 }
357
FindNextValidSlice()358 base::Status SpanJoinOperatorModule::Query::FindNextValidSlice() {
359 // The basic idea of this function is that |NextSliceState()| always emits
360 // all possible slices (including shadows for any gaps inbetween the real
361 // slices) and we filter out the invalid slices (as defined by the table
362 // definition) using |IsValidSlice()|.
363 //
364 // This has proved to be a lot cleaner to implement than trying to choose
365 // when to emit and not emit shadows directly.
366 while (!IsEof() && !IsValidSlice()) {
367 RETURN_IF_ERROR(NextSliceState());
368 }
369 return base::OkStatus();
370 }
371
NextSliceState()372 base::Status SpanJoinOperatorModule::Query::NextSliceState() {
373 switch (state_) {
374 case State::kReal: {
375 // Forward the cursor to figure out where the next slice should be.
376 RETURN_IF_ERROR(CursorNext());
377
378 // Depending on the next slice, we can do two things here:
379 // 1. If the next slice is on the same partition, we can just emit a
380 // single shadow until the start of the next slice.
381 // 2. If the next slice is on another partition or we hit eof, just emit
382 // a shadow to the end of the whole partition.
383 bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
384 partition_ != CursorPartition());
385 state_ = State::kPresentPartitionShadow;
386 ts_ = AdjustedTsEnd();
387 ts_end_ =
388 shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
389 return base::OkStatus();
390 }
391 case State::kPresentPartitionShadow: {
392 if (ts_end_ == std::numeric_limits<int64_t>::max()) {
393 // If the shadow is to the end of the slice, create a missing partition
394 // shadow to the start of the partition of the next slice or to the max
395 // partition if we hit eof.
396 state_ = State::kMissingPartitionShadow;
397 ts_ = 0;
398 ts_end_ = std::numeric_limits<int64_t>::max();
399
400 missing_partition_start_ = partition_ + 1;
401 missing_partition_end_ = cursor_eof_
402 ? std::numeric_limits<int64_t>::max()
403 : CursorPartition();
404 } else {
405 // If the shadow is not to the end, we must have another slice on the
406 // current partition.
407 state_ = State::kReal;
408 ts_ = CursorTs();
409 ts_end_ = ts_ + CursorDur();
410
411 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
412 partition_ == CursorPartition());
413 }
414 return base::OkStatus();
415 }
416 case State::kMissingPartitionShadow: {
417 if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
418 PERFETTO_DCHECK(cursor_eof_);
419
420 // If we have a missing partition to the max partition, we must have hit
421 // eof.
422 state_ = State::kEof;
423 } else {
424 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
425 CursorPartition() == missing_partition_end_);
426
427 // Otherwise, setup a single partition slice on the end partition to the
428 // start of the next slice.
429 state_ = State::kPresentPartitionShadow;
430 ts_ = 0;
431 ts_end_ = CursorTs();
432 partition_ = missing_partition_end_;
433 }
434 return base::OkStatus();
435 }
436 case State::kEof: {
437 PERFETTO_DFATAL("Called Next when EOF");
438 return base::ErrStatus("Called Next when EOF");
439 }
440 }
441 PERFETTO_FATAL("For GCC");
442 }
443
// Prepares |sql_query_| afresh and positions the query on its first valid
// slice.
//
// The query starts out as a missing-partition shadow whose end partition is
// set as follows:
//  - no rows at all: every partition (end = INT64_MAX),
//  - partitioned table: the first row's partition,
//  - unpartitioned table: INT64_MIN (start == end).
// FindNextValidSlice() then skips past any slices the definition disallows.
base::Status SpanJoinOperatorModule::Query::Rewind() {
  auto res = in_state_->engine->sqlite_engine()->PrepareStatement(
      SqlSource::FromTraceProcessorImplementation(sql_query_));
  cursor_eof_ = false;
  RETURN_IF_ERROR(res.status());
  stmt_ = std::move(res);

  // Step onto the first row so its partition (if any) can be read below.
  RETURN_IF_ERROR(CursorNext());

  // Setup the first slice as a missing partition shadow from the lowest
  // partition until the first slice partition. We will handle finding the real
  // slice in |FindNextValidSlice()|.
  state_ = State::kMissingPartitionShadow;
  ts_ = 0;
  ts_end_ = std::numeric_limits<int64_t>::max();
  missing_partition_start_ = std::numeric_limits<int64_t>::min();

  if (cursor_eof_) {
    missing_partition_end_ = std::numeric_limits<int64_t>::max();
  } else if (defn_->IsPartitioned()) {
    missing_partition_end_ = CursorPartition();
  } else {
    missing_partition_end_ = std::numeric_limits<int64_t>::min();
  }

  // Actually compute the first valid slice.
  return FindNextValidSlice();
}
472
CursorNext()473 base::Status SpanJoinOperatorModule::Query::CursorNext() {
474 if (defn_->IsPartitioned()) {
475 auto partition_idx = static_cast<int>(defn_->partition_idx());
476 // Fastforward through any rows with null partition keys.
477 int row_type;
478 do {
479 cursor_eof_ = !stmt_->Step();
480 RETURN_IF_ERROR(stmt_->status());
481 row_type = sqlite3_column_type(stmt_->sqlite_stmt(), partition_idx);
482 } while (!cursor_eof_ && row_type == SQLITE_NULL);
483
484 if (!cursor_eof_ && row_type != SQLITE_INTEGER) {
485 return base::ErrStatus("SPAN_JOIN: partition is not an INT column");
486 }
487 } else {
488 cursor_eof_ = !stmt_->Step();
489 }
490 return base::OkStatus();
491 }
492
ReportSqliteResult(sqlite3_context * context,size_t index)493 void SpanJoinOperatorModule::Query::ReportSqliteResult(sqlite3_context* context,
494 size_t index) {
495 if (state_ != State::kReal) {
496 return sqlite::result::Null(context);
497 }
498
499 sqlite3_stmt* stmt = stmt_->sqlite_stmt();
500 int idx = static_cast<int>(index);
501 switch (sqlite3_column_type(stmt, idx)) {
502 case SQLITE_INTEGER:
503 return sqlite::result::Long(context, sqlite3_column_int64(stmt, idx));
504 case SQLITE_FLOAT:
505 return sqlite::result::Double(context, sqlite3_column_double(stmt, idx));
506 case SQLITE_TEXT: {
507 // TODO(lalitm): note for future optimizations: if we knew the addresses
508 // of the string intern pool, we could check if the string returned here
509 // comes from the pool, and pass it as non-transient.
510 const auto* ptr =
511 reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
512 return sqlite::result::TransientString(context, ptr);
513 }
514 case SQLITE_BLOB: {
515 return sqlite::result::TransientBytes(context,
516 sqlite3_column_blob(stmt, idx),
517 sqlite3_column_bytes(stmt, idx));
518 }
519 }
520 }
521
// Stores the pieces of a child table definition. The column indices are
// computed and validated by TableDefinition::Create(). |dur_idx| is empty
// when the table has no dur column.
SpanJoinOperatorModule::TableDefinition::TableDefinition(
    std::string name,
    std::string partition_col,
    std::vector<std::pair<SqlValue::Type, std::string>> cols,
    EmitShadowType emit_shadow_type,
    uint32_t ts_idx,
    std::optional<uint32_t> dur_idx,
    uint32_t partition_idx)
    : emit_shadow_type_(emit_shadow_type),
      name_(std::move(name)),
      partition_col_(std::move(partition_col)),
      cols_(std::move(cols)),
      ts_idx_(ts_idx),
      dur_idx_(dur_idx),
      partition_idx_(partition_idx) {}
537
Parse(const std::string & raw_descriptor,TableDescriptor * descriptor)538 base::Status SpanJoinOperatorModule::TableDescriptor::Parse(
539 const std::string& raw_descriptor,
540 TableDescriptor* descriptor) {
541 // Descriptors have one of the following forms:
542 // table_name [PARTITIONED column_name]
543
544 // Find the table name.
545 base::StringSplitter splitter(raw_descriptor, ' ');
546 if (!splitter.Next())
547 return base::ErrStatus("SPAN_JOIN: Missing table name");
548
549 descriptor->name = splitter.cur_token();
550 if (!splitter.Next())
551 return base::OkStatus();
552
553 if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
554 return base::ErrStatus("SPAN_JOIN: Invalid token");
555
556 if (!splitter.Next())
557 return base::ErrStatus("SPAN_JOIN: Missing partitioning column");
558
559 descriptor->partition_col = splitter.cur_token();
560 return base::OkStatus();
561 }
562
CreateSqlQuery(base::StringSplitter & idx,sqlite3_value ** argv) const563 std::string SpanJoinOperatorModule::TableDefinition::CreateSqlQuery(
564 base::StringSplitter& idx,
565 sqlite3_value** argv) const {
566 std::vector<std::string> col_names;
567 for (const auto& c : columns()) {
568 col_names.push_back("`" + c.second + "`");
569 }
570
571 PERFETTO_CHECK(idx.Next());
572 std::optional<uint32_t> cs_count = base::StringToUInt32(idx.cur_token());
573 PERFETTO_CHECK(cs_count);
574 std::vector<std::string> cs;
575 cs.reserve(*cs_count);
576 for (uint32_t i = 0; i < *cs_count; ++i) {
577 PERFETTO_CHECK(idx.Next());
578 std::optional<uint32_t> argv_idx = base::StringToUInt32(idx.cur_token());
579 PERFETTO_CHECK(argv_idx);
580
581 PERFETTO_CHECK(idx.Next());
582 cs.emplace_back(idx.cur_token() +
583 EscapedSqliteValueAsString(argv[*argv_idx]));
584 }
585
586 std::string sql = "SELECT " + base::Join(col_names, ", ");
587 sql += " FROM " + name();
588 if (!cs.empty()) {
589 sql += " WHERE " + base::Join(cs, " AND ");
590 }
591 sql += " ORDER BY ";
592 sql += IsPartitioned() ? base::Join({"`" + partition_col() + "`", "ts"}, ", ")
593 : "ts";
594 sql += ";";
595 return sql;
596 }
597
Create(sqlite3 * db,void * ctx,int argc,const char * const * argv,sqlite3_vtab ** vtab,char ** pzErr)598 int SpanJoinOperatorModule::Create(sqlite3* db,
599 void* ctx,
600 int argc,
601 const char* const* argv,
602 sqlite3_vtab** vtab,
603 char** pzErr) {
604 // argv[0] - argv[2] are SQLite populated fields which are always present.
605 if (argc != 5) {
606 *pzErr = sqlite3_mprintf("SPAN_JOIN: expected exactly two arguments");
607 return SQLITE_ERROR;
608 }
609
610 auto* context = GetContext(ctx);
611 auto state = std::make_unique<State>();
612 state->engine = context->engine;
613 state->module_name = argv[0];
614
615 TableDescriptor t1_desc;
616 auto status = TableDescriptor::Parse(
617 std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
618 if (!status.ok()) {
619 *pzErr = sqlite3_mprintf("%s", status.c_message());
620 return SQLITE_ERROR;
621 }
622
623 TableDescriptor t2_desc;
624 status = TableDescriptor::Parse(
625 std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
626 if (!status.ok()) {
627 *pzErr = sqlite3_mprintf("%s", status.c_message());
628 return SQLITE_ERROR;
629 }
630
631 // Check that the partition columns match between the two tables.
632 if (t1_desc.partition_col == t2_desc.partition_col) {
633 state->partitioning = t1_desc.IsPartitioned()
634 ? PartitioningType::kSamePartitioning
635 : PartitioningType::kNoPartitioning;
636 } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
637 *pzErr = sqlite3_mprintf(
638 "SPAN_JOIN: mismatching partitions between the two tables; "
639 "(partition %s in table %s, partition %s in table %s)",
640 t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
641 t2_desc.partition_col.c_str(), t2_desc.name.c_str());
642 return SQLITE_ERROR;
643 } else {
644 state->partitioning = PartitioningType::kMixedPartitioning;
645 }
646
647 bool t1_part_mixed =
648 t1_desc.IsPartitioned() &&
649 state->partitioning == PartitioningType::kMixedPartitioning;
650 bool t2_part_mixed =
651 t2_desc.IsPartitioned() &&
652 state->partitioning == PartitioningType::kMixedPartitioning;
653
654 EmitShadowType t1_shadow_type;
655 if (state->IsOuterJoin()) {
656 if (t1_part_mixed ||
657 state->partitioning == PartitioningType::kNoPartitioning) {
658 t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
659 } else {
660 t1_shadow_type = EmitShadowType::kAll;
661 }
662 } else {
663 t1_shadow_type = EmitShadowType::kNone;
664 }
665 status = TableDefinition::Create(state->engine, t1_desc, t1_shadow_type,
666 &state->t1_defn);
667 if (!status.ok()) {
668 *pzErr = sqlite3_mprintf("%s", status.c_message());
669 return SQLITE_ERROR;
670 }
671
672 EmitShadowType t2_shadow_type;
673 if (state->IsOuterJoin() || state->IsLeftJoin()) {
674 if (t2_part_mixed ||
675 state->partitioning == PartitioningType::kNoPartitioning) {
676 t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
677 } else {
678 t2_shadow_type = EmitShadowType::kAll;
679 }
680 } else {
681 t2_shadow_type = EmitShadowType::kNone;
682 }
683 status = TableDefinition::Create(state->engine, t2_desc, t2_shadow_type,
684 &state->t2_defn);
685 if (!status.ok()) {
686 *pzErr = sqlite3_mprintf("%s", status.c_message());
687 return SQLITE_ERROR;
688 }
689
690 if (!state->t1_defn.dur_idx().has_value() &&
691 !state->t2_defn.dur_idx().has_value()) {
692 *pzErr = sqlite3_mprintf(
693 "SPAN_JOIN: column %s must be present in at least one of tables %s and "
694 "%s",
695 kDurColumnName, state->t1_defn.name().c_str(),
696 state->t2_defn.name().c_str());
697 return SQLITE_ERROR;
698 }
699
700 if (auto dupe = HasDuplicateColumns(
701 state->t1_defn.columns(), state->t2_defn.columns(),
702 state->partitioning == PartitioningType::kNoPartitioning
703 ? std::nullopt
704 : std::make_optional(state->partition_col()))) {
705 *pzErr = sqlite3_mprintf(
706 "SPAN_JOIN: column %s present in both tables %s and %s", dupe->c_str(),
707 state->t1_defn.name().c_str(), state->t2_defn.name().c_str());
708 return SQLITE_ERROR;
709 }
710
711 // Create the map from column index to the column in the child sub-queries.
712 state->PopulateColumnLocatorMap(
713 state->partitioning == PartitioningType::kNoPartitioning ? 2 : 3);
714
715 std::string primary_key = "ts";
716 std::string partition;
717 if (state->partitioning != PartitioningType::kNoPartitioning) {
718 partition = state->partition_col() + " BIGINT,";
719 primary_key += ", " + state->partition_col();
720 }
721 std::string t1_section = state->t1_defn.CreateVtabCreateTableSection();
722 std::string t2_section = state->t2_defn.CreateVtabCreateTableSection();
723 static constexpr char kStmt[] = R"(
724 CREATE TABLE x(
725 ts BIGINT,
726 dur BIGINT,
727 %s
728 %s
729 %s
730 PRIMARY KEY(%s)
731 )
732 )";
733 base::StackString<1024> create_table_str(
734 kStmt, partition.c_str(), t1_section.c_str(), t2_section.c_str(),
735 primary_key.c_str());
736 state->create_table_stmt = create_table_str.ToStdString();
737 if (int ret = sqlite3_declare_vtab(db, create_table_str.c_str());
738 ret != SQLITE_OK) {
739 return ret;
740 }
741
742 std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
743 res->state = context->manager.OnCreate(argv, std::move(state));
744 *vtab = res.release();
745 return SQLITE_OK;
746 }
747
Destroy(sqlite3_vtab * vtab)748 int SpanJoinOperatorModule::Destroy(sqlite3_vtab* vtab) {
749 std::unique_ptr<Vtab> tab(GetVtab(vtab));
750 sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDestroy(tab->state);
751 return SQLITE_OK;
752 }
753
Connect(sqlite3 * db,void * ctx,int,const char * const * argv,sqlite3_vtab ** vtab,char **)754 int SpanJoinOperatorModule::Connect(sqlite3* db,
755 void* ctx,
756 int,
757 const char* const* argv,
758 sqlite3_vtab** vtab,
759 char**) {
760 auto* context = GetContext(ctx);
761 std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
762 res->state = context->manager.OnConnect(argv);
763
764 auto* state =
765 sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(res->state);
766 if (int ret = sqlite3_declare_vtab(db, state->create_table_stmt.c_str());
767 ret != SQLITE_OK) {
768 return ret;
769 }
770 *vtab = res.release();
771 return SQLITE_OK;
772 }
773
Disconnect(sqlite3_vtab * vtab)774 int SpanJoinOperatorModule::Disconnect(sqlite3_vtab* vtab) {
775 std::unique_ptr<Vtab> tab(GetVtab(vtab));
776 sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDisconnect(tab->state);
777 return SQLITE_OK;
778 }
779
BestIndex(sqlite3_vtab * tab,sqlite3_index_info * info)780 int SpanJoinOperatorModule::BestIndex(sqlite3_vtab* tab,
781 sqlite3_index_info* info) {
782 int argvIndex = 1;
783 for (int i = 0; i < info->nConstraint; ++i) {
784 if (!info->aConstraint[i].usable) {
785 continue;
786 }
787 info->aConstraintUsage[i].argvIndex = argvIndex++;
788 }
789
790 Vtab* table = GetVtab(tab);
791 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
792 table->state);
793 if (state->partitioning == PartitioningType::kNoPartitioning) {
794 // If both tables are not partitioned and we have a single order by on ts,
795 // we return data in the correct order.
796 info->orderByConsumed = info->nOrderBy == 1 &&
797 info->aOrderBy[0].iColumn == Column::kTimestamp &&
798 !info->aOrderBy[0].desc;
799 } else {
800 // If one of the tables is partitioned, and we have an order by on the
801 // partition column followed (optionally) by an order by on timestamp, we
802 // return data in the correct order.
803 bool is_first_ob_partition =
804 info->nOrderBy > 0 && info->aOrderBy[0].iColumn == Column::kPartition &&
805 !info->aOrderBy[0].desc;
806 bool is_second_ob_ts = info->nOrderBy >= 2 &&
807 info->aOrderBy[1].iColumn == Column::kTimestamp &&
808 !info->aOrderBy[1].desc;
809 info->orderByConsumed =
810 (info->nOrderBy == 1 && is_first_ob_partition) ||
811 (info->nOrderBy == 2 && is_first_ob_partition && is_second_ob_ts);
812 }
813
814 for (int i = 0; i < info->nConstraint; ++i) {
815 if (info->aConstraint[i].op == kSourceGeqOpCode) {
816 info->aConstraintUsage[i].omit = true;
817 }
818 }
819
820 std::string t1 = state->BestIndexStrForDefinition(info, state->t1_defn);
821 std::string t2 = state->BestIndexStrForDefinition(info, state->t2_defn);
822 info->idxStr = sqlite3_mprintf("%s,%s", t1.c_str(), t2.c_str());
823 info->needToFreeIdxStr = true;
824
825 return SQLITE_OK;
826 }
827
Open(sqlite3_vtab * tab,sqlite3_vtab_cursor ** cursor)828 int SpanJoinOperatorModule::Open(sqlite3_vtab* tab,
829 sqlite3_vtab_cursor** cursor) {
830 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
831 GetVtab(tab)->state);
832 std::unique_ptr<Cursor> c = std::make_unique<Cursor>(state);
833 *cursor = c.release();
834 return SQLITE_OK;
835 }
836
Close(sqlite3_vtab_cursor * cursor)837 int SpanJoinOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
838 std::unique_ptr<Cursor> c(GetCursor(cursor));
839 return SQLITE_OK;
840 }
841
Filter(sqlite3_vtab_cursor * cursor,int,const char * idxStr,int,sqlite3_value ** argv)842 int SpanJoinOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
843 int,
844 const char* idxStr,
845 int,
846 sqlite3_value** argv) {
847 PERFETTO_TP_TRACE(metatrace::Category::QUERY_DETAILED, "SPAN_JOIN_XFILTER");
848
849 Cursor* c = GetCursor(cursor);
850 Vtab* table = GetVtab(cursor->pVtab);
851 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
852 table->state);
853
854 base::StringSplitter splitter(std::string(idxStr), ',');
855 bool t1_partitioned_mixed =
856 c->t1.definition()->IsPartitioned() &&
857 state->partitioning == PartitioningType::kMixedPartitioning;
858 auto t1_eof = state->IsOuterJoin() && !t1_partitioned_mixed
859 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
860 : Query::InitialEofBehavior::kTreatAsEof;
861 base::Status status =
862 c->t1.Initialize(state->t1_defn.CreateSqlQuery(splitter, argv), t1_eof);
863 if (!status.ok()) {
864 return sqlite::utils::SetError(table, status.c_message());
865 }
866
867 bool t2_partitioned_mixed =
868 c->t2.definition()->IsPartitioned() &&
869 state->partitioning == PartitioningType::kMixedPartitioning;
870 auto t2_eof =
871 (state->IsLeftJoin() || state->IsOuterJoin()) && !t2_partitioned_mixed
872 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
873 : Query::InitialEofBehavior::kTreatAsEof;
874 status =
875 c->t2.Initialize(state->t2_defn.CreateSqlQuery(splitter, argv), t2_eof);
876 if (!status.ok()) {
877 return sqlite::utils::SetError(table, status.c_message());
878 }
879
880 status = c->FindOverlappingSpan();
881 if (!status.ok()) {
882 return sqlite::utils::SetError(table, status.c_message());
883 }
884 return SQLITE_OK;
885 }
886
Next(sqlite3_vtab_cursor * cursor)887 int SpanJoinOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
888 Cursor* c = GetCursor(cursor);
889 Vtab* table = GetVtab(cursor->pVtab);
890 base::Status status = c->next_query->Next();
891 if (!status.ok()) {
892 return sqlite::utils::SetError(table, status.c_message());
893 }
894 status = c->FindOverlappingSpan();
895 if (!status.ok()) {
896 return sqlite::utils::SetError(table, status.c_message());
897 }
898 return SQLITE_OK;
899 }
900
Eof(sqlite3_vtab_cursor * cur)901 int SpanJoinOperatorModule::Eof(sqlite3_vtab_cursor* cur) {
902 Cursor* c = GetCursor(cur);
903 return c->t1.IsEof() || c->t2.IsEof();
904 }
905
Column(sqlite3_vtab_cursor * cursor,sqlite3_context * context,int N)906 int SpanJoinOperatorModule::Column(sqlite3_vtab_cursor* cursor,
907 sqlite3_context* context,
908 int N) {
909 Cursor* c = GetCursor(cursor);
910 Vtab* table = GetVtab(cursor->pVtab);
911 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
912 table->state);
913
914 PERFETTO_DCHECK(c->t1.IsReal() || c->t2.IsReal());
915
916 switch (N) {
917 case Column::kTimestamp: {
918 auto max_ts = std::max(c->t1.ts(), c->t2.ts());
919 sqlite::result::Long(context, static_cast<sqlite3_int64>(max_ts));
920 break;
921 }
922 case Column::kDuration: {
923 auto max_start = std::max(c->t1.ts(), c->t2.ts());
924 auto min_end = std::min(c->t1.raw_ts_end(), c->t2.raw_ts_end());
925 auto dur = min_end - max_start;
926 sqlite::result::Long(context, static_cast<sqlite3_int64>(dur));
927 break;
928 }
929 case Column::kPartition: {
930 if (state->partitioning != PartitioningType::kNoPartitioning) {
931 int64_t partition;
932 if (state->partitioning == PartitioningType::kMixedPartitioning) {
933 partition = c->last_mixed_partition_;
934 } else {
935 partition = c->t1.IsReal() ? c->t1.partition() : c->t2.partition();
936 }
937 sqlite::result::Long(context, static_cast<sqlite3_int64>(partition));
938 break;
939 }
940 PERFETTO_FALLTHROUGH;
941 }
942 default: {
943 const auto* locator =
944 state->global_index_to_column_locator.Find(static_cast<size_t>(N));
945 PERFETTO_CHECK(locator);
946 if (locator->defn == c->t1.definition()) {
947 c->t1.ReportSqliteResult(context, locator->col_index);
948 } else {
949 c->t2.ReportSqliteResult(context, locator->col_index);
950 }
951 }
952 }
953 return SQLITE_OK;
954 }
955
Rowid(sqlite3_vtab_cursor *,sqlite_int64 *)956 int SpanJoinOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
957 return SQLITE_ERROR;
958 }
959
FindFunction(sqlite3_vtab *,int,const char * name,FindFunctionFn ** fn,void **)960 int SpanJoinOperatorModule::FindFunction(sqlite3_vtab*,
961 int,
962 const char* name,
963 FindFunctionFn** fn,
964 void**) {
965 if (base::CaseInsensitiveEqual(name, "source_geq")) {
966 *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
967 return sqlite::result::Error(ctx, "Should not be called.");
968 };
969 return kSourceGeqOpCode;
970 }
971 return 0;
972 }
973
IsOverlappingSpan() const974 bool SpanJoinOperatorModule::Cursor::IsOverlappingSpan() const {
975 // If either of the tables are eof, then we cannot possibly have an
976 // overlapping span.
977 if (t1.IsEof() || t2.IsEof())
978 return false;
979
980 // One of the tables always needs to have a real span to have a valid
981 // overlapping span.
982 if (!t1.IsReal() && !t2.IsReal())
983 return false;
984
985 using PartitioningType = PartitioningType;
986 if (state->partitioning == PartitioningType::kSamePartitioning) {
987 // If both tables are partitioned, then ensure that the partitions overlap.
988 bool partition_in_bounds = (t1.FirstPartition() >= t2.FirstPartition() &&
989 t1.FirstPartition() <= t2.LastPartition()) ||
990 (t2.FirstPartition() >= t1.FirstPartition() &&
991 t2.FirstPartition() <= t1.LastPartition());
992 if (!partition_in_bounds)
993 return false;
994 }
995
996 // We consider all slices to be [start, end) - that is the range of
997 // timestamps has an open interval at the start but a closed interval
998 // at the end. (with the exception of dur == -1 which we treat as if
999 // end == start for the purpose of this function).
1000 return (t1.ts() == t2.ts() && t1.IsReal() && t2.IsReal()) ||
1001 (t1.ts() >= t2.ts() && t1.ts() < t2.AdjustedTsEnd()) ||
1002 (t2.ts() >= t1.ts() && t2.ts() < t1.AdjustedTsEnd());
1003 }
1004
FindOverlappingSpan()1005 base::Status SpanJoinOperatorModule::Cursor::FindOverlappingSpan() {
1006 // We loop until we find a slice which overlaps from the two tables.
1007 while (true) {
1008 if (state->partitioning == PartitioningType::kMixedPartitioning) {
1009 // If we have a mixed partition setup, we need to have special checks
1010 // for eof and to reset the unpartitioned cursor every time the partition
1011 // changes in the partitioned table.
1012 auto* partitioned = t1.definition()->IsPartitioned() ? &t1 : &t2;
1013 auto* unpartitioned = t1.definition()->IsPartitioned() ? &t2 : &t1;
1014
1015 // If the partitioned table reaches eof, then we are really done.
1016 if (partitioned->IsEof())
1017 break;
1018
1019 // If the partition has changed from the previous one, reset the cursor
1020 // and keep a lot of the new partition.
1021 if (last_mixed_partition_ != partitioned->partition()) {
1022 base::Status status = unpartitioned->Rewind();
1023 if (!status.ok())
1024 return status;
1025 last_mixed_partition_ = partitioned->partition();
1026 }
1027 } else if (t1.IsEof() || t2.IsEof()) {
1028 // For both no partition and same partition cases, either cursor ending
1029 // ends the whole span join.
1030 break;
1031 }
1032
1033 // Find which slice finishes first.
1034 next_query = FindEarliestFinishQuery();
1035
1036 // If the current span is overlapping, just finish there to emit the current
1037 // slice.
1038 if (IsOverlappingSpan())
1039 break;
1040
1041 // Otherwise, step to the next row.
1042 base::Status status = next_query->Next();
1043 if (!status.ok())
1044 return status;
1045 }
1046 return base::OkStatus();
1047 }
1048
1049 SpanJoinOperatorModule::Query*
FindEarliestFinishQuery()1050 SpanJoinOperatorModule::Cursor::FindEarliestFinishQuery() {
1051 int64_t t1_part;
1052 int64_t t2_part;
1053
1054 switch (state->partitioning) {
1055 case PartitioningType::kMixedPartitioning: {
1056 // If either table is EOF, forward the other table to try and make
1057 // the partitions not match anymore.
1058 if (t1.IsEof())
1059 return &t2;
1060 if (t2.IsEof())
1061 return &t1;
1062
1063 // Otherwise, just make the partition equal from both tables.
1064 t1_part = last_mixed_partition_;
1065 t2_part = last_mixed_partition_;
1066 break;
1067 }
1068 case PartitioningType::kSamePartitioning: {
1069 // Get the partition values from the cursor.
1070 t1_part = t1.LastPartition();
1071 t2_part = t2.LastPartition();
1072 break;
1073 }
1074 case PartitioningType::kNoPartitioning: {
1075 t1_part = 0;
1076 t2_part = 0;
1077 break;
1078 }
1079 }
1080
1081 // Prefer to forward the earliest cursors based on the following
1082 // lexiographical ordering:
1083 // 1. partition
1084 // 2. end timestamp
1085 // 3. whether the slice is real or shadow (shadow < real)
1086 bool t1_less = std::make_tuple(t1_part, t1.AdjustedTsEnd(), t1.IsReal()) <
1087 std::make_tuple(t2_part, t2.AdjustedTsEnd(), t2.IsReal());
1088 return t1_less ? &t1 : &t2;
1089 }
1090
1091 } // namespace perfetto::trace_processor
1092