• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/perfetto_sql/intrinsics/operators/span_join_operator.h"
18 
19 #include <sqlite3.h>
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstring>
23 #include <limits>
24 #include <memory>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31 
32 #include "perfetto/base/compiler.h"
33 #include "perfetto/base/logging.h"
34 #include "perfetto/base/status.h"
35 #include "perfetto/ext/base/string_splitter.h"
36 #include "perfetto/ext/base/string_utils.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
39 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
40 #include "src/trace_processor/sqlite/module_lifecycle_manager.h"
41 #include "src/trace_processor/sqlite/sql_source.h"
42 #include "src/trace_processor/sqlite/sqlite_utils.h"
43 #include "src/trace_processor/tp_metatrace.h"
44 #include "src/trace_processor/util/status_macros.h"
45 
46 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
47 
48 namespace perfetto::trace_processor {
49 
50 namespace {
51 
// Name of the timestamp column; this is the column IsRequiredColumn() checks
// for and the one child queries are ordered by.
constexpr char kTsColumnName[] = "ts";
// Name of the duration column; Create() requires it in at least one of the
// two joined tables.
constexpr char kDurColumnName[] = "dur";
54 
IsRequiredColumn(const std::string & name)55 bool IsRequiredColumn(const std::string& name) {
56   return name == kTsColumnName;
57 }
58 
IsSpecialColumn(const std::string & name,const std::optional<std::string> & partition_col)59 bool IsSpecialColumn(const std::string& name,
60                      const std::optional<std::string>& partition_col) {
61   return name == kTsColumnName || name == kDurColumnName ||
62          name == partition_col;
63 }
64 
HasDuplicateColumns(const std::vector<std::pair<SqlValue::Type,std::string>> & t1,const std::vector<std::pair<SqlValue::Type,std::string>> & t2,const std::optional<std::string> & partition_col)65 std::optional<std::string> HasDuplicateColumns(
66     const std::vector<std::pair<SqlValue::Type, std::string>>& t1,
67     const std::vector<std::pair<SqlValue::Type, std::string>>& t2,
68     const std::optional<std::string>& partition_col) {
69   std::unordered_set<std::string> seen_names;
70   for (const auto& col : t1) {
71     if (IsSpecialColumn(col.second, partition_col)) {
72       continue;
73     }
74     if (seen_names.count(col.second) > 0) {
75       return col.second;
76     }
77     seen_names.insert(col.second);
78   }
79   for (const auto& col : t2) {
80     if (IsSpecialColumn(col.second, partition_col)) {
81       continue;
82     }
83     if (seen_names.count(col.second) > 0) {
84       return col.second;
85     }
86     seen_names.insert(col.second);
87   }
88   return std::nullopt;
89 }
90 
OpToString(int op)91 std::string OpToString(int op) {
92   switch (op) {
93     case SQLITE_INDEX_CONSTRAINT_EQ:
94       return "=";
95     case SQLITE_INDEX_CONSTRAINT_NE:
96       return "!=";
97     case SQLITE_INDEX_CONSTRAINT_GE:
98       return ">=";
99     case SQLITE_INDEX_CONSTRAINT_GT:
100       return ">";
101     case SQLITE_INDEX_CONSTRAINT_LE:
102       return "<=";
103     case SQLITE_INDEX_CONSTRAINT_LT:
104       return "<";
105     case SQLITE_INDEX_CONSTRAINT_LIKE:
106       return " like ";
107     case SQLITE_INDEX_CONSTRAINT_GLOB:
108       return " glob ";
109     case SQLITE_INDEX_CONSTRAINT_ISNULL:
110       // The "null" will be added below in EscapedSqliteValueAsString.
111       return " is ";
112     case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
113       // The "null" will be added below in EscapedSqliteValueAsString.
114       return " is not ";
115     default:
116       PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
117   }
118 }
119 
EscapedSqliteValueAsString(sqlite3_value * value)120 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
121   switch (sqlite3_value_type(value)) {
122     case SQLITE_INTEGER:
123       return std::to_string(sqlite3_value_int64(value));
124     case SQLITE_FLOAT:
125       return std::to_string(sqlite3_value_double(value));
126     case SQLITE_TEXT: {
127       // If str itself contains a single quote, we need to escape it with
128       // another single quote.
129       const char* str =
130           reinterpret_cast<const char*>(sqlite3_value_text(value));
131       return "'" + base::ReplaceAll(str, "'", "''") + "'";
132     }
133     case SQLITE_NULL:
134       return " null";
135     default:
136       PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
137   }
138 }
139 
140 }  // namespace
141 
PopulateColumnLocatorMap(uint32_t offset)142 void SpanJoinOperatorModule::State::PopulateColumnLocatorMap(uint32_t offset) {
143   for (uint32_t i = 0; i < t1_defn.columns().size(); ++i) {
144     if (i == t1_defn.ts_idx() || i == t1_defn.dur_idx() ||
145         i == t1_defn.partition_idx()) {
146       continue;
147     }
148     ColumnLocator* locator = &global_index_to_column_locator[offset++];
149     locator->defn = &t1_defn;
150     locator->col_index = i;
151   }
152   for (uint32_t i = 0; i < t2_defn.columns().size(); ++i) {
153     if (i == t2_defn.ts_idx() || i == t2_defn.dur_idx() ||
154         i == t2_defn.partition_idx()) {
155       continue;
156     }
157     ColumnLocator* locator = &global_index_to_column_locator[offset++];
158     locator->defn = &t2_defn;
159     locator->col_index = i;
160   }
161 }
162 
BestIndexStrForDefinition(const sqlite3_index_info * info,const TableDefinition & defn)163 std::string SpanJoinOperatorModule::State::BestIndexStrForDefinition(
164     const sqlite3_index_info* info,
165     const TableDefinition& defn) {
166   uint32_t count = 0;
167   std::string constraints;
168   for (int i = 0; i < info->nConstraint; i++) {
169     const auto& c = info->aConstraint[i];
170     if (!c.usable) {
171       continue;
172     }
173 
174     auto col_name = GetNameForGlobalColumnIndex(defn, c.iColumn);
175     if (col_name.empty()) {
176       continue;
177     }
178 
179     // Le constraints can be passed straight to the child tables as they won't
180     // affect the span join computation. Similarily, source_geq constraints
181     // explicitly request that they are passed as geq constraints to the source
182     // tables.
183     if (col_name == kTsColumnName && !sqlite::utils::IsOpLe(c.op) &&
184         c.op != kSourceGeqOpCode) {
185       continue;
186     }
187 
188     // Allow SQLite handle any constraints on duration apart from source_geq
189     // constraints.
190     if (col_name == kDurColumnName && c.op != kSourceGeqOpCode) {
191       continue;
192     }
193 
194     // If we're emitting shadow slices, don't propogate any constraints
195     // on this table as this will break the shadow slice computation.
196     if (defn.ShouldEmitPresentPartitionShadow()) {
197       continue;
198     }
199 
200     PERFETTO_DCHECK(info->aConstraintUsage[i].argvIndex > 0);
201     std::string argvIndex =
202         std::to_string(info->aConstraintUsage[i].argvIndex - 1);
203     std::string op = OpToString(
204         c.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE : c.op);
205     constraints += "," + argvIndex + "," + "`" + col_name + "`" + op;
206     count++;
207   }
208   return std::to_string(count) + constraints;
209 }
210 
Create(PerfettoSqlEngine * engine,const TableDescriptor & desc,EmitShadowType emit_shadow_type,TableDefinition * defn)211 base::Status SpanJoinOperatorModule::TableDefinition::Create(
212     PerfettoSqlEngine* engine,
213     const TableDescriptor& desc,
214     EmitShadowType emit_shadow_type,
215     TableDefinition* defn) {
216   if (desc.partition_col == kTsColumnName ||
217       desc.partition_col == kDurColumnName) {
218     return base::ErrStatus(
219         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
220         desc.name.c_str());
221   }
222 
223   std::vector<std::pair<SqlValue::Type, std::string>> cols;
224   RETURN_IF_ERROR(sqlite::utils::GetColumnsForTable(
225       engine->sqlite_engine()->db(), desc.name, cols));
226 
227   uint32_t required_columns_found = 0;
228   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
229   std::optional<uint32_t> dur_idx;
230   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
231   for (uint32_t i = 0; i < cols.size(); i++) {
232     auto col = cols[i];
233     if (IsRequiredColumn(col.second)) {
234       ++required_columns_found;
235     }
236     if (base::Contains(col.second, ",")) {
237       return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ','",
238                              col.second.c_str());
239     }
240     if (base::Contains(col.second, ':')) {
241       return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ':'",
242                              col.second.c_str());
243     }
244 
245     if (col.second == kTsColumnName) {
246       ts_idx = i;
247     } else if (col.second == kDurColumnName) {
248       dur_idx = i;
249     } else if (col.second == desc.partition_col) {
250       partition_idx = i;
251     }
252   }
253   if (required_columns_found != 1) {
254     return base::ErrStatus("SPAN_JOIN: Missing ts column in table %s",
255                            desc.name.c_str());
256   }
257   if (desc.IsPartitioned() && partition_idx >= cols.size()) {
258     return base::ErrStatus(
259         "SPAN_JOIN: Missing partition column '%s' in table '%s'",
260         desc.partition_col.c_str(), desc.name.c_str());
261   }
262 
263   PERFETTO_DCHECK(ts_idx < cols.size());
264 
265   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
266                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
267   return base::OkStatus();
268 }
269 
270 std::string
CreateVtabCreateTableSection() const271 SpanJoinOperatorModule::TableDefinition::CreateVtabCreateTableSection() const {
272   std::string cols;
273   for (const auto& col : columns()) {
274     if (IsSpecialColumn(col.second, partition_col())) {
275       continue;
276     }
277     if (col.first == SqlValue::Type::kNull) {
278       cols += col.second + ",";
279     } else {
280       cols += col.second + " " +
281               sqlite::utils::SqlValueTypeToSqliteTypeName(col.first) + ",";
282     }
283   }
284   return cols;
285 }
286 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)287 std::string SpanJoinOperatorModule::State::GetNameForGlobalColumnIndex(
288     const TableDefinition& defn,
289     int global_column) {
290   auto col_idx = static_cast<size_t>(global_column);
291   if (col_idx == Column::kTimestamp) {
292     return kTsColumnName;
293   }
294   if (col_idx == Column::kDuration) {
295     return kDurColumnName;
296   }
297   if (col_idx == Column::kPartition &&
298       partitioning != PartitioningType::kNoPartitioning) {
299     return defn.partition_col();
300   }
301 
302   const auto& locator = global_index_to_column_locator[col_idx];
303   if (locator.defn != &defn) {
304     return "";
305   }
306   return defn.columns()[locator.col_index].second;
307 }
308 
// Creates a query over the table described by |definition|. |state| is kept
// and later used by Rewind() to reach the SQL engine.
SpanJoinOperatorModule::Query::Query(SpanJoinOperatorModule::State* state,
                                     const TableDefinition* definition)
    : defn_(definition), in_state_(state) {
  // A partitioned definition must point at one of its own columns.
  PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                  defn_->partition_idx() < defn_->columns().size());
}
315 
// Defined out of line, in this translation unit.
SpanJoinOperatorModule::Query::~Query() = default;
317 
Initialize(std::string sql_query,InitialEofBehavior eof_behavior)318 base::Status SpanJoinOperatorModule::Query::Initialize(
319     std::string sql_query,
320     InitialEofBehavior eof_behavior) {
321   *this = Query(in_state_, definition());
322   sql_query_ = std::move(sql_query);
323   base::Status status = Rewind();
324   if (!status.ok())
325     return status;
326   if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
327       IsEof()) {
328     state_ = State::kMissingPartitionShadow;
329   }
330   return status;
331 }
332 
Next()333 base::Status SpanJoinOperatorModule::Query::Next() {
334   RETURN_IF_ERROR(NextSliceState());
335   return FindNextValidSlice();
336 }
337 
IsValidSlice()338 bool SpanJoinOperatorModule::Query::IsValidSlice() {
339   // Disallow any single partition shadow slices if the definition doesn't allow
340   // them.
341   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
342     return false;
343 
344   // Disallow any missing partition shadow slices if the definition doesn't
345   // allow them.
346   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
347     return false;
348 
349   // Disallow any "empty" shadows; these are shadows which either have the same
350   // start and end time or missing-partition shadows which have the same start
351   // and end partition.
352   if (IsEmptyShadow())
353     return false;
354 
355   return true;
356 }
357 
FindNextValidSlice()358 base::Status SpanJoinOperatorModule::Query::FindNextValidSlice() {
359   // The basic idea of this function is that |NextSliceState()| always emits
360   // all possible slices (including shadows for any gaps inbetween the real
361   // slices) and we filter out the invalid slices (as defined by the table
362   // definition) using |IsValidSlice()|.
363   //
364   // This has proved to be a lot cleaner to implement than trying to choose
365   // when to emit and not emit shadows directly.
366   while (!IsEof() && !IsValidSlice()) {
367     RETURN_IF_ERROR(NextSliceState());
368   }
369   return base::OkStatus();
370 }
371 
NextSliceState()372 base::Status SpanJoinOperatorModule::Query::NextSliceState() {
373   switch (state_) {
374     case State::kReal: {
375       // Forward the cursor to figure out where the next slice should be.
376       RETURN_IF_ERROR(CursorNext());
377 
378       // Depending on the next slice, we can do two things here:
379       // 1. If the next slice is on the same partition, we can just emit a
380       //    single shadow until the start of the next slice.
381       // 2. If the next slice is on another partition or we hit eof, just emit
382       //    a shadow to the end of the whole partition.
383       bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
384                                            partition_ != CursorPartition());
385       state_ = State::kPresentPartitionShadow;
386       ts_ = AdjustedTsEnd();
387       ts_end_ =
388           shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
389       return base::OkStatus();
390     }
391     case State::kPresentPartitionShadow: {
392       if (ts_end_ == std::numeric_limits<int64_t>::max()) {
393         // If the shadow is to the end of the slice, create a missing partition
394         // shadow to the start of the partition of the next slice or to the max
395         // partition if we hit eof.
396         state_ = State::kMissingPartitionShadow;
397         ts_ = 0;
398         ts_end_ = std::numeric_limits<int64_t>::max();
399 
400         missing_partition_start_ = partition_ + 1;
401         missing_partition_end_ = cursor_eof_
402                                      ? std::numeric_limits<int64_t>::max()
403                                      : CursorPartition();
404       } else {
405         // If the shadow is not to the end, we must have another slice on the
406         // current partition.
407         state_ = State::kReal;
408         ts_ = CursorTs();
409         ts_end_ = ts_ + CursorDur();
410 
411         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
412                         partition_ == CursorPartition());
413       }
414       return base::OkStatus();
415     }
416     case State::kMissingPartitionShadow: {
417       if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
418         PERFETTO_DCHECK(cursor_eof_);
419 
420         // If we have a missing partition to the max partition, we must have hit
421         // eof.
422         state_ = State::kEof;
423       } else {
424         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
425                         CursorPartition() == missing_partition_end_);
426 
427         // Otherwise, setup a single partition slice on the end partition to the
428         // start of the next slice.
429         state_ = State::kPresentPartitionShadow;
430         ts_ = 0;
431         ts_end_ = CursorTs();
432         partition_ = missing_partition_end_;
433       }
434       return base::OkStatus();
435     }
436     case State::kEof: {
437       PERFETTO_DFATAL("Called Next when EOF");
438       return base::ErrStatus("Called Next when EOF");
439     }
440   }
441   PERFETTO_FATAL("For GCC");
442 }
443 
Rewind()444 base::Status SpanJoinOperatorModule::Query::Rewind() {
445   auto res = in_state_->engine->sqlite_engine()->PrepareStatement(
446       SqlSource::FromTraceProcessorImplementation(sql_query_));
447   cursor_eof_ = false;
448   RETURN_IF_ERROR(res.status());
449   stmt_ = std::move(res);
450 
451   RETURN_IF_ERROR(CursorNext());
452 
453   // Setup the first slice as a missing partition shadow from the lowest
454   // partition until the first slice partition. We will handle finding the real
455   // slice in |FindNextValidSlice()|.
456   state_ = State::kMissingPartitionShadow;
457   ts_ = 0;
458   ts_end_ = std::numeric_limits<int64_t>::max();
459   missing_partition_start_ = std::numeric_limits<int64_t>::min();
460 
461   if (cursor_eof_) {
462     missing_partition_end_ = std::numeric_limits<int64_t>::max();
463   } else if (defn_->IsPartitioned()) {
464     missing_partition_end_ = CursorPartition();
465   } else {
466     missing_partition_end_ = std::numeric_limits<int64_t>::min();
467   }
468 
469   // Actually compute the first valid slice.
470   return FindNextValidSlice();
471 }
472 
CursorNext()473 base::Status SpanJoinOperatorModule::Query::CursorNext() {
474   if (defn_->IsPartitioned()) {
475     auto partition_idx = static_cast<int>(defn_->partition_idx());
476     // Fastforward through any rows with null partition keys.
477     int row_type;
478     do {
479       cursor_eof_ = !stmt_->Step();
480       RETURN_IF_ERROR(stmt_->status());
481       row_type = sqlite3_column_type(stmt_->sqlite_stmt(), partition_idx);
482     } while (!cursor_eof_ && row_type == SQLITE_NULL);
483 
484     if (!cursor_eof_ && row_type != SQLITE_INTEGER) {
485       return base::ErrStatus("SPAN_JOIN: partition is not an INT column");
486     }
487   } else {
488     cursor_eof_ = !stmt_->Step();
489   }
490   return base::OkStatus();
491 }
492 
ReportSqliteResult(sqlite3_context * context,size_t index)493 void SpanJoinOperatorModule::Query::ReportSqliteResult(sqlite3_context* context,
494                                                        size_t index) {
495   if (state_ != State::kReal) {
496     return sqlite::result::Null(context);
497   }
498 
499   sqlite3_stmt* stmt = stmt_->sqlite_stmt();
500   int idx = static_cast<int>(index);
501   switch (sqlite3_column_type(stmt, idx)) {
502     case SQLITE_INTEGER:
503       return sqlite::result::Long(context, sqlite3_column_int64(stmt, idx));
504     case SQLITE_FLOAT:
505       return sqlite::result::Double(context, sqlite3_column_double(stmt, idx));
506     case SQLITE_TEXT: {
507       // TODO(lalitm): note for future optimizations: if we knew the addresses
508       // of the string intern pool, we could check if the string returned here
509       // comes from the pool, and pass it as non-transient.
510       const auto* ptr =
511           reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
512       return sqlite::result::TransientString(context, ptr);
513     }
514     case SQLITE_BLOB: {
515       return sqlite::result::TransientBytes(context,
516                                             sqlite3_column_blob(stmt, idx),
517                                             sqlite3_column_bytes(stmt, idx));
518     }
519   }
520 }
521 
// Stores the already validated description of a joined table.
//
// |cols| holds the (type, name) pair of every column of the table; |ts_idx|,
// |dur_idx| and |partition_idx| are positions inside |cols|. |dur_idx| is
// empty when the table has no dur column.
SpanJoinOperatorModule::TableDefinition::TableDefinition(
    std::string name,
    std::string partition_col,
    std::vector<std::pair<SqlValue::Type, std::string>> cols,
    EmitShadowType emit_shadow_type,
    uint32_t ts_idx,
    std::optional<uint32_t> dur_idx,
    uint32_t partition_idx)
    : emit_shadow_type_(emit_shadow_type),
      name_(std::move(name)),
      partition_col_(std::move(partition_col)),
      cols_(std::move(cols)),
      ts_idx_(ts_idx),
      dur_idx_(dur_idx),
      partition_idx_(partition_idx) {}
