1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/perfetto_sql/intrinsics/operators/span_join_operator.h"
18
19 #include <sqlite3.h>
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstring>
23 #include <limits>
24 #include <memory>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31
32 #include "perfetto/base/compiler.h"
33 #include "perfetto/base/logging.h"
34 #include "perfetto/base/status.h"
35 #include "perfetto/ext/base/string_splitter.h"
36 #include "perfetto/ext/base/string_utils.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
39 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
40 #include "src/trace_processor/sqlite/module_lifecycle_manager.h"
41 #include "src/trace_processor/sqlite/sql_source.h"
42 #include "src/trace_processor/sqlite/sqlite_utils.h"
43 #include "src/trace_processor/tp_metatrace.h"
44 #include "src/trace_processor/util/status_macros.h"
45
46 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
47
48 namespace perfetto::trace_processor {
49
50 namespace {
51
// Names of the two columns every span-joined table must expose.
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";

// Returns true iff |name| is one of the mandatory span columns ("ts" or
// "dur").
bool IsRequiredColumn(const std::string& name) {
  if (name == kTsColumnName) {
    return true;
  }
  return name == kDurColumnName;
}
58
HasDuplicateColumns(const std::vector<std::pair<SqlValue::Type,std::string>> & t1,const std::vector<std::pair<SqlValue::Type,std::string>> & t2,const std::optional<std::string> & partition_col)59 std::optional<std::string> HasDuplicateColumns(
60 const std::vector<std::pair<SqlValue::Type, std::string>>& t1,
61 const std::vector<std::pair<SqlValue::Type, std::string>>& t2,
62 const std::optional<std::string>& partition_col) {
63 std::unordered_set<std::string> seen_names;
64 for (const auto& col : t1) {
65 if (IsRequiredColumn(col.second) || col.second == partition_col) {
66 continue;
67 }
68 if (seen_names.count(col.second) > 0) {
69 return col.second;
70 }
71 seen_names.insert(col.second);
72 }
73 for (const auto& col : t2) {
74 if (IsRequiredColumn(col.second) || col.second == partition_col) {
75 continue;
76 }
77 if (seen_names.count(col.second) > 0) {
78 return col.second;
79 }
80 seen_names.insert(col.second);
81 }
82 return std::nullopt;
83 }
84
OpToString(int op)85 std::string OpToString(int op) {
86 switch (op) {
87 case SQLITE_INDEX_CONSTRAINT_EQ:
88 return "=";
89 case SQLITE_INDEX_CONSTRAINT_NE:
90 return "!=";
91 case SQLITE_INDEX_CONSTRAINT_GE:
92 return ">=";
93 case SQLITE_INDEX_CONSTRAINT_GT:
94 return ">";
95 case SQLITE_INDEX_CONSTRAINT_LE:
96 return "<=";
97 case SQLITE_INDEX_CONSTRAINT_LT:
98 return "<";
99 case SQLITE_INDEX_CONSTRAINT_LIKE:
100 return " like ";
101 case SQLITE_INDEX_CONSTRAINT_GLOB:
102 return " glob ";
103 case SQLITE_INDEX_CONSTRAINT_ISNULL:
104 // The "null" will be added below in EscapedSqliteValueAsString.
105 return " is ";
106 case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
107 // The "null" will be added below in EscapedSqliteValueAsString.
108 return " is not ";
109 default:
110 PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
111 }
112 }
113
EscapedSqliteValueAsString(sqlite3_value * value)114 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
115 switch (sqlite3_value_type(value)) {
116 case SQLITE_INTEGER:
117 return std::to_string(sqlite3_value_int64(value));
118 case SQLITE_FLOAT:
119 return std::to_string(sqlite3_value_double(value));
120 case SQLITE_TEXT: {
121 // If str itself contains a single quote, we need to escape it with
122 // another single quote.
123 const char* str =
124 reinterpret_cast<const char*>(sqlite3_value_text(value));
125 return "'" + base::ReplaceAll(str, "'", "''") + "'";
126 }
127 case SQLITE_NULL:
128 return " null";
129 default:
130 PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
131 }
132 }
133
134 } // namespace
135
PopulateColumnLocatorMap(uint32_t offset)136 void SpanJoinOperatorModule::State::PopulateColumnLocatorMap(uint32_t offset) {
137 for (uint32_t i = 0; i < t1_defn.columns().size(); ++i) {
138 if (i == t1_defn.ts_idx() || i == t1_defn.dur_idx() ||
139 i == t1_defn.partition_idx()) {
140 continue;
141 }
142 ColumnLocator* locator = &global_index_to_column_locator[offset++];
143 locator->defn = &t1_defn;
144 locator->col_index = i;
145 }
146 for (uint32_t i = 0; i < t2_defn.columns().size(); ++i) {
147 if (i == t2_defn.ts_idx() || i == t2_defn.dur_idx() ||
148 i == t2_defn.partition_idx()) {
149 continue;
150 }
151 ColumnLocator* locator = &global_index_to_column_locator[offset++];
152 locator->defn = &t2_defn;
153 locator->col_index = i;
154 }
155 }
156
BestIndexStrForDefinition(const sqlite3_index_info * info,const TableDefinition & defn)157 std::string SpanJoinOperatorModule::State::BestIndexStrForDefinition(
158 const sqlite3_index_info* info,
159 const TableDefinition& defn) {
160 uint32_t count = 0;
161 std::string constraints;
162 for (int i = 0; i < info->nConstraint; i++) {
163 const auto& c = info->aConstraint[i];
164 if (!c.usable) {
165 continue;
166 }
167
168 auto col_name = GetNameForGlobalColumnIndex(defn, c.iColumn);
169 if (col_name.empty()) {
170 continue;
171 }
172
173 // Le constraints can be passed straight to the child tables as they won't
174 // affect the span join computation. Similarily, source_geq constraints
175 // explicitly request that they are passed as geq constraints to the source
176 // tables.
177 if (col_name == kTsColumnName && !sqlite::utils::IsOpLe(c.op) &&
178 c.op != kSourceGeqOpCode) {
179 continue;
180 }
181
182 // Allow SQLite handle any constraints on duration apart from source_geq
183 // constraints.
184 if (col_name == kDurColumnName && c.op != kSourceGeqOpCode) {
185 continue;
186 }
187
188 // If we're emitting shadow slices, don't propogate any constraints
189 // on this table as this will break the shadow slice computation.
190 if (defn.ShouldEmitPresentPartitionShadow()) {
191 continue;
192 }
193
194 PERFETTO_DCHECK(info->aConstraintUsage[i].argvIndex > 0);
195 std::string argvIndex =
196 std::to_string(info->aConstraintUsage[i].argvIndex - 1);
197 std::string op = OpToString(
198 c.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE : c.op);
199 constraints += "," + argvIndex + "," + "`" + col_name + "`" + op;
200 count++;
201 }
202 return std::to_string(count) + constraints;
203 }
204
Create(PerfettoSqlEngine * engine,const TableDescriptor & desc,EmitShadowType emit_shadow_type,TableDefinition * defn)205 base::Status SpanJoinOperatorModule::TableDefinition::Create(
206 PerfettoSqlEngine* engine,
207 const TableDescriptor& desc,
208 EmitShadowType emit_shadow_type,
209 TableDefinition* defn) {
210 if (desc.partition_col == kTsColumnName ||
211 desc.partition_col == kDurColumnName) {
212 return base::ErrStatus(
213 "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
214 desc.name.c_str());
215 }
216
217 std::vector<std::pair<SqlValue::Type, std::string>> cols;
218 RETURN_IF_ERROR(sqlite::utils::GetColumnsForTable(
219 engine->sqlite_engine()->db(), desc.name, cols));
220
221 uint32_t required_columns_found = 0;
222 uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
223 uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
224 uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
225 for (uint32_t i = 0; i < cols.size(); i++) {
226 auto col = cols[i];
227 if (IsRequiredColumn(col.second)) {
228 ++required_columns_found;
229 if (col.first != SqlValue::Type::kLong &&
230 col.first != SqlValue::Type::kNull) {
231 return base::ErrStatus(
232 "SPAN_JOIN: Invalid type for column '%s' in table %s",
233 col.second.c_str(), desc.name.c_str());
234 }
235 }
236 if (base::Contains(col.second, ",")) {
237 return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ','",
238 col.second.c_str());
239 }
240 if (base::Contains(col.second, ':')) {
241 return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ':'",
242 col.second.c_str());
243 }
244
245 if (col.second == kTsColumnName) {
246 ts_idx = i;
247 } else if (col.second == kDurColumnName) {
248 dur_idx = i;
249 } else if (col.second == desc.partition_col) {
250 partition_idx = i;
251 }
252 }
253 if (required_columns_found != 2) {
254 return base::ErrStatus(
255 "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
256 desc.name.c_str());
257 }
258 if (desc.IsPartitioned() && partition_idx >= cols.size()) {
259 return base::ErrStatus(
260 "SPAN_JOIN: Missing partition column '%s' in table '%s'",
261 desc.partition_col.c_str(), desc.name.c_str());
262 }
263
264 PERFETTO_DCHECK(ts_idx < cols.size());
265 PERFETTO_DCHECK(dur_idx < cols.size());
266
267 *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
268 emit_shadow_type, ts_idx, dur_idx, partition_idx);
269 return base::OkStatus();
270 }
271
272 std::string
CreateVtabCreateTableSection() const273 SpanJoinOperatorModule::TableDefinition::CreateVtabCreateTableSection() const {
274 std::string cols;
275 for (const auto& col : columns()) {
276 if (IsRequiredColumn(col.second) || col.second == partition_col()) {
277 continue;
278 }
279 if (col.first == SqlValue::Type::kNull) {
280 cols += col.second + ",";
281 } else {
282 cols += col.second + " " +
283 sqlite::utils::SqlValueTypeToString(col.first) + ",";
284 }
285 }
286 return cols;
287 }
288
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)289 std::string SpanJoinOperatorModule::State::GetNameForGlobalColumnIndex(
290 const TableDefinition& defn,
291 int global_column) {
292 auto col_idx = static_cast<size_t>(global_column);
293 if (col_idx == Column::kTimestamp) {
294 return kTsColumnName;
295 }
296 if (col_idx == Column::kDuration) {
297 return kDurColumnName;
298 }
299 if (col_idx == Column::kPartition &&
300 partitioning != PartitioningType::kNoPartitioning) {
301 return defn.partition_col();
302 }
303
304 const auto& locator = global_index_to_column_locator[col_idx];
305 if (locator.defn != &defn) {
306 return "";
307 }
308 return defn.columns()[locator.col_index].second;
309 }
310
// Creates a query over the table described by |definition|. |state| is the
// owning vtab state; it is used by Rewind() to reach the SQL engine.
SpanJoinOperatorModule::Query::Query(SpanJoinOperatorModule::State* state,
                                     const TableDefinition* definition)
    : defn_(definition), in_state_(state) {
  // A partitioned definition must carry a partition index which points at one
  // of its columns.
  PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                  defn_->partition_idx() < defn_->columns().size());
}

