• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/perfetto_sql/intrinsics/operators/span_join_operator.h"
18 
#include <sqlite3.h>
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>
31 
32 #include "perfetto/base/compiler.h"
33 #include "perfetto/base/logging.h"
34 #include "perfetto/base/status.h"
35 #include "perfetto/ext/base/string_splitter.h"
36 #include "perfetto/ext/base/string_utils.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
39 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
40 #include "src/trace_processor/sqlite/module_lifecycle_manager.h"
41 #include "src/trace_processor/sqlite/sql_source.h"
42 #include "src/trace_processor/sqlite/sqlite_utils.h"
43 #include "src/trace_processor/tp_metatrace.h"
44 #include "src/trace_processor/util/status_macros.h"
45 
46 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
47 
48 namespace perfetto::trace_processor {
49 
50 namespace {
51 
52 constexpr char kTsColumnName[] = "ts";
53 constexpr char kDurColumnName[] = "dur";
54 
IsRequiredColumn(const std::string & name)55 bool IsRequiredColumn(const std::string& name) {
56   return name == kTsColumnName || name == kDurColumnName;
57 }
58 
HasDuplicateColumns(const std::vector<std::pair<SqlValue::Type,std::string>> & t1,const std::vector<std::pair<SqlValue::Type,std::string>> & t2,const std::optional<std::string> & partition_col)59 std::optional<std::string> HasDuplicateColumns(
60     const std::vector<std::pair<SqlValue::Type, std::string>>& t1,
61     const std::vector<std::pair<SqlValue::Type, std::string>>& t2,
62     const std::optional<std::string>& partition_col) {
63   std::unordered_set<std::string> seen_names;
64   for (const auto& col : t1) {
65     if (IsRequiredColumn(col.second) || col.second == partition_col) {
66       continue;
67     }
68     if (seen_names.count(col.second) > 0) {
69       return col.second;
70     }
71     seen_names.insert(col.second);
72   }
73   for (const auto& col : t2) {
74     if (IsRequiredColumn(col.second) || col.second == partition_col) {
75       continue;
76     }
77     if (seen_names.count(col.second) > 0) {
78       return col.second;
79     }
80     seen_names.insert(col.second);
81   }
82   return std::nullopt;
83 }
84 
OpToString(int op)85 std::string OpToString(int op) {
86   switch (op) {
87     case SQLITE_INDEX_CONSTRAINT_EQ:
88       return "=";
89     case SQLITE_INDEX_CONSTRAINT_NE:
90       return "!=";
91     case SQLITE_INDEX_CONSTRAINT_GE:
92       return ">=";
93     case SQLITE_INDEX_CONSTRAINT_GT:
94       return ">";
95     case SQLITE_INDEX_CONSTRAINT_LE:
96       return "<=";
97     case SQLITE_INDEX_CONSTRAINT_LT:
98       return "<";
99     case SQLITE_INDEX_CONSTRAINT_LIKE:
100       return " like ";
101     case SQLITE_INDEX_CONSTRAINT_GLOB:
102       return " glob ";
103     case SQLITE_INDEX_CONSTRAINT_ISNULL:
104       // The "null" will be added below in EscapedSqliteValueAsString.
105       return " is ";
106     case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
107       // The "null" will be added below in EscapedSqliteValueAsString.
108       return " is not ";
109     default:
110       PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
111   }
112 }
113 
EscapedSqliteValueAsString(sqlite3_value * value)114 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
115   switch (sqlite3_value_type(value)) {
116     case SQLITE_INTEGER:
117       return std::to_string(sqlite3_value_int64(value));
118     case SQLITE_FLOAT:
119       return std::to_string(sqlite3_value_double(value));
120     case SQLITE_TEXT: {
121       // If str itself contains a single quote, we need to escape it with
122       // another single quote.
123       const char* str =
124           reinterpret_cast<const char*>(sqlite3_value_text(value));
125       return "'" + base::ReplaceAll(str, "'", "''") + "'";
126     }
127     case SQLITE_NULL:
128       return " null";
129     default:
130       PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
131   }
132 }
133 
134 }  // namespace
135 
PopulateColumnLocatorMap(uint32_t offset)136 void SpanJoinOperatorModule::State::PopulateColumnLocatorMap(uint32_t offset) {
137   for (uint32_t i = 0; i < t1_defn.columns().size(); ++i) {
138     if (i == t1_defn.ts_idx() || i == t1_defn.dur_idx() ||
139         i == t1_defn.partition_idx()) {
140       continue;
141     }
142     ColumnLocator* locator = &global_index_to_column_locator[offset++];
143     locator->defn = &t1_defn;
144     locator->col_index = i;
145   }
146   for (uint32_t i = 0; i < t2_defn.columns().size(); ++i) {
147     if (i == t2_defn.ts_idx() || i == t2_defn.dur_idx() ||
148         i == t2_defn.partition_idx()) {
149       continue;
150     }
151     ColumnLocator* locator = &global_index_to_column_locator[offset++];
152     locator->defn = &t2_defn;
153     locator->col_index = i;
154   }
155 }
156 
BestIndexStrForDefinition(const sqlite3_index_info * info,const TableDefinition & defn)157 std::string SpanJoinOperatorModule::State::BestIndexStrForDefinition(
158     const sqlite3_index_info* info,
159     const TableDefinition& defn) {
160   uint32_t count = 0;
161   std::string constraints;
162   for (int i = 0; i < info->nConstraint; i++) {
163     const auto& c = info->aConstraint[i];
164     if (!c.usable) {
165       continue;
166     }
167 
168     auto col_name = GetNameForGlobalColumnIndex(defn, c.iColumn);
169     if (col_name.empty()) {
170       continue;
171     }
172 
173     // Le constraints can be passed straight to the child tables as they won't
174     // affect the span join computation. Similarily, source_geq constraints
175     // explicitly request that they are passed as geq constraints to the source
176     // tables.
177     if (col_name == kTsColumnName && !sqlite::utils::IsOpLe(c.op) &&
178         c.op != kSourceGeqOpCode) {
179       continue;
180     }
181 
182     // Allow SQLite handle any constraints on duration apart from source_geq
183     // constraints.
184     if (col_name == kDurColumnName && c.op != kSourceGeqOpCode) {
185       continue;
186     }
187 
188     // If we're emitting shadow slices, don't propogate any constraints
189     // on this table as this will break the shadow slice computation.
190     if (defn.ShouldEmitPresentPartitionShadow()) {
191       continue;
192     }
193 
194     PERFETTO_DCHECK(info->aConstraintUsage[i].argvIndex > 0);
195     std::string argvIndex =
196         std::to_string(info->aConstraintUsage[i].argvIndex - 1);
197     std::string op = OpToString(
198         c.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE : c.op);
199     constraints += "," + argvIndex + "," + "`" + col_name + "`" + op;
200     count++;
201   }
202   return std::to_string(count) + constraints;
203 }
204 
Create(PerfettoSqlEngine * engine,const TableDescriptor & desc,EmitShadowType emit_shadow_type,TableDefinition * defn)205 base::Status SpanJoinOperatorModule::TableDefinition::Create(
206     PerfettoSqlEngine* engine,
207     const TableDescriptor& desc,
208     EmitShadowType emit_shadow_type,
209     TableDefinition* defn) {
210   if (desc.partition_col == kTsColumnName ||
211       desc.partition_col == kDurColumnName) {
212     return base::ErrStatus(
213         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
214         desc.name.c_str());
215   }
216 
217   std::vector<std::pair<SqlValue::Type, std::string>> cols;
218   RETURN_IF_ERROR(sqlite::utils::GetColumnsForTable(
219       engine->sqlite_engine()->db(), desc.name, cols));
220 
221   uint32_t required_columns_found = 0;
222   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
223   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
224   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
225   for (uint32_t i = 0; i < cols.size(); i++) {
226     auto col = cols[i];
227     if (IsRequiredColumn(col.second)) {
228       ++required_columns_found;
229       if (col.first != SqlValue::Type::kLong &&
230           col.first != SqlValue::Type::kNull) {
231         return base::ErrStatus(
232             "SPAN_JOIN: Invalid type for column '%s' in table %s",
233             col.second.c_str(), desc.name.c_str());
234       }
235     }
236     if (base::Contains(col.second, ",")) {
237       return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ','",
238                              col.second.c_str());
239     }
240     if (base::Contains(col.second, ':')) {
241       return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ':'",
242                              col.second.c_str());
243     }
244 
245     if (col.second == kTsColumnName) {
246       ts_idx = i;
247     } else if (col.second == kDurColumnName) {
248       dur_idx = i;
249     } else if (col.second == desc.partition_col) {
250       partition_idx = i;
251     }
252   }
253   if (required_columns_found != 2) {
254     return base::ErrStatus(
255         "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
256         desc.name.c_str());
257   }
258   if (desc.IsPartitioned() && partition_idx >= cols.size()) {
259     return base::ErrStatus(
260         "SPAN_JOIN: Missing partition column '%s' in table '%s'",
261         desc.partition_col.c_str(), desc.name.c_str());
262   }
263 
264   PERFETTO_DCHECK(ts_idx < cols.size());
265   PERFETTO_DCHECK(dur_idx < cols.size());
266 
267   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
268                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
269   return base::OkStatus();
270 }
271 
272 std::string
CreateVtabCreateTableSection() const273 SpanJoinOperatorModule::TableDefinition::CreateVtabCreateTableSection() const {
274   std::string cols;
275   for (const auto& col : columns()) {
276     if (IsRequiredColumn(col.second) || col.second == partition_col()) {
277       continue;
278     }
279     if (col.first == SqlValue::Type::kNull) {
280       cols += col.second + ",";
281     } else {
282       cols += col.second + " " +
283               sqlite::utils::SqlValueTypeToString(col.first) + ",";
284     }
285   }
286   return cols;
287 }
288 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)289 std::string SpanJoinOperatorModule::State::GetNameForGlobalColumnIndex(
290     const TableDefinition& defn,
291     int global_column) {
292   auto col_idx = static_cast<size_t>(global_column);
293   if (col_idx == Column::kTimestamp) {
294     return kTsColumnName;
295   }
296   if (col_idx == Column::kDuration) {
297     return kDurColumnName;
298   }
299   if (col_idx == Column::kPartition &&
300       partitioning != PartitioningType::kNoPartitioning) {
301     return defn.partition_col();
302   }
303 
304   const auto& locator = global_index_to_column_locator[col_idx];
305   if (locator.defn != &defn) {
306     return "";
307   }
308   return defn.columns()[locator.col_index].second;
309 }
310 
Query(SpanJoinOperatorModule::State * state,const TableDefinition * definition)311 SpanJoinOperatorModule::Query::Query(SpanJoinOperatorModule::State* state,
312                                      const TableDefinition* definition)
313     : defn_(definition), in_state_(state) {
314   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
315                   defn_->partition_idx() < defn_->columns().size());
316 }
317 
318 SpanJoinOperatorModule::Query::~Query() = default;
319 
Initialize(std::string sql_query,InitialEofBehavior eof_behavior)320 base::Status SpanJoinOperatorModule::Query::Initialize(
321     std::string sql_query,
322     InitialEofBehavior eof_behavior) {
323   *this = Query(in_state_, definition());
324   sql_query_ = std::move(sql_query);
325   base::Status status = Rewind();
326   if (!status.ok())
327     return status;
328   if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
329       IsEof()) {
330     state_ = State::kMissingPartitionShadow;
331   }
332   return status;
333 }
334 
Next()335 base::Status SpanJoinOperatorModule::Query::Next() {
336   RETURN_IF_ERROR(NextSliceState());
337   return FindNextValidSlice();
338 }
339 
IsValidSlice()340 bool SpanJoinOperatorModule::Query::IsValidSlice() {
341   // Disallow any single partition shadow slices if the definition doesn't allow
342   // them.
343   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
344     return false;
345 
346   // Disallow any missing partition shadow slices if the definition doesn't
347   // allow them.
348   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
349     return false;
350 
351   // Disallow any "empty" shadows; these are shadows which either have the same
352   // start and end time or missing-partition shadows which have the same start
353   // and end partition.
354   if (IsEmptyShadow())
355     return false;
356 
357   return true;
358 }
359 
FindNextValidSlice()360 base::Status SpanJoinOperatorModule::Query::FindNextValidSlice() {
361   // The basic idea of this function is that |NextSliceState()| always emits
362   // all possible slices (including shadows for any gaps inbetween the real
363   // slices) and we filter out the invalid slices (as defined by the table
364   // definition) using |IsValidSlice()|.
365   //
366   // This has proved to be a lot cleaner to implement than trying to choose
367   // when to emit and not emit shadows directly.
368   while (!IsEof() && !IsValidSlice()) {
369     RETURN_IF_ERROR(NextSliceState());
370   }
371   return base::OkStatus();
372 }
373 
NextSliceState()374 base::Status SpanJoinOperatorModule::Query::NextSliceState() {
375   switch (state_) {
376     case State::kReal: {
377       // Forward the cursor to figure out where the next slice should be.
378       RETURN_IF_ERROR(CursorNext());
379 
380       // Depending on the next slice, we can do two things here:
381       // 1. If the next slice is on the same partition, we can just emit a
382       //    single shadow until the start of the next slice.
383       // 2. If the next slice is on another partition or we hit eof, just emit
384       //    a shadow to the end of the whole partition.
385       bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
386                                            partition_ != CursorPartition());
387       state_ = State::kPresentPartitionShadow;
388       ts_ = AdjustedTsEnd();
389       ts_end_ =
390           shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
391       return base::OkStatus();
392     }
393     case State::kPresentPartitionShadow: {
394       if (ts_end_ == std::numeric_limits<int64_t>::max()) {
395         // If the shadow is to the end of the slice, create a missing partition
396         // shadow to the start of the partition of the next slice or to the max
397         // partition if we hit eof.
398         state_ = State::kMissingPartitionShadow;
399         ts_ = 0;
400         ts_end_ = std::numeric_limits<int64_t>::max();
401 
402         missing_partition_start_ = partition_ + 1;
403         missing_partition_end_ = cursor_eof_
404                                      ? std::numeric_limits<int64_t>::max()
405                                      : CursorPartition();
406       } else {
407         // If the shadow is not to the end, we must have another slice on the
408         // current partition.
409         state_ = State::kReal;
410         ts_ = CursorTs();
411         ts_end_ = ts_ + CursorDur();
412 
413         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
414                         partition_ == CursorPartition());
415       }
416       return base::OkStatus();
417     }
418     case State::kMissingPartitionShadow: {
419       if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
420         PERFETTO_DCHECK(cursor_eof_);
421 
422         // If we have a missing partition to the max partition, we must have hit
423         // eof.
424         state_ = State::kEof;
425       } else {
426         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
427                         CursorPartition() == missing_partition_end_);
428 
429         // Otherwise, setup a single partition slice on the end partition to the
430         // start of the next slice.
431         state_ = State::kPresentPartitionShadow;
432         ts_ = 0;
433         ts_end_ = CursorTs();
434         partition_ = missing_partition_end_;
435       }
436       return base::OkStatus();
437     }
438     case State::kEof: {
439       PERFETTO_DFATAL("Called Next when EOF");
440       return base::ErrStatus("Called Next when EOF");
441     }
442   }
443   PERFETTO_FATAL("For GCC");
444 }
445 
Rewind()446 base::Status SpanJoinOperatorModule::Query::Rewind() {
447   auto res = in_state_->engine->sqlite_engine()->PrepareStatement(
448       SqlSource::FromTraceProcessorImplementation(sql_query_));
449   cursor_eof_ = false;
450   RETURN_IF_ERROR(res.status());
451   stmt_ = std::move(res);
452 
453   RETURN_IF_ERROR(CursorNext());
454 
455   // Setup the first slice as a missing partition shadow from the lowest
456   // partition until the first slice partition. We will handle finding the real
457   // slice in |FindNextValidSlice()|.
458   state_ = State::kMissingPartitionShadow;
459   ts_ = 0;
460   ts_end_ = std::numeric_limits<int64_t>::max();
461   missing_partition_start_ = std::numeric_limits<int64_t>::min();
462 
463   if (cursor_eof_) {
464     missing_partition_end_ = std::numeric_limits<int64_t>::max();
465   } else if (defn_->IsPartitioned()) {
466     missing_partition_end_ = CursorPartition();
467   } else {
468     missing_partition_end_ = std::numeric_limits<int64_t>::min();
469   }
470 
471   // Actually compute the first valid slice.
472   return FindNextValidSlice();
473 }
474 
CursorNext()475 base::Status SpanJoinOperatorModule::Query::CursorNext() {
476   if (defn_->IsPartitioned()) {
477     auto partition_idx = static_cast<int>(defn_->partition_idx());
478     // Fastforward through any rows with null partition keys.
479     int row_type;
480     do {
481       cursor_eof_ = !stmt_->Step();
482       RETURN_IF_ERROR(stmt_->status());
483       row_type = sqlite3_column_type(stmt_->sqlite_stmt(), partition_idx);
484     } while (!cursor_eof_ && row_type == SQLITE_NULL);
485 
486     if (!cursor_eof_ && row_type != SQLITE_INTEGER) {
487       return base::ErrStatus("SPAN_JOIN: partition is not an INT column");
488     }
489   } else {
490     cursor_eof_ = !stmt_->Step();
491   }
492   return base::OkStatus();
493 }
494 
ReportSqliteResult(sqlite3_context * context,size_t index)495 void SpanJoinOperatorModule::Query::ReportSqliteResult(sqlite3_context* context,
496                                                        size_t index) {
497   if (state_ != State::kReal) {
498     return sqlite::result::Null(context);
499   }
500 
501   sqlite3_stmt* stmt = stmt_->sqlite_stmt();
502   int idx = static_cast<int>(index);
503   switch (sqlite3_column_type(stmt, idx)) {
504     case SQLITE_INTEGER:
505       return sqlite::result::Long(context, sqlite3_column_int64(stmt, idx));
506     case SQLITE_FLOAT:
507       return sqlite::result::Double(context, sqlite3_column_double(stmt, idx));
508     case SQLITE_TEXT: {
509       // TODO(lalitm): note for future optimizations: if we knew the addresses
510       // of the string intern pool, we could check if the string returned here
511       // comes from the pool, and pass it as non-transient.
512       const auto* ptr =
513           reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
514       return sqlite::result::TransientString(context, ptr);
515     }
516     case SQLITE_BLOB: {
517       return sqlite::result::TransientBytes(context,
518                                             sqlite3_column_blob(stmt, idx),
519                                             sqlite3_column_bytes(stmt, idx));
520     }
521   }
522 }
523 
TableDefinition(std::string name,std::string partition_col,std::vector<std::pair<SqlValue::Type,std::string>> cols,EmitShadowType emit_shadow_type,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)524 SpanJoinOperatorModule::TableDefinition::TableDefinition(
525     std::string name,
526     std::string partition_col,
527     std::vector<std::pair<SqlValue::Type, std::string>> cols,
528     EmitShadowType emit_shadow_type,
529     uint32_t ts_idx,
530     uint32_t dur_idx,
531     uint32_t partition_idx)
532     : emit_shadow_type_(emit_shadow_type),
533       name_(std::move(name)),
534       partition_col_(std::move(partition_col)),
535       cols_(std::move(cols)),
536       ts_idx_(ts_idx),
537       dur_idx_(dur_idx),
538       partition_idx_(partition_idx) {}
539 
Parse(const std::string & raw_descriptor,TableDescriptor * descriptor)540 base::Status SpanJoinOperatorModule::TableDescriptor::Parse(
541     const std::string& raw_descriptor,
542     TableDescriptor* descriptor) {
543   // Descriptors have one of the following forms:
544   // table_name [PARTITIONED column_name]
545 
546   // Find the table name.
547   base::StringSplitter splitter(raw_descriptor, ' ');
548   if (!splitter.Next())
549     return base::ErrStatus("SPAN_JOIN: Missing table name");
550 
551   descriptor->name = splitter.cur_token();
552   if (!splitter.Next())
553     return base::OkStatus();
554 
555   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
556     return base::ErrStatus("SPAN_JOIN: Invalid token");
557 
558   if (!splitter.Next())
559     return base::ErrStatus("SPAN_JOIN: Missing partitioning column");
560 
561   descriptor->partition_col = splitter.cur_token();
562   return base::OkStatus();
563 }
564 
CreateSqlQuery(base::StringSplitter & idx,sqlite3_value ** argv) const565 std::string SpanJoinOperatorModule::TableDefinition::CreateSqlQuery(
566     base::StringSplitter& idx,
567     sqlite3_value** argv) const {
568   std::vector<std::string> col_names;
569   for (const auto& c : columns()) {
570     col_names.push_back("`" + c.second + "`");
571   }
572 
573   PERFETTO_CHECK(idx.Next());
574   std::optional<uint32_t> cs_count = base::StringToUInt32(idx.cur_token());
575   PERFETTO_CHECK(cs_count);
576   std::vector<std::string> cs;
577   cs.reserve(*cs_count);
578   for (uint32_t i = 0; i < *cs_count; ++i) {
579     PERFETTO_CHECK(idx.Next());
580     std::optional<uint32_t> argv_idx = base::StringToUInt32(idx.cur_token());
581     PERFETTO_CHECK(argv_idx);
582 
583     PERFETTO_CHECK(idx.Next());
584     cs.emplace_back(idx.cur_token() +
585                     EscapedSqliteValueAsString(argv[*argv_idx]));
586   }
587 
588   std::string sql = "SELECT " + base::Join(col_names, ", ");
589   sql += " FROM " + name();
590   if (!cs.empty()) {
591     sql += " WHERE " + base::Join(cs, " AND ");
592   }
593   sql += " ORDER BY ";
594   sql += IsPartitioned() ? base::Join({"`" + partition_col() + "`", "ts"}, ", ")
595                          : "ts";
596   sql += ";";
597   PERFETTO_DLOG("%s", sql.c_str());
598   return sql;
599 }
600 
Create(sqlite3 * db,void * ctx,int argc,const char * const * argv,sqlite3_vtab ** vtab,char ** pzErr)601 int SpanJoinOperatorModule::Create(sqlite3* db,
602                                    void* ctx,
603                                    int argc,
604                                    const char* const* argv,
605                                    sqlite3_vtab** vtab,
606                                    char** pzErr) {
607   // argv[0] - argv[2] are SQLite populated fields which are always present.
608   if (argc < 5) {
609     *pzErr = sqlite3_mprintf("SPAN_JOIN: expected at least 2 args");
610     return SQLITE_ERROR;
611   }
612 
613   auto* context = GetContext(ctx);
614   auto state = std::make_unique<State>();
615   state->engine = context->engine;
616   state->module_name = argv[0];
617 
618   TableDescriptor t1_desc;
619   auto status = TableDescriptor::Parse(
620       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
621   if (!status.ok()) {
622     *pzErr = sqlite3_mprintf("%s", status.c_message());
623     return SQLITE_ERROR;
624   }
625 
626   TableDescriptor t2_desc;
627   status = TableDescriptor::Parse(
628       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
629   if (!status.ok()) {
630     *pzErr = sqlite3_mprintf("%s", status.c_message());
631     return SQLITE_ERROR;
632   }
633 
634   // Check that the partition columns match between the two tables.
635   if (t1_desc.partition_col == t2_desc.partition_col) {
636     state->partitioning = t1_desc.IsPartitioned()
637                               ? PartitioningType::kSamePartitioning
638                               : PartitioningType::kNoPartitioning;
639   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
640     *pzErr = sqlite3_mprintf(
641         "SPAN_JOIN: mismatching partitions between the two tables; "
642         "(partition %s in table %s, partition %s in table %s)",
643         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
644         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
645     return SQLITE_ERROR;
646   } else {
647     state->partitioning = PartitioningType::kMixedPartitioning;
648   }
649 
650   bool t1_part_mixed =
651       t1_desc.IsPartitioned() &&
652       state->partitioning == PartitioningType::kMixedPartitioning;
653   bool t2_part_mixed =
654       t2_desc.IsPartitioned() &&
655       state->partitioning == PartitioningType::kMixedPartitioning;
656 
657   EmitShadowType t1_shadow_type;
658   if (state->IsOuterJoin()) {
659     if (t1_part_mixed ||
660         state->partitioning == PartitioningType::kNoPartitioning) {
661       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
662     } else {
663       t1_shadow_type = EmitShadowType::kAll;
664     }
665   } else {
666     t1_shadow_type = EmitShadowType::kNone;
667   }
668   status = TableDefinition::Create(state->engine, t1_desc, t1_shadow_type,
669                                    &state->t1_defn);
670   if (!status.ok()) {
671     *pzErr = sqlite3_mprintf("%s", status.c_message());
672     return SQLITE_ERROR;
673   }
674 
675   EmitShadowType t2_shadow_type;
676   if (state->IsOuterJoin() || state->IsLeftJoin()) {
677     if (t2_part_mixed ||
678         state->partitioning == PartitioningType::kNoPartitioning) {
679       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
680     } else {
681       t2_shadow_type = EmitShadowType::kAll;
682     }
683   } else {
684     t2_shadow_type = EmitShadowType::kNone;
685   }
686   status = TableDefinition::Create(state->engine, t2_desc, t2_shadow_type,
687                                    &state->t2_defn);
688   if (!status.ok()) {
689     *pzErr = sqlite3_mprintf("%s", status.c_message());
690     return SQLITE_ERROR;
691   }
692 
693   if (auto dupe = HasDuplicateColumns(
694           state->t1_defn.columns(), state->t2_defn.columns(),
695           state->partitioning == PartitioningType::kNoPartitioning
696               ? std::nullopt
697               : std::make_optional(state->partition_col()))) {
698     *pzErr = sqlite3_mprintf(
699         "SPAN_JOIN: column %s present in both tables %s and %s", dupe->c_str(),
700         state->t1_defn.name().c_str(), state->t2_defn.name().c_str());
701     return SQLITE_ERROR;
702   }
703 
704   // Create the map from column index to the column in the child sub-queries.
705   state->PopulateColumnLocatorMap(
706       state->partitioning == PartitioningType::kNoPartitioning ? 2 : 3);
707 
708   std::string primary_key = "ts";
709   std::string partition;
710   if (state->partitioning != PartitioningType::kNoPartitioning) {
711     partition = state->partition_col() + " BIGINT,";
712     primary_key += ", " + state->partition_col();
713   }
714   std::string t1_section = state->t1_defn.CreateVtabCreateTableSection();
715   std::string t2_section = state->t2_defn.CreateVtabCreateTableSection();
716   static constexpr char kStmt[] = R"(
717     CREATE TABLE x(
718       ts BIGINT,
719       dur BIGINT,
720       %s
721       %s
722       %s
723       PRIMARY KEY(%s)
724     )
725   )";
726   base::StackString<1024> create_table_str(
727       kStmt, partition.c_str(), t1_section.c_str(), t2_section.c_str(),
728       primary_key.c_str());
729   PERFETTO_DLOG("SPAN_JOIN: create table statement: %s",
730                 create_table_str.c_str());
731   state->create_table_stmt = create_table_str.ToStdString();
732   if (int ret = sqlite3_declare_vtab(db, create_table_str.c_str());
733       ret != SQLITE_OK) {
734     return ret;
735   }
736 
737   std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
738   res->state = context->manager.OnCreate(argv, std::move(state));
739   *vtab = res.release();
740   return SQLITE_OK;
741 }
742 
Destroy(sqlite3_vtab * vtab)743 int SpanJoinOperatorModule::Destroy(sqlite3_vtab* vtab) {
744   std::unique_ptr<Vtab> tab(GetVtab(vtab));
745   sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDestroy(tab->state);
746   return SQLITE_OK;
747 }
748 
Connect(sqlite3 * db,void * ctx,int,const char * const * argv,sqlite3_vtab ** vtab,char **)749 int SpanJoinOperatorModule::Connect(sqlite3* db,
750                                     void* ctx,
751                                     int,
752                                     const char* const* argv,
753                                     sqlite3_vtab** vtab,
754                                     char**) {
755   auto* context = GetContext(ctx);
756   std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
757   res->state = context->manager.OnConnect(argv);
758 
759   auto* state =
760       sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(res->state);
761   if (int ret = sqlite3_declare_vtab(db, state->create_table_stmt.c_str());
762       ret != SQLITE_OK) {
763     return ret;
764   }
765   *vtab = res.release();
766   return SQLITE_OK;
767 }
768 
Disconnect(sqlite3_vtab * vtab)769 int SpanJoinOperatorModule::Disconnect(sqlite3_vtab* vtab) {
770   std::unique_ptr<Vtab> tab(GetVtab(vtab));
771   sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDisconnect(tab->state);
772   return SQLITE_OK;
773 }
774 
BestIndex(sqlite3_vtab * tab,sqlite3_index_info * info)775 int SpanJoinOperatorModule::BestIndex(sqlite3_vtab* tab,
776                                       sqlite3_index_info* info) {
777   int argvIndex = 1;
778   for (int i = 0; i < info->nConstraint; ++i) {
779     if (!info->aConstraint[i].usable) {
780       continue;
781     }
782     info->aConstraintUsage[i].argvIndex = argvIndex++;
783   }
784 
785   Vtab* table = GetVtab(tab);
786   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
787       table->state);
788   if (state->partitioning == PartitioningType::kNoPartitioning) {
789     // If both tables are not partitioned and we have a single order by on ts,
790     // we return data in the correct order.
791     info->orderByConsumed = info->nOrderBy == 1 &&
792                             info->aOrderBy[0].iColumn == Column::kTimestamp &&
793                             !info->aOrderBy[0].desc;
794   } else {
795     // If one of the tables is partitioned, and we have an order by on the
796     // partition column followed (optionally) by an order by on timestamp, we
797     // return data in the correct order.
798     bool is_first_ob_partition =
799         info->nOrderBy > 0 && info->aOrderBy[0].iColumn == Column::kPartition &&
800         !info->aOrderBy[0].desc;
801     bool is_second_ob_ts = info->nOrderBy >= 2 &&
802                            info->aOrderBy[1].iColumn == Column::kTimestamp &&
803                            !info->aOrderBy[1].desc;
804     info->orderByConsumed =
805         (info->nOrderBy == 1 && is_first_ob_partition) ||
806         (info->nOrderBy == 2 && is_first_ob_partition && is_second_ob_ts);
807   }
808 
809   for (int i = 0; i < info->nConstraint; ++i) {
810     if (info->aConstraint[i].op == kSourceGeqOpCode) {
811       info->aConstraintUsage[i].omit = true;
812     }
813   }
814 
815   std::string t1 = state->BestIndexStrForDefinition(info, state->t1_defn);
816   std::string t2 = state->BestIndexStrForDefinition(info, state->t2_defn);
817   info->idxStr = sqlite3_mprintf("%s,%s", t1.c_str(), t2.c_str());
818   info->needToFreeIdxStr = true;
819 
820   return SQLITE_OK;
821 }
822 
Open(sqlite3_vtab * tab,sqlite3_vtab_cursor ** cursor)823 int SpanJoinOperatorModule::Open(sqlite3_vtab* tab,
824                                  sqlite3_vtab_cursor** cursor) {
825   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
826       GetVtab(tab)->state);
827   std::unique_ptr<Cursor> c = std::make_unique<Cursor>(state);
828   *cursor = c.release();
829   return SQLITE_OK;
830 }
831 
Close(sqlite3_vtab_cursor * cursor)832 int SpanJoinOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
833   std::unique_ptr<Cursor> c(GetCursor(cursor));
834   return SQLITE_OK;
835 }
836 
Filter(sqlite3_vtab_cursor * cursor,int,const char * idxStr,int argc,sqlite3_value ** argv)837 int SpanJoinOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
838                                    int,
839                                    const char* idxStr,
840                                    int argc,
841                                    sqlite3_value** argv) {
842   PERFETTO_TP_TRACE(metatrace::Category::QUERY_DETAILED, "SPAN_JOIN_XFILTER");
843   PERFETTO_DLOG("SpanJoin::Filter: argc=%d, idxStr=%s", argc, idxStr);
844 
845   Cursor* c = GetCursor(cursor);
846   Vtab* table = GetVtab(cursor->pVtab);
847   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
848       table->state);
849 
850   base::StringSplitter splitter(std::string(idxStr), ',');
851   bool t1_partitioned_mixed =
852       c->t1.definition()->IsPartitioned() &&
853       state->partitioning == PartitioningType::kMixedPartitioning;
854   auto t1_eof = state->IsOuterJoin() && !t1_partitioned_mixed
855                     ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
856                     : Query::InitialEofBehavior::kTreatAsEof;
857   base::Status status =
858       c->t1.Initialize(state->t1_defn.CreateSqlQuery(splitter, argv), t1_eof);
859   if (!status.ok()) {
860     return sqlite::utils::SetError(table, status.c_message());
861   }
862 
863   bool t2_partitioned_mixed =
864       c->t2.definition()->IsPartitioned() &&
865       state->partitioning == PartitioningType::kMixedPartitioning;
866   auto t2_eof =
867       (state->IsLeftJoin() || state->IsOuterJoin()) && !t2_partitioned_mixed
868           ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
869           : Query::InitialEofBehavior::kTreatAsEof;
870   status =
871       c->t2.Initialize(state->t2_defn.CreateSqlQuery(splitter, argv), t2_eof);
872   if (!status.ok()) {
873     return sqlite::utils::SetError(table, status.c_message());
874   }
875 
876   status = c->FindOverlappingSpan();
877   if (!status.ok()) {
878     return sqlite::utils::SetError(table, status.c_message());
879   }
880   return SQLITE_OK;
881 }
882 
Next(sqlite3_vtab_cursor * cursor)883 int SpanJoinOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
884   Cursor* c = GetCursor(cursor);
885   Vtab* table = GetVtab(cursor->pVtab);
886   base::Status status = c->next_query->Next();
887   if (!status.ok()) {
888     return sqlite::utils::SetError(table, status.c_message());
889   }
890   status = c->FindOverlappingSpan();
891   if (!status.ok()) {
892     return sqlite::utils::SetError(table, status.c_message());
893   }
894   return SQLITE_OK;
895 }
896 
Eof(sqlite3_vtab_cursor * cur)897 int SpanJoinOperatorModule::Eof(sqlite3_vtab_cursor* cur) {
898   Cursor* c = GetCursor(cur);
899   return c->t1.IsEof() || c->t2.IsEof();
900 }
901 
Column(sqlite3_vtab_cursor * cursor,sqlite3_context * context,int N)902 int SpanJoinOperatorModule::Column(sqlite3_vtab_cursor* cursor,
903                                    sqlite3_context* context,
904                                    int N) {
905   Cursor* c = GetCursor(cursor);
906   Vtab* table = GetVtab(cursor->pVtab);
907   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
908       table->state);
909 
910   PERFETTO_DCHECK(c->t1.IsReal() || c->t2.IsReal());
911 
912   switch (N) {
913     case Column::kTimestamp: {
914       auto max_ts = std::max(c->t1.ts(), c->t2.ts());
915       sqlite::result::Long(context, static_cast<sqlite3_int64>(max_ts));
916       break;
917     }
918     case Column::kDuration: {
919       auto max_start = std::max(c->t1.ts(), c->t2.ts());
920       auto min_end = std::min(c->t1.raw_ts_end(), c->t2.raw_ts_end());
921       auto dur = min_end - max_start;
922       sqlite::result::Long(context, static_cast<sqlite3_int64>(dur));
923       break;
924     }
925     case Column::kPartition: {
926       if (state->partitioning != PartitioningType::kNoPartitioning) {
927         int64_t partition;
928         if (state->partitioning == PartitioningType::kMixedPartitioning) {
929           partition = c->last_mixed_partition_;
930         } else {
931           partition = c->t1.IsReal() ? c->t1.partition() : c->t2.partition();
932         }
933         sqlite::result::Long(context, static_cast<sqlite3_int64>(partition));
934         break;
935       }
936       PERFETTO_FALLTHROUGH;
937     }
938     default: {
939       const auto* locator =
940           state->global_index_to_column_locator.Find(static_cast<size_t>(N));
941       PERFETTO_CHECK(locator);
942       if (locator->defn == c->t1.definition()) {
943         c->t1.ReportSqliteResult(context, locator->col_index);
944       } else {
945         c->t2.ReportSqliteResult(context, locator->col_index);
946       }
947     }
948   }
949   return SQLITE_OK;
950 }
951 
Rowid(sqlite3_vtab_cursor *,sqlite_int64 *)952 int SpanJoinOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
953   return SQLITE_ERROR;
954 }
955 
FindFunction(sqlite3_vtab *,int,const char * name,FindFunctionFn ** fn,void **)956 int SpanJoinOperatorModule::FindFunction(sqlite3_vtab*,
957                                          int,
958                                          const char* name,
959                                          FindFunctionFn** fn,
960                                          void**) {
961   if (base::CaseInsensitiveEqual(name, "source_geq")) {
962     *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
963       return sqlite::result::Error(ctx, "Should not be called.");
964     };
965     return kSourceGeqOpCode;
966   }
967   return 0;
968 }
969 
IsOverlappingSpan() const970 bool SpanJoinOperatorModule::Cursor::IsOverlappingSpan() const {
971   // If either of the tables are eof, then we cannot possibly have an
972   // overlapping span.
973   if (t1.IsEof() || t2.IsEof())
974     return false;
975 
976   // One of the tables always needs to have a real span to have a valid
977   // overlapping span.
978   if (!t1.IsReal() && !t2.IsReal())
979     return false;
980 
981   using PartitioningType = PartitioningType;
982   if (state->partitioning == PartitioningType::kSamePartitioning) {
983     // If both tables are partitioned, then ensure that the partitions overlap.
984     bool partition_in_bounds = (t1.FirstPartition() >= t2.FirstPartition() &&
985                                 t1.FirstPartition() <= t2.LastPartition()) ||
986                                (t2.FirstPartition() >= t1.FirstPartition() &&
987                                 t2.FirstPartition() <= t1.LastPartition());
988     if (!partition_in_bounds)
989       return false;
990   }
991 
992   // We consider all slices to be [start, end) - that is the range of
993   // timestamps has an open interval at the start but a closed interval
994   // at the end. (with the exception of dur == -1 which we treat as if
995   // end == start for the purpose of this function).
996   return (t1.ts() == t2.ts() && t1.IsReal() && t2.IsReal()) ||
997          (t1.ts() >= t2.ts() && t1.ts() < t2.AdjustedTsEnd()) ||
998          (t2.ts() >= t1.ts() && t2.ts() < t1.AdjustedTsEnd());
999 }
1000 
FindOverlappingSpan()1001 base::Status SpanJoinOperatorModule::Cursor::FindOverlappingSpan() {
1002   // We loop until we find a slice which overlaps from the two tables.
1003   while (true) {
1004     if (state->partitioning == PartitioningType::kMixedPartitioning) {
1005       // If we have a mixed partition setup, we need to have special checks
1006       // for eof and to reset the unpartitioned cursor every time the partition
1007       // changes in the partitioned table.
1008       auto* partitioned = t1.definition()->IsPartitioned() ? &t1 : &t2;
1009       auto* unpartitioned = t1.definition()->IsPartitioned() ? &t2 : &t1;
1010 
1011       // If the partitioned table reaches eof, then we are really done.
1012       if (partitioned->IsEof())
1013         break;
1014 
1015       // If the partition has changed from the previous one, reset the cursor
1016       // and keep a lot of the new partition.
1017       if (last_mixed_partition_ != partitioned->partition()) {
1018         base::Status status = unpartitioned->Rewind();
1019         if (!status.ok())
1020           return status;
1021         last_mixed_partition_ = partitioned->partition();
1022       }
1023     } else if (t1.IsEof() || t2.IsEof()) {
1024       // For both no partition and same partition cases, either cursor ending
1025       // ends the whole span join.
1026       break;
1027     }
1028 
1029     // Find which slice finishes first.
1030     next_query = FindEarliestFinishQuery();
1031 
1032     // If the current span is overlapping, just finish there to emit the current
1033     // slice.
1034     if (IsOverlappingSpan())
1035       break;
1036 
1037     // Otherwise, step to the next row.
1038     base::Status status = next_query->Next();
1039     if (!status.ok())
1040       return status;
1041   }
1042   return base::OkStatus();
1043 }
1044 
1045 SpanJoinOperatorModule::Query*
FindEarliestFinishQuery()1046 SpanJoinOperatorModule::Cursor::FindEarliestFinishQuery() {
1047   int64_t t1_part;
1048   int64_t t2_part;
1049 
1050   switch (state->partitioning) {
1051     case PartitioningType::kMixedPartitioning: {
1052       // If either table is EOF, forward the other table to try and make
1053       // the partitions not match anymore.
1054       if (t1.IsEof())
1055         return &t2;
1056       if (t2.IsEof())
1057         return &t1;
1058 
1059       // Otherwise, just make the partition equal from both tables.
1060       t1_part = last_mixed_partition_;
1061       t2_part = last_mixed_partition_;
1062       break;
1063     }
1064     case PartitioningType::kSamePartitioning: {
1065       // Get the partition values from the cursor.
1066       t1_part = t1.LastPartition();
1067       t2_part = t2.LastPartition();
1068       break;
1069     }
1070     case PartitioningType::kNoPartitioning: {
1071       t1_part = 0;
1072       t2_part = 0;
1073       break;
1074     }
1075   }
1076 
1077   // Prefer to forward the earliest cursors based on the following
1078   // lexiographical ordering:
1079   // 1. partition
1080   // 2. end timestamp
1081   // 3. whether the slice is real or shadow (shadow < real)
1082   bool t1_less = std::make_tuple(t1_part, t1.AdjustedTsEnd(), t1.IsReal()) <
1083                  std::make_tuple(t2_part, t2.AdjustedTsEnd(), t2.IsReal());
1084   return t1_less ? &t1 : &t2;
1085 }
1086 
1087 }  // namespace perfetto::trace_processor
1088