537 
Parse(const std::string & raw_descriptor,TableDescriptor * descriptor)538 base::Status SpanJoinOperatorModule::TableDescriptor::Parse(
539     const std::string& raw_descriptor,
540     TableDescriptor* descriptor) {
541   // Descriptors have one of the following forms:
542   // table_name [PARTITIONED column_name]
543 
544   // Find the table name.
545   base::StringSplitter splitter(raw_descriptor, ' ');
546   if (!splitter.Next())
547     return base::ErrStatus("SPAN_JOIN: Missing table name");
548 
549   descriptor->name = splitter.cur_token();
550   if (!splitter.Next())
551     return base::OkStatus();
552 
553   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
554     return base::ErrStatus("SPAN_JOIN: Invalid token");
555 
556   if (!splitter.Next())
557     return base::ErrStatus("SPAN_JOIN: Missing partitioning column");
558 
559   descriptor->partition_col = splitter.cur_token();
560   return base::OkStatus();
561 }
562 
CreateSqlQuery(base::StringSplitter & idx,sqlite3_value ** argv) const563 std::string SpanJoinOperatorModule::TableDefinition::CreateSqlQuery(
564     base::StringSplitter& idx,
565     sqlite3_value** argv) const {
566   std::vector<std::string> col_names;
567   for (const auto& c : columns()) {
568     col_names.push_back("`" + c.second + "`");
569   }
570 
571   PERFETTO_CHECK(idx.Next());
572   std::optional<uint32_t> cs_count = base::StringToUInt32(idx.cur_token());
573   PERFETTO_CHECK(cs_count);
574   std::vector<std::string> cs;
575   cs.reserve(*cs_count);
576   for (uint32_t i = 0; i < *cs_count; ++i) {
577     PERFETTO_CHECK(idx.Next());
578     std::optional<uint32_t> argv_idx = base::StringToUInt32(idx.cur_token());
579     PERFETTO_CHECK(argv_idx);
580 
581     PERFETTO_CHECK(idx.Next());
582     cs.emplace_back(idx.cur_token() +
583                     EscapedSqliteValueAsString(argv[*argv_idx]));
584   }
585 
586   std::string sql = "SELECT " + base::Join(col_names, ", ");
587   sql += " FROM " + name();
588   if (!cs.empty()) {
589     sql += " WHERE " + base::Join(cs, " AND ");
590   }
591   sql += " ORDER BY ";
592   sql += IsPartitioned() ? base::Join({"`" + partition_col() + "`", "ts"}, ", ")
593                          : "ts";
594   sql += ";";
595   return sql;
596 }
597 
Create(sqlite3 * db,void * ctx,int argc,const char * const * argv,sqlite3_vtab ** vtab,char ** pzErr)598 int SpanJoinOperatorModule::Create(sqlite3* db,
599                                    void* ctx,
600                                    int argc,
601                                    const char* const* argv,
602                                    sqlite3_vtab** vtab,
603                                    char** pzErr) {
604   // argv[0] - argv[2] are SQLite populated fields which are always present.
605   if (argc != 5) {
606     *pzErr = sqlite3_mprintf("SPAN_JOIN: expected exactly two arguments");
607     return SQLITE_ERROR;
608   }
609 
610   auto* context = GetContext(ctx);
611   auto state = std::make_unique<State>();
612   state->engine = context->engine;
613   state->module_name = argv[0];
614 
615   TableDescriptor t1_desc;
616   auto status = TableDescriptor::Parse(
617       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
618   if (!status.ok()) {
619     *pzErr = sqlite3_mprintf("%s", status.c_message());
620     return SQLITE_ERROR;
621   }
622 
623   TableDescriptor t2_desc;
624   status = TableDescriptor::Parse(
625       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
626   if (!status.ok()) {
627     *pzErr = sqlite3_mprintf("%s", status.c_message());
628     return SQLITE_ERROR;
629   }
630 
631   // Check that the partition columns match between the two tables.
632   if (t1_desc.partition_col == t2_desc.partition_col) {
633     state->partitioning = t1_desc.IsPartitioned()
634                               ? PartitioningType::kSamePartitioning
635                               : PartitioningType::kNoPartitioning;
636   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
637     *pzErr = sqlite3_mprintf(
638         "SPAN_JOIN: mismatching partitions between the two tables; "
639         "(partition %s in table %s, partition %s in table %s)",
640         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
641         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
642     return SQLITE_ERROR;
643   } else {
644     state->partitioning = PartitioningType::kMixedPartitioning;
645   }
646 
647   bool t1_part_mixed =
648       t1_desc.IsPartitioned() &&
649       state->partitioning == PartitioningType::kMixedPartitioning;
650   bool t2_part_mixed =
651       t2_desc.IsPartitioned() &&
652       state->partitioning == PartitioningType::kMixedPartitioning;
653 
654   EmitShadowType t1_shadow_type;
655   if (state->IsOuterJoin()) {
656     if (t1_part_mixed ||
657         state->partitioning == PartitioningType::kNoPartitioning) {
658       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
659     } else {
660       t1_shadow_type = EmitShadowType::kAll;
661     }
662   } else {
663     t1_shadow_type = EmitShadowType::kNone;
664   }
665   status = TableDefinition::Create(state->engine, t1_desc, t1_shadow_type,
666                                    &state->t1_defn);
667   if (!status.ok()) {
668     *pzErr = sqlite3_mprintf("%s", status.c_message());
669     return SQLITE_ERROR;
670   }
671 
672   EmitShadowType t2_shadow_type;
673   if (state->IsOuterJoin() || state->IsLeftJoin()) {
674     if (t2_part_mixed ||
675         state->partitioning == PartitioningType::kNoPartitioning) {
676       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
677     } else {
678       t2_shadow_type = EmitShadowType::kAll;
679     }
680   } else {
681     t2_shadow_type = EmitShadowType::kNone;
682   }
683   status = TableDefinition::Create(state->engine, t2_desc, t2_shadow_type,
684                                    &state->t2_defn);
685   if (!status.ok()) {
686     *pzErr = sqlite3_mprintf("%s", status.c_message());
687     return SQLITE_ERROR;
688   }
689 
690   if (!state->t1_defn.dur_idx().has_value() &&
691       !state->t2_defn.dur_idx().has_value()) {
692     *pzErr = sqlite3_mprintf(
693         "SPAN_JOIN: column %s must be present in at least one of tables %s and "
694         "%s",
695         kDurColumnName, state->t1_defn.name().c_str(),
696         state->t2_defn.name().c_str());
697     return SQLITE_ERROR;
698   }
699 
700   if (auto dupe = HasDuplicateColumns(
701           state->t1_defn.columns(), state->t2_defn.columns(),
702           state->partitioning == PartitioningType::kNoPartitioning
703               ? std::nullopt
704               : std::make_optional(state->partition_col()))) {
705     *pzErr = sqlite3_mprintf(
706         "SPAN_JOIN: column %s present in both tables %s and %s", dupe->c_str(),
707         state->t1_defn.name().c_str(), state->t2_defn.name().c_str());
708     return SQLITE_ERROR;
709   }
710 
711   // Create the map from column index to the column in the child sub-queries.
712   state->PopulateColumnLocatorMap(
713       state->partitioning == PartitioningType::kNoPartitioning ? 2 : 3);
714 
715   std::string primary_key = "ts";
716   std::string partition;
717   if (state->partitioning != PartitioningType::kNoPartitioning) {
718     partition = state->partition_col() + " BIGINT,";
719     primary_key += ", " + state->partition_col();
720   }
721   std::string t1_section = state->t1_defn.CreateVtabCreateTableSection();
722   std::string t2_section = state->t2_defn.CreateVtabCreateTableSection();
723   static constexpr char kStmt[] = R"(
724     CREATE TABLE x(
725       ts BIGINT,
726       dur BIGINT,
727       %s
728       %s
729       %s
730       PRIMARY KEY(%s)
731     )
732   )";
733   base::StackString<1024> create_table_str(
734       kStmt, partition.c_str(), t1_section.c_str(), t2_section.c_str(),
735       primary_key.c_str());
736   state->create_table_stmt = create_table_str.ToStdString();
737   if (int ret = sqlite3_declare_vtab(db, create_table_str.c_str());
738       ret != SQLITE_OK) {
739     return ret;
740   }
741 
742   std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
743   res->state = context->manager.OnCreate(argv, std::move(state));
744   *vtab = res.release();
745   return SQLITE_OK;
746 }
747 
Destroy(sqlite3_vtab * vtab)748 int SpanJoinOperatorModule::Destroy(sqlite3_vtab* vtab) {
749   std::unique_ptr<Vtab> tab(GetVtab(vtab));
750   sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDestroy(tab->state);
751   return SQLITE_OK;
752 }
753 
Connect(sqlite3 * db,void * ctx,int,const char * const * argv,sqlite3_vtab ** vtab,char **)754 int SpanJoinOperatorModule::Connect(sqlite3* db,
755                                     void* ctx,
756                                     int,
757                                     const char* const* argv,
758                                     sqlite3_vtab** vtab,
759                                     char**) {
760   auto* context = GetContext(ctx);
761   std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
762   res->state = context->manager.OnConnect(argv);
763 
764   auto* state =
765       sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(res->state);
766   if (int ret = sqlite3_declare_vtab(db, state->create_table_stmt.c_str());
767       ret != SQLITE_OK) {
768     return ret;
769   }
770   *vtab = res.release();
771   return SQLITE_OK;
772 }
773 
Disconnect(sqlite3_vtab * vtab)774 int SpanJoinOperatorModule::Disconnect(sqlite3_vtab* vtab) {
775   std::unique_ptr<Vtab> tab(GetVtab(vtab));
776   sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDisconnect(tab->state);
777   return SQLITE_OK;
778 }
779 
BestIndex(sqlite3_vtab * tab,sqlite3_index_info * info)780 int SpanJoinOperatorModule::BestIndex(sqlite3_vtab* tab,
781                                       sqlite3_index_info* info) {
782   int argvIndex = 1;
783   for (int i = 0; i < info->nConstraint; ++i) {
784     if (!info->aConstraint[i].usable) {
785       continue;
786     }
787     info->aConstraintUsage[i].argvIndex = argvIndex++;
788   }
789 
790   Vtab* table = GetVtab(tab);
791   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
792       table->state);
793   if (state->partitioning == PartitioningType::kNoPartitioning) {
794     // If both tables are not partitioned and we have a single order by on ts,
795     // we return data in the correct order.
796     info->orderByConsumed = info->nOrderBy == 1 &&
797                             info->aOrderBy[0].iColumn == Column::kTimestamp &&
798                             !info->aOrderBy[0].desc;
799   } else {
800     // If one of the tables is partitioned, and we have an order by on the
801     // partition column followed (optionally) by an order by on timestamp, we
802     // return data in the correct order.
803     bool is_first_ob_partition =
804         info->nOrderBy > 0 && info->aOrderBy[0].iColumn == Column::kPartition &&
805         !info->aOrderBy[0].desc;
806     bool is_second_ob_ts = info->nOrderBy >= 2 &&
807                            info->aOrderBy[1].iColumn == Column::kTimestamp &&
808                            !info->aOrderBy[1].desc;
809     info->orderByConsumed =
810         (info->nOrderBy == 1 && is_first_ob_partition) ||
811         (info->nOrderBy == 2 && is_first_ob_partition && is_second_ob_ts);
812   }
813 
814   for (int i = 0; i < info->nConstraint; ++i) {
815     if (info->aConstraint[i].op == kSourceGeqOpCode) {
816       info->aConstraintUsage[i].omit = true;
817     }
818   }
819 
820   std::string t1 = state->BestIndexStrForDefinition(info, state->t1_defn);
821   std::string t2 = state->BestIndexStrForDefinition(info, state->t2_defn);
822   info->idxStr = sqlite3_mprintf("%s,%s", t1.c_str(), t2.c_str());
823   info->needToFreeIdxStr = true;
824 
825   return SQLITE_OK;
826 }
827 
Open(sqlite3_vtab * tab,sqlite3_vtab_cursor ** cursor)828 int SpanJoinOperatorModule::Open(sqlite3_vtab* tab,
829                                  sqlite3_vtab_cursor** cursor) {
830   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
831       GetVtab(tab)->state);
832   std::unique_ptr<Cursor> c = std::make_unique<Cursor>(state);
833   *cursor = c.release();
834   return SQLITE_OK;
835 }
836 
Close(sqlite3_vtab_cursor * cursor)837 int SpanJoinOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
838   std::unique_ptr<Cursor> c(GetCursor(cursor));
839   return SQLITE_OK;
840 }
841 
Filter(sqlite3_vtab_cursor * cursor,int,const char * idxStr,int,sqlite3_value ** argv)842 int SpanJoinOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
843                                    int,
844                                    const char* idxStr,
845                                    int,
846                                    sqlite3_value** argv) {
847   PERFETTO_TP_TRACE(metatrace::Category::QUERY_DETAILED, "SPAN_JOIN_XFILTER");
848 
849   Cursor* c = GetCursor(cursor);
850   Vtab* table = GetVtab(cursor->pVtab);
851   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
852       table->state);
853 
854   base::StringSplitter splitter(std::string(idxStr), ',');
855   bool t1_partitioned_mixed =
856       c->t1.definition()->IsPartitioned() &&
857       state->partitioning == PartitioningType::kMixedPartitioning;
858   auto t1_eof = state->IsOuterJoin() && !t1_partitioned_mixed
859                     ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
860                     : Query::InitialEofBehavior::kTreatAsEof;
861   base::Status status =
862       c->t1.Initialize(state->t1_defn.CreateSqlQuery(splitter, argv), t1_eof);
863   if (!status.ok()) {
864     return sqlite::utils::SetError(table, status.c_message());
865   }
866 
867   bool t2_partitioned_mixed =
868       c->t2.definition()->IsPartitioned() &&
869       state->partitioning == PartitioningType::kMixedPartitioning;
870   auto t2_eof =
871       (state->IsLeftJoin() || state->IsOuterJoin()) && !t2_partitioned_mixed
872           ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
873           : Query::InitialEofBehavior::kTreatAsEof;
874   status =
875       c->t2.Initialize(state->t2_defn.CreateSqlQuery(splitter, argv), t2_eof);
876   if (!status.ok()) {
877     return sqlite::utils::SetError(table, status.c_message());
878   }
879 
880   status = c->FindOverlappingSpan();
881   if (!status.ok()) {
882     return sqlite::utils::SetError(table, status.c_message());
883   }
884   return SQLITE_OK;
885 }
886 
Next(sqlite3_vtab_cursor * cursor)887 int SpanJoinOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
888   Cursor* c = GetCursor(cursor);
889   Vtab* table = GetVtab(cursor->pVtab);
890   base::Status status = c->next_query->Next();
891   if (!status.ok()) {
892     return sqlite::utils::SetError(table, status.c_message());
893   }
894   status = c->FindOverlappingSpan();
895   if (!status.ok()) {
896     return sqlite::utils::SetError(table, status.c_message());
897   }
898   return SQLITE_OK;
899 }
900 
Eof(sqlite3_vtab_cursor * cur)901 int SpanJoinOperatorModule::Eof(sqlite3_vtab_cursor* cur) {
902   Cursor* c = GetCursor(cur);
903   return c->t1.IsEof() || c->t2.IsEof();
904 }
905 
Column(sqlite3_vtab_cursor * cursor,sqlite3_context * context,int N)906 int SpanJoinOperatorModule::Column(sqlite3_vtab_cursor* cursor,
907                                    sqlite3_context* context,
908                                    int N) {
909   Cursor* c = GetCursor(cursor);
910   Vtab* table = GetVtab(cursor->pVtab);
911   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
912       table->state);
913 
914   PERFETTO_DCHECK(c->t1.IsReal() || c->t2.IsReal());
915 
916   switch (N) {
917     case Column::kTimestamp: {
918       auto max_ts = std::max(c->t1.ts(), c->t2.ts());
919       sqlite::result::Long(context, static_cast<sqlite3_int64>(max_ts));
920       break;
921     }
922     case Column::kDuration: {
923       auto max_start = std::max(c->t1.ts(), c->t2.ts());
924       auto min_end = std::min(c->t1.raw_ts_end(), c->t2.raw_ts_end());
925       auto dur = min_end - max_start;
926       sqlite::result::Long(context, static_cast<sqlite3_int64>(dur));
927       break;
928     }
929     case Column::kPartition: {
930       if (state->partitioning != PartitioningType::kNoPartitioning) {
931         int64_t partition;
932         if (state->partitioning == PartitioningType::kMixedPartitioning) {
933           partition = c->last_mixed_partition_;
934         } else {
935           partition = c->t1.IsReal() ? c->t1.partition() : c->t2.partition();
936         }
937         sqlite::result::Long(context, static_cast<sqlite3_int64>(partition));
938         break;
939       }
940       PERFETTO_FALLTHROUGH;
941     }
942     default: {
943       const auto* locator =
944           state->global_index_to_column_locator.Find(static_cast<size_t>(N));
945       PERFETTO_CHECK(locator);
946       if (locator->defn == c->t1.definition()) {
947         c->t1.ReportSqliteResult(context, locator->col_index);
948       } else {
949         c->t2.ReportSqliteResult(context, locator->col_index);
950       }
951     }
952   }
953   return SQLITE_OK;
954 }
955 
Rowid(sqlite3_vtab_cursor *,sqlite_int64 *)956 int SpanJoinOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
957   return SQLITE_ERROR;
958 }
959 
FindFunction(sqlite3_vtab *,int,const char * name,FindFunctionFn ** fn,void **)960 int SpanJoinOperatorModule::FindFunction(sqlite3_vtab*,
961                                          int,
962                                          const char* name,
963                                          FindFunctionFn** fn,
964                                          void**) {
965   if (base::CaseInsensitiveEqual(name, "source_geq")) {
966     *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
967       return sqlite::result::Error(ctx, "Should not be called.");
968     };
969     return kSourceGeqOpCode;
970   }
971   return 0;
972 }
973 
IsOverlappingSpan() const974 bool SpanJoinOperatorModule::Cursor::IsOverlappingSpan() const {
975   // If either of the tables are eof, then we cannot possibly have an
976   // overlapping span.
977   if (t1.IsEof() || t2.IsEof())
978     return false;
979 
980   // One of the tables always needs to have a real span to have a valid
981   // overlapping span.
982   if (!t1.IsReal() && !t2.IsReal())
983     return false;
984 
985   using PartitioningType = PartitioningType;
986   if (state->partitioning == PartitioningType::kSamePartitioning) {
987     // If both tables are partitioned, then ensure that the partitions overlap.
988     bool partition_in_bounds = (t1.FirstPartition() >= t2.FirstPartition() &&
989                                 t1.FirstPartition() <= t2.LastPartition()) ||
990                                (t2.FirstPartition() >= t1.FirstPartition() &&
991                                 t2.FirstPartition() <= t1.LastPartition());
992     if (!partition_in_bounds)
993       return false;
994   }
995 
996   // We consider all slices to be [start, end) - that is the range of
997   // timestamps has an open interval at the start but a closed interval
998   // at the end. (with the exception of dur == -1 which we treat as if
999   // end == start for the purpose of this function).
1000   return (t1.ts() == t2.ts() && t1.IsReal() && t2.IsReal()) ||
1001          (t1.ts() >= t2.ts() && t1.ts() < t2.AdjustedTsEnd()) ||
1002          (t2.ts() >= t1.ts() && t2.ts() < t1.AdjustedTsEnd());
1003 }
1004 
FindOverlappingSpan()1005 base::Status SpanJoinOperatorModule::Cursor::FindOverlappingSpan() {
1006   // We loop until we find a slice which overlaps from the two tables.
1007   while (true) {
1008     if (state->partitioning == PartitioningType::kMixedPartitioning) {
1009       // If we have a mixed partition setup, we need to have special checks
1010       // for eof and to reset the unpartitioned cursor every time the partition
1011       // changes in the partitioned table.
1012       auto* partitioned = t1.definition()->IsPartitioned() ? &t1 : &t2;
1013       auto* unpartitioned = t1.definition()->IsPartitioned() ? &t2 : &t1;
1014 
1015       // If the partitioned table reaches eof, then we are really done.
1016       if (partitioned->IsEof())
1017         break;
1018 
1019       // If the partition has changed from the previous one, reset the cursor
1020       // and keep a lot of the new partition.
1021       if (last_mixed_partition_ != partitioned->partition()) {
1022         base::Status status = unpartitioned->Rewind();
1023         if (!status.ok())
1024           return status;
1025         last_mixed_partition_ = partitioned->partition();
1026       }
1027     } else if (t1.IsEof() || t2.IsEof()) {
1028       // For both no partition and same partition cases, either cursor ending
1029       // ends the whole span join.
1030       break;
1031     }
1032 
1033     // Find which slice finishes first.
1034     next_query = FindEarliestFinishQuery();
1035 
1036     // If the current span is overlapping, just finish there to emit the current
1037     // slice.
1038     if (IsOverlappingSpan())
1039       break;
1040 
1041     // Otherwise, step to the next row.
1042     base::Status status = next_query->Next();
1043     if (!status.ok())
1044       return status;
1045   }
1046   return base::OkStatus();
1047 }
1048 
1049 SpanJoinOperatorModule::Query*
FindEarliestFinishQuery()1050 SpanJoinOperatorModule::Cursor::FindEarliestFinishQuery() {
1051   int64_t t1_part;
1052   int64_t t2_part;
1053 
1054   switch (state->partitioning) {
1055     case PartitioningType::kMixedPartitioning: {
1056       // If either table is EOF, forward the other table to try and make
1057       // the partitions not match anymore.
1058       if (t1.IsEof())
1059         return &t2;
1060       if (t2.IsEof())
1061         return &t1;
1062 
1063       // Otherwise, just make the partition equal from both tables.
1064       t1_part = last_mixed_partition_;
1065       t2_part = last_mixed_partition_;
1066       break;
1067     }
1068     case PartitioningType::kSamePartitioning: {
1069       // Get the partition values from the cursor.
1070       t1_part = t1.LastPartition();
1071       t2_part = t2.LastPartition();
1072       break;
1073     }
1074     case PartitioningType::kNoPartitioning: {
1075       t1_part = 0;
1076       t2_part = 0;
1077       break;
1078     }
1079   }
1080 
1081   // Prefer to forward the earliest cursors based on the following
1082   // lexiographical ordering:
1083   // 1. partition
1084   // 2. end timestamp
1085   // 3. whether the slice is real or shadow (shadow < real)
1086   bool t1_less = std::make_tuple(t1_part, t1.AdjustedTsEnd(), t1.IsReal()) <
1087                  std::make_tuple(t2_part, t2.AdjustedTsEnd(), t2.IsReal());
1088   return t1_less ? &t1 : &t2;
1089 }
1090 
1091 }  // namespace perfetto::trace_processor
1092