SpanJoinOperatorModule::Query::~Query() = default;
319
Initialize(std::string sql_query,InitialEofBehavior eof_behavior)320 base::Status SpanJoinOperatorModule::Query::Initialize(
321 std::string sql_query,
322 InitialEofBehavior eof_behavior) {
323 *this = Query(in_state_, definition());
324 sql_query_ = std::move(sql_query);
325 base::Status status = Rewind();
326 if (!status.ok())
327 return status;
328 if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
329 IsEof()) {
330 state_ = State::kMissingPartitionShadow;
331 }
332 return status;
333 }
334
Next()335 base::Status SpanJoinOperatorModule::Query::Next() {
336 RETURN_IF_ERROR(NextSliceState());
337 return FindNextValidSlice();
338 }
339
IsValidSlice()340 bool SpanJoinOperatorModule::Query::IsValidSlice() {
341 // Disallow any single partition shadow slices if the definition doesn't allow
342 // them.
343 if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
344 return false;
345
346 // Disallow any missing partition shadow slices if the definition doesn't
347 // allow them.
348 if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
349 return false;
350
351 // Disallow any "empty" shadows; these are shadows which either have the same
352 // start and end time or missing-partition shadows which have the same start
353 // and end partition.
354 if (IsEmptyShadow())
355 return false;
356
357 return true;
358 }
359
FindNextValidSlice()360 base::Status SpanJoinOperatorModule::Query::FindNextValidSlice() {
361 // The basic idea of this function is that |NextSliceState()| always emits
362 // all possible slices (including shadows for any gaps inbetween the real
363 // slices) and we filter out the invalid slices (as defined by the table
364 // definition) using |IsValidSlice()|.
365 //
366 // This has proved to be a lot cleaner to implement than trying to choose
367 // when to emit and not emit shadows directly.
368 while (!IsEof() && !IsValidSlice()) {
369 RETURN_IF_ERROR(NextSliceState());
370 }
371 return base::OkStatus();
372 }
373
NextSliceState()374 base::Status SpanJoinOperatorModule::Query::NextSliceState() {
375 switch (state_) {
376 case State::kReal: {
377 // Forward the cursor to figure out where the next slice should be.
378 RETURN_IF_ERROR(CursorNext());
379
380 // Depending on the next slice, we can do two things here:
381 // 1. If the next slice is on the same partition, we can just emit a
382 // single shadow until the start of the next slice.
383 // 2. If the next slice is on another partition or we hit eof, just emit
384 // a shadow to the end of the whole partition.
385 bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
386 partition_ != CursorPartition());
387 state_ = State::kPresentPartitionShadow;
388 ts_ = AdjustedTsEnd();
389 ts_end_ =
390 shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
391 return base::OkStatus();
392 }
393 case State::kPresentPartitionShadow: {
394 if (ts_end_ == std::numeric_limits<int64_t>::max()) {
395 // If the shadow is to the end of the slice, create a missing partition
396 // shadow to the start of the partition of the next slice or to the max
397 // partition if we hit eof.
398 state_ = State::kMissingPartitionShadow;
399 ts_ = 0;
400 ts_end_ = std::numeric_limits<int64_t>::max();
401
402 missing_partition_start_ = partition_ + 1;
403 missing_partition_end_ = cursor_eof_
404 ? std::numeric_limits<int64_t>::max()
405 : CursorPartition();
406 } else {
407 // If the shadow is not to the end, we must have another slice on the
408 // current partition.
409 state_ = State::kReal;
410 ts_ = CursorTs();
411 ts_end_ = ts_ + CursorDur();
412
413 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
414 partition_ == CursorPartition());
415 }
416 return base::OkStatus();
417 }
418 case State::kMissingPartitionShadow: {
419 if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
420 PERFETTO_DCHECK(cursor_eof_);
421
422 // If we have a missing partition to the max partition, we must have hit
423 // eof.
424 state_ = State::kEof;
425 } else {
426 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
427 CursorPartition() == missing_partition_end_);
428
429 // Otherwise, setup a single partition slice on the end partition to the
430 // start of the next slice.
431 state_ = State::kPresentPartitionShadow;
432 ts_ = 0;
433 ts_end_ = CursorTs();
434 partition_ = missing_partition_end_;
435 }
436 return base::OkStatus();
437 }
438 case State::kEof: {
439 PERFETTO_DFATAL("Called Next when EOF");
440 return base::ErrStatus("Called Next when EOF");
441 }
442 }
443 PERFETTO_FATAL("For GCC");
444 }
445
Rewind()446 base::Status SpanJoinOperatorModule::Query::Rewind() {
447 auto res = in_state_->engine->sqlite_engine()->PrepareStatement(
448 SqlSource::FromTraceProcessorImplementation(sql_query_));
449 cursor_eof_ = false;
450 RETURN_IF_ERROR(res.status());
451 stmt_ = std::move(res);
452
453 RETURN_IF_ERROR(CursorNext());
454
455 // Setup the first slice as a missing partition shadow from the lowest
456 // partition until the first slice partition. We will handle finding the real
457 // slice in |FindNextValidSlice()|.
458 state_ = State::kMissingPartitionShadow;
459 ts_ = 0;
460 ts_end_ = std::numeric_limits<int64_t>::max();
461 missing_partition_start_ = std::numeric_limits<int64_t>::min();
462
463 if (cursor_eof_) {
464 missing_partition_end_ = std::numeric_limits<int64_t>::max();
465 } else if (defn_->IsPartitioned()) {
466 missing_partition_end_ = CursorPartition();
467 } else {
468 missing_partition_end_ = std::numeric_limits<int64_t>::min();
469 }
470
471 // Actually compute the first valid slice.
472 return FindNextValidSlice();
473 }
474
CursorNext()475 base::Status SpanJoinOperatorModule::Query::CursorNext() {
476 if (defn_->IsPartitioned()) {
477 auto partition_idx = static_cast<int>(defn_->partition_idx());
478 // Fastforward through any rows with null partition keys.
479 int row_type;
480 do {
481 cursor_eof_ = !stmt_->Step();
482 RETURN_IF_ERROR(stmt_->status());
483 row_type = sqlite3_column_type(stmt_->sqlite_stmt(), partition_idx);
484 } while (!cursor_eof_ && row_type == SQLITE_NULL);
485
486 if (!cursor_eof_ && row_type != SQLITE_INTEGER) {
487 return base::ErrStatus("SPAN_JOIN: partition is not an INT column");
488 }
489 } else {
490 cursor_eof_ = !stmt_->Step();
491 }
492 return base::OkStatus();
493 }
494
ReportSqliteResult(sqlite3_context * context,size_t index)495 void SpanJoinOperatorModule::Query::ReportSqliteResult(sqlite3_context* context,
496 size_t index) {
497 if (state_ != State::kReal) {
498 return sqlite::result::Null(context);
499 }
500
501 sqlite3_stmt* stmt = stmt_->sqlite_stmt();
502 int idx = static_cast<int>(index);
503 switch (sqlite3_column_type(stmt, idx)) {
504 case SQLITE_INTEGER:
505 return sqlite::result::Long(context, sqlite3_column_int64(stmt, idx));
506 case SQLITE_FLOAT:
507 return sqlite::result::Double(context, sqlite3_column_double(stmt, idx));
508 case SQLITE_TEXT: {
509 // TODO(lalitm): note for future optimizations: if we knew the addresses
510 // of the string intern pool, we could check if the string returned here
511 // comes from the pool, and pass it as non-transient.
512 const auto* ptr =
513 reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
514 return sqlite::result::TransientString(context, ptr);
515 }
516 case SQLITE_BLOB: {
517 return sqlite::result::TransientBytes(context,
518 sqlite3_column_blob(stmt, idx),
519 sqlite3_column_bytes(stmt, idx));
520 }
521 }
522 }
523
// Plain member-wise constructor: stores the table |name|, the name of its
// partition column, its (type, name) column list, the kinds of shadow slices
// to emit and the indices of the ts, dur and partition columns inside |cols|.
// The string and vector arguments are moved into the members.
SpanJoinOperatorModule::TableDefinition::TableDefinition(
    std::string name,
    std::string partition_col,
    std::vector<std::pair<SqlValue::Type, std::string>> cols,
    EmitShadowType emit_shadow_type,
    uint32_t ts_idx,
    uint32_t dur_idx,
    uint32_t partition_idx)
    : emit_shadow_type_(emit_shadow_type),
      name_(std::move(name)),
      partition_col_(std::move(partition_col)),
      cols_(std::move(cols)),
      ts_idx_(ts_idx),
      dur_idx_(dur_idx),
      partition_idx_(partition_idx) {}
539
Parse(const std::string & raw_descriptor,TableDescriptor * descriptor)540 base::Status SpanJoinOperatorModule::TableDescriptor::Parse(
541 const std::string& raw_descriptor,
542 TableDescriptor* descriptor) {
543 // Descriptors have one of the following forms:
544 // table_name [PARTITIONED column_name]
545
546 // Find the table name.
547 base::StringSplitter splitter(raw_descriptor, ' ');
548 if (!splitter.Next())
549 return base::ErrStatus("SPAN_JOIN: Missing table name");
550
551 descriptor->name = splitter.cur_token();
552 if (!splitter.Next())
553 return base::OkStatus();
554
555 if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
556 return base::ErrStatus("SPAN_JOIN: Invalid token");
557
558 if (!splitter.Next())
559 return base::ErrStatus("SPAN_JOIN: Missing partitioning column");
560
561 descriptor->partition_col = splitter.cur_token();
562 return base::OkStatus();
563 }
564
CreateSqlQuery(base::StringSplitter & idx,sqlite3_value ** argv) const565 std::string SpanJoinOperatorModule::TableDefinition::CreateSqlQuery(
566 base::StringSplitter& idx,
567 sqlite3_value** argv) const {
568 std::vector<std::string> col_names;
569 for (const auto& c : columns()) {
570 col_names.push_back("`" + c.second + "`");
571 }
572
573 PERFETTO_CHECK(idx.Next());
574 std::optional<uint32_t> cs_count = base::StringToUInt32(idx.cur_token());
575 PERFETTO_CHECK(cs_count);
576 std::vector<std::string> cs;
577 cs.reserve(*cs_count);
578 for (uint32_t i = 0; i < *cs_count; ++i) {
579 PERFETTO_CHECK(idx.Next());
580 std::optional<uint32_t> argv_idx = base::StringToUInt32(idx.cur_token());
581 PERFETTO_CHECK(argv_idx);
582
583 PERFETTO_CHECK(idx.Next());
584 cs.emplace_back(idx.cur_token() +
585 EscapedSqliteValueAsString(argv[*argv_idx]));
586 }
587
588 std::string sql = "SELECT " + base::Join(col_names, ", ");
589 sql += " FROM " + name();
590 if (!cs.empty()) {
591 sql += " WHERE " + base::Join(cs, " AND ");
592 }
593 sql += " ORDER BY ";
594 sql += IsPartitioned() ? base::Join({"`" + partition_col() + "`", "ts"}, ", ")
595 : "ts";
596 sql += ";";
597 PERFETTO_DLOG("%s", sql.c_str());
598 return sql;
599 }
600
Create(sqlite3 * db,void * ctx,int argc,const char * const * argv,sqlite3_vtab ** vtab,char ** pzErr)601 int SpanJoinOperatorModule::Create(sqlite3* db,
602 void* ctx,
603 int argc,
604 const char* const* argv,
605 sqlite3_vtab** vtab,
606 char** pzErr) {
607 // argv[0] - argv[2] are SQLite populated fields which are always present.
608 if (argc < 5) {
609 *pzErr = sqlite3_mprintf("SPAN_JOIN: expected at least 2 args");
610 return SQLITE_ERROR;
611 }
612
613 auto* context = GetContext(ctx);
614 auto state = std::make_unique<State>();
615 state->engine = context->engine;
616 state->module_name = argv[0];
617
618 TableDescriptor t1_desc;
619 auto status = TableDescriptor::Parse(
620 std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
621 if (!status.ok()) {
622 *pzErr = sqlite3_mprintf("%s", status.c_message());
623 return SQLITE_ERROR;
624 }
625
626 TableDescriptor t2_desc;
627 status = TableDescriptor::Parse(
628 std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
629 if (!status.ok()) {
630 *pzErr = sqlite3_mprintf("%s", status.c_message());
631 return SQLITE_ERROR;
632 }
633
634 // Check that the partition columns match between the two tables.
635 if (t1_desc.partition_col == t2_desc.partition_col) {
636 state->partitioning = t1_desc.IsPartitioned()
637 ? PartitioningType::kSamePartitioning
638 : PartitioningType::kNoPartitioning;
639 } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
640 *pzErr = sqlite3_mprintf(
641 "SPAN_JOIN: mismatching partitions between the two tables; "
642 "(partition %s in table %s, partition %s in table %s)",
643 t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
644 t2_desc.partition_col.c_str(), t2_desc.name.c_str());
645 return SQLITE_ERROR;
646 } else {
647 state->partitioning = PartitioningType::kMixedPartitioning;
648 }
649
650 bool t1_part_mixed =
651 t1_desc.IsPartitioned() &&
652 state->partitioning == PartitioningType::kMixedPartitioning;
653 bool t2_part_mixed =
654 t2_desc.IsPartitioned() &&
655 state->partitioning == PartitioningType::kMixedPartitioning;
656
657 EmitShadowType t1_shadow_type;
658 if (state->IsOuterJoin()) {
659 if (t1_part_mixed ||
660 state->partitioning == PartitioningType::kNoPartitioning) {
661 t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
662 } else {
663 t1_shadow_type = EmitShadowType::kAll;
664 }
665 } else {
666 t1_shadow_type = EmitShadowType::kNone;
667 }
668 status = TableDefinition::Create(state->engine, t1_desc, t1_shadow_type,
669 &state->t1_defn);
670 if (!status.ok()) {
671 *pzErr = sqlite3_mprintf("%s", status.c_message());
672 return SQLITE_ERROR;
673 }
674
675 EmitShadowType t2_shadow_type;
676 if (state->IsOuterJoin() || state->IsLeftJoin()) {
677 if (t2_part_mixed ||
678 state->partitioning == PartitioningType::kNoPartitioning) {
679 t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
680 } else {
681 t2_shadow_type = EmitShadowType::kAll;
682 }
683 } else {
684 t2_shadow_type = EmitShadowType::kNone;
685 }
686 status = TableDefinition::Create(state->engine, t2_desc, t2_shadow_type,
687 &state->t2_defn);
688 if (!status.ok()) {
689 *pzErr = sqlite3_mprintf("%s", status.c_message());
690 return SQLITE_ERROR;
691 }
692
693 if (auto dupe = HasDuplicateColumns(
694 state->t1_defn.columns(), state->t2_defn.columns(),
695 state->partitioning == PartitioningType::kNoPartitioning
696 ? std::nullopt
697 : std::make_optional(state->partition_col()))) {
698 *pzErr = sqlite3_mprintf(
699 "SPAN_JOIN: column %s present in both tables %s and %s", dupe->c_str(),
700 state->t1_defn.name().c_str(), state->t2_defn.name().c_str());
701 return SQLITE_ERROR;
702 }
703
704 // Create the map from column index to the column in the child sub-queries.
705 state->PopulateColumnLocatorMap(
706 state->partitioning == PartitioningType::kNoPartitioning ? 2 : 3);
707
708 std::string primary_key = "ts";
709 std::string partition;
710 if (state->partitioning != PartitioningType::kNoPartitioning) {
711 partition = state->partition_col() + " BIGINT,";
712 primary_key += ", " + state->partition_col();
713 }
714 std::string t1_section = state->t1_defn.CreateVtabCreateTableSection();
715 std::string t2_section = state->t2_defn.CreateVtabCreateTableSection();
716 static constexpr char kStmt[] = R"(
717 CREATE TABLE x(
718 ts BIGINT,
719 dur BIGINT,
720 %s
721 %s
722 %s
723 PRIMARY KEY(%s)
724 )
725 )";
726 base::StackString<1024> create_table_str(
727 kStmt, partition.c_str(), t1_section.c_str(), t2_section.c_str(),
728 primary_key.c_str());
729 PERFETTO_DLOG("SPAN_JOIN: create table statement: %s",
730 create_table_str.c_str());
731 state->create_table_stmt = create_table_str.ToStdString();
732 if (int ret = sqlite3_declare_vtab(db, create_table_str.c_str());
733 ret != SQLITE_OK) {
734 return ret;
735 }
736
737 std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
738 res->state = context->manager.OnCreate(argv, std::move(state));
739 *vtab = res.release();
740 return SQLITE_OK;
741 }
742
Destroy(sqlite3_vtab * vtab)743 int SpanJoinOperatorModule::Destroy(sqlite3_vtab* vtab) {
744 std::unique_ptr<Vtab> tab(GetVtab(vtab));
745 sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDestroy(tab->state);
746 return SQLITE_OK;
747 }
748
Connect(sqlite3 * db,void * ctx,int,const char * const * argv,sqlite3_vtab ** vtab,char **)749 int SpanJoinOperatorModule::Connect(sqlite3* db,
750 void* ctx,
751 int,
752 const char* const* argv,
753 sqlite3_vtab** vtab,
754 char**) {
755 auto* context = GetContext(ctx);
756 std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
757 res->state = context->manager.OnConnect(argv);
758
759 auto* state =
760 sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(res->state);
761 if (int ret = sqlite3_declare_vtab(db, state->create_table_stmt.c_str());
762 ret != SQLITE_OK) {
763 return ret;
764 }
765 *vtab = res.release();
766 return SQLITE_OK;
767 }
768
Disconnect(sqlite3_vtab * vtab)769 int SpanJoinOperatorModule::Disconnect(sqlite3_vtab* vtab) {
770 std::unique_ptr<Vtab> tab(GetVtab(vtab));
771 sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDisconnect(tab->state);
772 return SQLITE_OK;
773 }
774
BestIndex(sqlite3_vtab * tab,sqlite3_index_info * info)775 int SpanJoinOperatorModule::BestIndex(sqlite3_vtab* tab,
776 sqlite3_index_info* info) {
777 int argvIndex = 1;
778 for (int i = 0; i < info->nConstraint; ++i) {
779 if (!info->aConstraint[i].usable) {
780 continue;
781 }
782 info->aConstraintUsage[i].argvIndex = argvIndex++;
783 }
784
785 Vtab* table = GetVtab(tab);
786 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
787 table->state);
788 if (state->partitioning == PartitioningType::kNoPartitioning) {
789 // If both tables are not partitioned and we have a single order by on ts,
790 // we return data in the correct order.
791 info->orderByConsumed = info->nOrderBy == 1 &&
792 info->aOrderBy[0].iColumn == Column::kTimestamp &&
793 !info->aOrderBy[0].desc;
794 } else {
795 // If one of the tables is partitioned, and we have an order by on the
796 // partition column followed (optionally) by an order by on timestamp, we
797 // return data in the correct order.
798 bool is_first_ob_partition =
799 info->nOrderBy > 0 && info->aOrderBy[0].iColumn == Column::kPartition &&
800 !info->aOrderBy[0].desc;
801 bool is_second_ob_ts = info->nOrderBy >= 2 &&
802 info->aOrderBy[1].iColumn == Column::kTimestamp &&
803 !info->aOrderBy[1].desc;
804 info->orderByConsumed =
805 (info->nOrderBy == 1 && is_first_ob_partition) ||
806 (info->nOrderBy == 2 && is_first_ob_partition && is_second_ob_ts);
807 }
808
809 for (int i = 0; i < info->nConstraint; ++i) {
810 if (info->aConstraint[i].op == kSourceGeqOpCode) {
811 info->aConstraintUsage[i].omit = true;
812 }
813 }
814
815 std::string t1 = state->BestIndexStrForDefinition(info, state->t1_defn);
816 std::string t2 = state->BestIndexStrForDefinition(info, state->t2_defn);
817 info->idxStr = sqlite3_mprintf("%s,%s", t1.c_str(), t2.c_str());
818 info->needToFreeIdxStr = true;
819
820 return SQLITE_OK;
821 }
822
Open(sqlite3_vtab * tab,sqlite3_vtab_cursor ** cursor)823 int SpanJoinOperatorModule::Open(sqlite3_vtab* tab,
824 sqlite3_vtab_cursor** cursor) {
825 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
826 GetVtab(tab)->state);
827 std::unique_ptr<Cursor> c = std::make_unique<Cursor>(state);
828 *cursor = c.release();
829 return SQLITE_OK;
830 }
831
Close(sqlite3_vtab_cursor * cursor)832 int SpanJoinOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
833 std::unique_ptr<Cursor> c(GetCursor(cursor));
834 return SQLITE_OK;
835 }
836
Filter(sqlite3_vtab_cursor * cursor,int,const char * idxStr,int argc,sqlite3_value ** argv)837 int SpanJoinOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
838 int,
839 const char* idxStr,
840 int argc,
841 sqlite3_value** argv) {
842 PERFETTO_TP_TRACE(metatrace::Category::QUERY_DETAILED, "SPAN_JOIN_XFILTER");
843 PERFETTO_DLOG("SpanJoin::Filter: argc=%d, idxStr=%s", argc, idxStr);
844
845 Cursor* c = GetCursor(cursor);
846 Vtab* table = GetVtab(cursor->pVtab);
847 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
848 table->state);
849
850 base::StringSplitter splitter(std::string(idxStr), ',');
851 bool t1_partitioned_mixed =
852 c->t1.definition()->IsPartitioned() &&
853 state->partitioning == PartitioningType::kMixedPartitioning;
854 auto t1_eof = state->IsOuterJoin() && !t1_partitioned_mixed
855 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
856 : Query::InitialEofBehavior::kTreatAsEof;
857 base::Status status =
858 c->t1.Initialize(state->t1_defn.CreateSqlQuery(splitter, argv), t1_eof);
859 if (!status.ok()) {
860 return sqlite::utils::SetError(table, status.c_message());
861 }
862
863 bool t2_partitioned_mixed =
864 c->t2.definition()->IsPartitioned() &&
865 state->partitioning == PartitioningType::kMixedPartitioning;
866 auto t2_eof =
867 (state->IsLeftJoin() || state->IsOuterJoin()) && !t2_partitioned_mixed
868 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
869 : Query::InitialEofBehavior::kTreatAsEof;
870 status =
871 c->t2.Initialize(state->t2_defn.CreateSqlQuery(splitter, argv), t2_eof);
872 if (!status.ok()) {
873 return sqlite::utils::SetError(table, status.c_message());
874 }
875
876 status = c->FindOverlappingSpan();
877 if (!status.ok()) {
878 return sqlite::utils::SetError(table, status.c_message());
879 }
880 return SQLITE_OK;
881 }
882
Next(sqlite3_vtab_cursor * cursor)883 int SpanJoinOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
884 Cursor* c = GetCursor(cursor);
885 Vtab* table = GetVtab(cursor->pVtab);
886 base::Status status = c->next_query->Next();
887 if (!status.ok()) {
888 return sqlite::utils::SetError(table, status.c_message());
889 }
890 status = c->FindOverlappingSpan();
891 if (!status.ok()) {
892 return sqlite::utils::SetError(table, status.c_message());
893 }
894 return SQLITE_OK;
895 }
896
Eof(sqlite3_vtab_cursor * cur)897 int SpanJoinOperatorModule::Eof(sqlite3_vtab_cursor* cur) {
898 Cursor* c = GetCursor(cur);
899 return c->t1.IsEof() || c->t2.IsEof();
900 }
901
Column(sqlite3_vtab_cursor * cursor,sqlite3_context * context,int N)902 int SpanJoinOperatorModule::Column(sqlite3_vtab_cursor* cursor,
903 sqlite3_context* context,
904 int N) {
905 Cursor* c = GetCursor(cursor);
906 Vtab* table = GetVtab(cursor->pVtab);
907 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
908 table->state);
909
910 PERFETTO_DCHECK(c->t1.IsReal() || c->t2.IsReal());
911
912 switch (N) {
913 case Column::kTimestamp: {
914 auto max_ts = std::max(c->t1.ts(), c->t2.ts());
915 sqlite::result::Long(context, static_cast<sqlite3_int64>(max_ts));
916 break;
917 }
918 case Column::kDuration: {
919 auto max_start = std::max(c->t1.ts(), c->t2.ts());
920 auto min_end = std::min(c->t1.raw_ts_end(), c->t2.raw_ts_end());
921 auto dur = min_end - max_start;
922 sqlite::result::Long(context, static_cast<sqlite3_int64>(dur));
923 break;
924 }
925 case Column::kPartition: {
926 if (state->partitioning != PartitioningType::kNoPartitioning) {
927 int64_t partition;
928 if (state->partitioning == PartitioningType::kMixedPartitioning) {
929 partition = c->last_mixed_partition_;
930 } else {
931 partition = c->t1.IsReal() ? c->t1.partition() : c->t2.partition();
932 }
933 sqlite::result::Long(context, static_cast<sqlite3_int64>(partition));
934 break;
935 }
936 PERFETTO_FALLTHROUGH;
937 }
938 default: {
939 const auto* locator =
940 state->global_index_to_column_locator.Find(static_cast<size_t>(N));
941 PERFETTO_CHECK(locator);
942 if (locator->defn == c->t1.definition()) {
943 c->t1.ReportSqliteResult(context, locator->col_index);
944 } else {
945 c->t2.ReportSqliteResult(context, locator->col_index);
946 }
947 }
948 }
949 return SQLITE_OK;
950 }
951
Rowid(sqlite3_vtab_cursor *,sqlite_int64 *)952 int SpanJoinOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
953 return SQLITE_ERROR;
954 }
955
FindFunction(sqlite3_vtab *,int,const char * name,FindFunctionFn ** fn,void **)956 int SpanJoinOperatorModule::FindFunction(sqlite3_vtab*,
957 int,
958 const char* name,
959 FindFunctionFn** fn,
960 void**) {
961 if (base::CaseInsensitiveEqual(name, "source_geq")) {
962 *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
963 return sqlite::result::Error(ctx, "Should not be called.");
964 };
965 return kSourceGeqOpCode;
966 }
967 return 0;
968 }
969
IsOverlappingSpan() const970 bool SpanJoinOperatorModule::Cursor::IsOverlappingSpan() const {
971 // If either of the tables are eof, then we cannot possibly have an
972 // overlapping span.
973 if (t1.IsEof() || t2.IsEof())
974 return false;
975
976 // One of the tables always needs to have a real span to have a valid
977 // overlapping span.
978 if (!t1.IsReal() && !t2.IsReal())
979 return false;
980
981 using PartitioningType = PartitioningType;
982 if (state->partitioning == PartitioningType::kSamePartitioning) {
983 // If both tables are partitioned, then ensure that the partitions overlap.
984 bool partition_in_bounds = (t1.FirstPartition() >= t2.FirstPartition() &&
985 t1.FirstPartition() <= t2.LastPartition()) ||
986 (t2.FirstPartition() >= t1.FirstPartition() &&
987 t2.FirstPartition() <= t1.LastPartition());
988 if (!partition_in_bounds)
989 return false;
990 }
991
992 // We consider all slices to be [start, end) - that is the range of
993 // timestamps has an open interval at the start but a closed interval
994 // at the end. (with the exception of dur == -1 which we treat as if
995 // end == start for the purpose of this function).
996 return (t1.ts() == t2.ts() && t1.IsReal() && t2.IsReal()) ||
997 (t1.ts() >= t2.ts() && t1.ts() < t2.AdjustedTsEnd()) ||
998 (t2.ts() >= t1.ts() && t2.ts() < t1.AdjustedTsEnd());
999 }
1000
FindOverlappingSpan()1001 base::Status SpanJoinOperatorModule::Cursor::FindOverlappingSpan() {
1002 // We loop until we find a slice which overlaps from the two tables.
1003 while (true) {
1004 if (state->partitioning == PartitioningType::kMixedPartitioning) {
1005 // If we have a mixed partition setup, we need to have special checks
1006 // for eof and to reset the unpartitioned cursor every time the partition
1007 // changes in the partitioned table.
1008 auto* partitioned = t1.definition()->IsPartitioned() ? &t1 : &t2;
1009 auto* unpartitioned = t1.definition()->IsPartitioned() ? &t2 : &t1;
1010
1011 // If the partitioned table reaches eof, then we are really done.
1012 if (partitioned->IsEof())
1013 break;
1014
1015 // If the partition has changed from the previous one, reset the cursor
1016 // and keep a lot of the new partition.
1017 if (last_mixed_partition_ != partitioned->partition()) {
1018 base::Status status = unpartitioned->Rewind();
1019 if (!status.ok())
1020 return status;
1021 last_mixed_partition_ = partitioned->partition();
1022 }
1023 } else if (t1.IsEof() || t2.IsEof()) {
1024 // For both no partition and same partition cases, either cursor ending
1025 // ends the whole span join.
1026 break;
1027 }
1028
1029 // Find which slice finishes first.
1030 next_query = FindEarliestFinishQuery();
1031
1032 // If the current span is overlapping, just finish there to emit the current
1033 // slice.
1034 if (IsOverlappingSpan())
1035 break;
1036
1037 // Otherwise, step to the next row.
1038 base::Status status = next_query->Next();
1039 if (!status.ok())
1040 return status;
1041 }
1042 return base::OkStatus();
1043 }
1044
1045 SpanJoinOperatorModule::Query*
FindEarliestFinishQuery()1046 SpanJoinOperatorModule::Cursor::FindEarliestFinishQuery() {
1047 int64_t t1_part;
1048 int64_t t2_part;
1049
1050 switch (state->partitioning) {
1051 case PartitioningType::kMixedPartitioning: {
1052 // If either table is EOF, forward the other table to try and make
1053 // the partitions not match anymore.
1054 if (t1.IsEof())
1055 return &t2;
1056 if (t2.IsEof())
1057 return &t1;
1058
1059 // Otherwise, just make the partition equal from both tables.
1060 t1_part = last_mixed_partition_;
1061 t2_part = last_mixed_partition_;
1062 break;
1063 }
1064 case PartitioningType::kSamePartitioning: {
1065 // Get the partition values from the cursor.
1066 t1_part = t1.LastPartition();
1067 t2_part = t2.LastPartition();
1068 break;
1069 }
1070 case PartitioningType::kNoPartitioning: {
1071 t1_part = 0;
1072 t2_part = 0;
1073 break;
1074 }
1075 }
1076
1077 // Prefer to forward the earliest cursors based on the following
1078 // lexiographical ordering:
1079 // 1. partition
1080 // 2. end timestamp
1081 // 3. whether the slice is real or shadow (shadow < real)
1082 bool t1_less = std::make_tuple(t1_part, t1.AdjustedTsEnd(), t1.IsReal()) <
1083 std::make_tuple(t2_part, t2.AdjustedTsEnd(), t2.IsReal());
1084 return t1_less ? &t1 : &t2;
1085 }
1086
1087 } // namespace perfetto::trace_processor
1088