• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "src/trace_processor/sqlite/span_join_operator_table.h"

#include <sqlite3.h>
#include <stdio.h>
#include <string.h>

#include <algorithm>
#include <limits>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/string_splitter.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
#include "src/trace_processor/tp_metatrace.h"
#include "src/trace_processor/util/status_macros.h"
33 
34 namespace perfetto {
35 namespace trace_processor {
36 
37 namespace {
38 
// Names of the two columns every table taking part in a span join must have;
// Init() places them first in the joined schema.
constexpr char kTsColumnName[]{"ts"};
constexpr char kDurColumnName[]{"dur"};
41 
IsRequiredColumn(const std::string & name)42 bool IsRequiredColumn(const std::string& name) {
43   return name == kTsColumnName || name == kDurColumnName;
44 }
45 
HasDuplicateColumns(const std::vector<SqliteTable::Column> & cols)46 base::Optional<std::string> HasDuplicateColumns(
47     const std::vector<SqliteTable::Column>& cols) {
48   std::set<std::string> names;
49   for (const auto& col : cols) {
50     if (names.count(col.name()) > 0)
51       return col.name();
52     names.insert(col.name());
53   }
54   return base::nullopt;
55 }
56 
OpToString(int op)57 std::string OpToString(int op) {
58   switch (op) {
59     case SQLITE_INDEX_CONSTRAINT_EQ:
60       return "=";
61     case SQLITE_INDEX_CONSTRAINT_NE:
62       return "!=";
63     case SQLITE_INDEX_CONSTRAINT_GE:
64       return ">=";
65     case SQLITE_INDEX_CONSTRAINT_GT:
66       return ">";
67     case SQLITE_INDEX_CONSTRAINT_LE:
68       return "<=";
69     case SQLITE_INDEX_CONSTRAINT_LT:
70       return "<";
71     case SQLITE_INDEX_CONSTRAINT_LIKE:
72       return "like";
73     case SQLITE_INDEX_CONSTRAINT_ISNULL:
74       // The "null" will be added below in EscapedSqliteValueAsString.
75       return " is ";
76     case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
77       // The "null" will be added below in EscapedSqliteValueAsString.
78       return " is not";
79     default:
80       PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
81   }
82 }
83 
EscapedSqliteValueAsString(sqlite3_value * value)84 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
85   switch (sqlite3_value_type(value)) {
86     case SQLITE_INTEGER:
87       return std::to_string(sqlite3_value_int64(value));
88     case SQLITE_FLOAT:
89       return std::to_string(sqlite3_value_double(value));
90     case SQLITE_TEXT: {
91       // If str itself contains a single quote, we need to escape it with
92       // another single quote.
93       const char* str =
94           reinterpret_cast<const char*>(sqlite3_value_text(value));
95       return "'" + base::ReplaceAll(str, "'", "''") + "'";
96     }
97     case SQLITE_NULL:
98       return " null";
99     default:
100       PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
101   }
102 }
103 
104 }  // namespace
105 
SpanJoinOperatorTable(sqlite3 * db,const TraceStorage *)106 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
107     : db_(db) {}
108 
RegisterTable(sqlite3 * db,const TraceStorage * storage)109 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
110                                           const TraceStorage* storage) {
111   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_join",
112                                                /* read_write */ false,
113                                                /* requires_args */ true);
114 
115   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
116                                                /* read_write */ false,
117                                                /* requires_args */ true);
118 
119   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
120                                                /* read_write */ false,
121                                                /* requires_args */ true);
122 }
123 
Init(int argc,const char * const * argv,Schema * schema)124 util::Status SpanJoinOperatorTable::Init(int argc,
125                                          const char* const* argv,
126                                          Schema* schema) {
127   // argv[0] - argv[2] are SQLite populated fields which are always present.
128   if (argc < 5)
129     return util::Status("SPAN_JOIN: expected at least 2 args");
130 
131   TableDescriptor t1_desc;
132   auto status = TableDescriptor::Parse(
133       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
134   if (!status.ok())
135     return status;
136 
137   TableDescriptor t2_desc;
138   status = TableDescriptor::Parse(
139       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
140   if (!status.ok())
141     return status;
142 
143   // Check that the partition columns match between the two tables.
144   if (t1_desc.partition_col == t2_desc.partition_col) {
145     partitioning_ = t1_desc.IsPartitioned()
146                         ? PartitioningType::kSamePartitioning
147                         : PartitioningType::kNoPartitioning;
148   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
149     return util::ErrStatus(
150         "SPAN_JOIN: mismatching partitions between the two tables; "
151         "(partition %s in table %s, partition %s in table %s)",
152         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
153         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
154   } else {
155     partitioning_ = PartitioningType::kMixedPartitioning;
156   }
157 
158   bool t1_part_mixed = t1_desc.IsPartitioned() &&
159                        partitioning_ == PartitioningType::kMixedPartitioning;
160   bool t2_part_mixed = t2_desc.IsPartitioned() &&
161                        partitioning_ == PartitioningType::kMixedPartitioning;
162 
163   EmitShadowType t1_shadow_type;
164   if (IsOuterJoin()) {
165     if (t1_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
166       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
167     } else {
168       t1_shadow_type = EmitShadowType::kAll;
169     }
170   } else {
171     t1_shadow_type = EmitShadowType::kNone;
172   }
173   status = CreateTableDefinition(t1_desc, t1_shadow_type, &t1_defn_);
174   if (!status.ok())
175     return status;
176 
177   EmitShadowType t2_shadow_type;
178   if (IsOuterJoin() || IsLeftJoin()) {
179     if (t2_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
180       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
181     } else {
182       t2_shadow_type = EmitShadowType::kAll;
183     }
184   } else {
185     t2_shadow_type = EmitShadowType::kNone;
186   }
187   status = CreateTableDefinition(t2_desc, t2_shadow_type, &t2_defn_);
188   if (!status.ok())
189     return status;
190 
191   std::vector<SqliteTable::Column> cols;
192   // Ensure the shared columns are consistently ordered and are not
193   // present twice in the final schema
194   cols.emplace_back(Column::kTimestamp, kTsColumnName, SqlValue::Type::kLong);
195   cols.emplace_back(Column::kDuration, kDurColumnName, SqlValue::Type::kLong);
196   if (partitioning_ != PartitioningType::kNoPartitioning)
197     cols.emplace_back(Column::kPartition, partition_col(),
198                       SqlValue::Type::kLong);
199 
200   CreateSchemaColsForDefn(t1_defn_, &cols);
201   CreateSchemaColsForDefn(t2_defn_, &cols);
202 
203   // Check if any column has : in its name. This often happens when SELECT *
204   // is used to create a view with the same column name in two joined tables.
205   for (const auto& col : cols) {
206     if (base::Contains(col.name(), ':')) {
207       return util::ErrStatus("SPAN_JOIN: column %s has illegal character :",
208                              col.name().c_str());
209     }
210   }
211 
212   if (auto opt_dupe_col = HasDuplicateColumns(cols)) {
213     return util::ErrStatus(
214         "SPAN_JOIN: column %s present in both tables %s and %s",
215         opt_dupe_col->c_str(), t1_defn_.name().c_str(),
216         t2_defn_.name().c_str());
217   }
218   std::vector<size_t> primary_keys = {Column::kTimestamp};
219   if (partitioning_ != PartitioningType::kNoPartitioning)
220     primary_keys.push_back(Column::kPartition);
221   *schema = Schema(cols, primary_keys);
222 
223   return util::OkStatus();
224 }
225 
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<SqliteTable::Column> * cols)226 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
227     const TableDefinition& defn,
228     std::vector<SqliteTable::Column>* cols) {
229   for (size_t i = 0; i < defn.columns().size(); i++) {
230     const auto& n = defn.columns()[i].name();
231     if (IsRequiredColumn(n) || n == defn.partition_col())
232       continue;
233 
234     ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
235     locator->defn = &defn;
236     locator->col_index = i;
237 
238     cols->emplace_back(cols->size(), n, defn.columns()[i].type());
239   }
240 }
241 
CreateCursor()242 std::unique_ptr<SqliteTable::Cursor> SpanJoinOperatorTable::CreateCursor() {
243   return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
244 }
245 
BestIndex(const QueryConstraints & qc,BestIndexInfo * info)246 int SpanJoinOperatorTable::BestIndex(const QueryConstraints& qc,
247                                      BestIndexInfo* info) {
248   // TODO(lalitm): figure out cost estimation.
249   const auto& ob = qc.order_by();
250 
251   if (partitioning_ == PartitioningType::kNoPartitioning) {
252     // If both tables are not partitioned and we have a single order by on ts,
253     // we return data in the correct order.
254     info->sqlite_omit_order_by =
255         ob.size() == 1 && ob[0].iColumn == Column::kTimestamp && !ob[0].desc;
256   } else {
257     // If one of the tables is partitioned, and we have an order by on the
258     // partition column followed (optionally) by an order by on timestamp, we
259     // return data in the correct order.
260     bool is_first_ob_partition =
261         ob.size() >= 1 && ob[0].iColumn == Column::kPartition && !ob[0].desc;
262     bool is_second_ob_ts =
263         ob.size() >= 2 && ob[1].iColumn == Column::kTimestamp && !ob[1].desc;
264     info->sqlite_omit_order_by =
265         (ob.size() == 1 && is_first_ob_partition) ||
266         (ob.size() == 2 && is_first_ob_partition && is_second_ob_ts);
267   }
268 
269   const auto& cs = qc.constraints();
270   for (uint32_t i = 0; i < cs.size(); ++i) {
271     if (cs[i].op == kSourceGeqOpCode) {
272       info->sqlite_omit_constraint[i] = true;
273     }
274   }
275 
276   return SQLITE_OK;
277 }
278 
FindFunction(const char * name,FindFunctionFn * fn,void **)279 int SpanJoinOperatorTable::FindFunction(const char* name,
280                                         FindFunctionFn* fn,
281                                         void**) {
282   if (base::CaseInsensitiveEqual(name, "source_geq")) {
283     *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
284       sqlite3_result_error(ctx, "Should not be called.", -1);
285     };
286     return kSourceGeqOpCode;
287   }
288   return 0;
289 }
290 
291 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)292 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
293     const TableDefinition& defn,
294     const QueryConstraints& qc,
295     sqlite3_value** argv) {
296   std::vector<std::string> constraints;
297   for (size_t i = 0; i < qc.constraints().size(); i++) {
298     const auto& cs = qc.constraints()[i];
299     auto col_name = GetNameForGlobalColumnIndex(defn, cs.column);
300     if (col_name.empty())
301       continue;
302 
303     // Le constraints can be passed straight to the child tables as they won't
304     // affect the span join computation. Similarily, source_geq constraints
305     // explicitly request that they are passed as geq constraints to the source
306     // tables.
307     if (col_name == kTsColumnName && !sqlite_utils::IsOpLe(cs.op) &&
308         cs.op != kSourceGeqOpCode)
309       continue;
310 
311     // Allow SQLite handle any constraints on duration apart from source_geq
312     // constraints.
313     if (col_name == kDurColumnName && cs.op != kSourceGeqOpCode)
314       continue;
315 
316     // If we're emitting shadow slices, don't propogate any constraints
317     // on this table as this will break the shadow slice computation.
318     if (defn.ShouldEmitPresentPartitionShadow())
319       continue;
320 
321     auto op = OpToString(cs.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE
322                                                    : cs.op);
323     auto value = EscapedSqliteValueAsString(argv[i]);
324 
325     constraints.emplace_back("`" + col_name + "`" + op + value);
326   }
327   return constraints;
328 }
329 
CreateTableDefinition(const TableDescriptor & desc,EmitShadowType emit_shadow_type,SpanJoinOperatorTable::TableDefinition * defn)330 util::Status SpanJoinOperatorTable::CreateTableDefinition(
331     const TableDescriptor& desc,
332     EmitShadowType emit_shadow_type,
333     SpanJoinOperatorTable::TableDefinition* defn) {
334   if (desc.partition_col == kTsColumnName ||
335       desc.partition_col == kDurColumnName) {
336     return util::ErrStatus(
337         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
338         desc.name.c_str());
339   }
340 
341   std::vector<SqliteTable::Column> cols;
342   auto status = sqlite_utils::GetColumnsForTable(db_, desc.name, cols);
343   if (!status.ok()) {
344     return status;
345   }
346 
347   uint32_t required_columns_found = 0;
348   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
349   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
350   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
351   for (uint32_t i = 0; i < cols.size(); i++) {
352     auto col = cols[i];
353     if (IsRequiredColumn(col.name())) {
354       ++required_columns_found;
355       if (col.type() != SqlValue::Type::kLong &&
356           col.type() != SqlValue::Type::kNull) {
357         return util::ErrStatus(
358             "SPAN_JOIN: Invalid type for column %s in table %s",
359             col.name().c_str(), desc.name.c_str());
360       }
361     }
362 
363     if (col.name() == kTsColumnName) {
364       ts_idx = i;
365     } else if (col.name() == kDurColumnName) {
366       dur_idx = i;
367     } else if (col.name() == desc.partition_col) {
368       partition_idx = i;
369     }
370   }
371   if (required_columns_found != 2) {
372     return util::ErrStatus(
373         "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
374         desc.name.c_str());
375   } else if (desc.IsPartitioned() && partition_idx >= cols.size()) {
376     return util::ErrStatus("SPAN_JOIN: Missing partition column %s in table %s",
377                            desc.partition_col.c_str(), desc.name.c_str());
378   }
379 
380   PERFETTO_DCHECK(ts_idx < cols.size());
381   PERFETTO_DCHECK(dur_idx < cols.size());
382 
383   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
384                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
385   return util::OkStatus();
386 }
387 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)388 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
389     const TableDefinition& defn,
390     int global_column) {
391   size_t col_idx = static_cast<size_t>(global_column);
392   if (col_idx == Column::kTimestamp)
393     return kTsColumnName;
394   else if (col_idx == Column::kDuration)
395     return kDurColumnName;
396   else if (col_idx == Column::kPartition &&
397            partitioning_ != PartitioningType::kNoPartitioning)
398     return defn.partition_col().c_str();
399 
400   const auto& locator = global_index_to_column_locator_[col_idx];
401   if (locator.defn != &defn)
402     return "";
403   return defn.columns()[locator.col_index].name().c_str();
404 }
405 
Cursor(SpanJoinOperatorTable * table,sqlite3 * db)406 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
407     : SqliteTable::Cursor(table),
408       t1_(table, &table->t1_defn_, db),
409       t2_(table, &table->t2_defn_, db),
410       table_(table) {}
411 
Filter(const QueryConstraints & qc,sqlite3_value ** argv,FilterHistory)412 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
413                                           sqlite3_value** argv,
414                                           FilterHistory) {
415   PERFETTO_TP_TRACE("SPAN_JOIN_XFILTER");
416 
417   bool t1_partitioned_mixed =
418       t1_.definition()->IsPartitioned() &&
419       table_->partitioning_ == PartitioningType::kMixedPartitioning;
420   auto t1_eof = table_->IsOuterJoin() && !t1_partitioned_mixed
421                     ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
422                     : Query::InitialEofBehavior::kTreatAsEof;
423   util::Status status = t1_.Initialize(qc, argv, t1_eof);
424   if (!status.ok())
425     return SQLITE_ERROR;
426 
427   bool t2_partitioned_mixed =
428       t2_.definition()->IsPartitioned() &&
429       table_->partitioning_ == PartitioningType::kMixedPartitioning;
430   auto t2_eof =
431       (table_->IsLeftJoin() || table_->IsOuterJoin()) && !t2_partitioned_mixed
432           ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
433           : Query::InitialEofBehavior::kTreatAsEof;
434   status = t2_.Initialize(qc, argv, t2_eof);
435   if (!status.ok())
436     return SQLITE_ERROR;
437 
438   status = FindOverlappingSpan();
439   return status.ok() ? SQLITE_OK : SQLITE_ERROR;
440 }
441 
Next()442 int SpanJoinOperatorTable::Cursor::Next() {
443   util::Status status = next_query_->Next();
444   if (!status.ok())
445     return SQLITE_ERROR;
446 
447   status = FindOverlappingSpan();
448   return status.ok() ? SQLITE_OK : SQLITE_ERROR;
449 }
450 
IsOverlappingSpan()451 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
452   // If either of the tables are eof, then we cannot possibly have an
453   // overlapping span.
454   if (t1_.IsEof() || t2_.IsEof())
455     return false;
456 
457   // One of the tables always needs to have a real span to have a valid
458   // overlapping span.
459   if (!t1_.IsReal() && !t2_.IsReal())
460     return false;
461 
462   if (table_->partitioning_ == PartitioningType::kSamePartitioning) {
463     // If both tables are partitioned, then ensure that the partitions overlap.
464     bool partition_in_bounds = (t1_.FirstPartition() >= t2_.FirstPartition() &&
465                                 t1_.FirstPartition() <= t2_.LastPartition()) ||
466                                (t2_.FirstPartition() >= t1_.FirstPartition() &&
467                                 t2_.FirstPartition() <= t1_.LastPartition());
468     if (!partition_in_bounds)
469       return false;
470   }
471 
472   // We consider all slices to be [start, end) - that is the range of
473   // timestamps has an open interval at the start but a closed interval
474   // at the end. (with the exception of dur == -1 which we treat as if
475   // end == start for the purpose of this function).
476   return (t1_.ts() == t2_.ts() && t1_.IsReal() && t2_.IsReal()) ||
477          (t1_.ts() >= t2_.ts() && t1_.ts() < t2_.AdjustedTsEnd()) ||
478          (t2_.ts() >= t1_.ts() && t2_.ts() < t1_.AdjustedTsEnd());
479 }
480 
FindOverlappingSpan()481 util::Status SpanJoinOperatorTable::Cursor::FindOverlappingSpan() {
482   // We loop until we find a slice which overlaps from the two tables.
483   while (true) {
484     if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
485       // If we have a mixed partition setup, we need to have special checks
486       // for eof and to reset the unpartitioned cursor every time the partition
487       // changes in the partitioned table.
488       auto* partitioned = t1_.definition()->IsPartitioned() ? &t1_ : &t2_;
489       auto* unpartitioned = t1_.definition()->IsPartitioned() ? &t2_ : &t1_;
490 
491       // If the partitioned table reaches eof, then we are really done.
492       if (partitioned->IsEof())
493         break;
494 
495       // If the partition has changed from the previous one, reset the cursor
496       // and keep a lot of the new partition.
497       if (last_mixed_partition_ != partitioned->partition()) {
498         util::Status status = unpartitioned->Rewind();
499         if (!status.ok())
500           return status;
501         last_mixed_partition_ = partitioned->partition();
502       }
503     } else if (t1_.IsEof() || t2_.IsEof()) {
504       // For both no partition and same partition cases, either cursor ending
505       // ends the whole span join.
506       break;
507     }
508 
509     // Find which slice finishes first.
510     next_query_ = FindEarliestFinishQuery();
511 
512     // If the current span is overlapping, just finish there to emit the current
513     // slice.
514     if (IsOverlappingSpan())
515       break;
516 
517     // Otherwise, step to the next row.
518     util::Status status = next_query_->Next();
519     if (!status.ok())
520       return status;
521   }
522   return util::OkStatus();
523 }
524 
525 SpanJoinOperatorTable::Query*
FindEarliestFinishQuery()526 SpanJoinOperatorTable::Cursor::FindEarliestFinishQuery() {
527   int64_t t1_part;
528   int64_t t2_part;
529 
530   switch (table_->partitioning_) {
531     case PartitioningType::kMixedPartitioning: {
532       // If either table is EOF, forward the other table to try and make
533       // the partitions not match anymore.
534       if (t1_.IsEof())
535         return &t2_;
536       if (t2_.IsEof())
537         return &t1_;
538 
539       // Otherwise, just make the partition equal from both tables.
540       t1_part = last_mixed_partition_;
541       t2_part = last_mixed_partition_;
542       break;
543     }
544     case PartitioningType::kSamePartitioning: {
545       // Get the partition values from the cursor.
546       t1_part = t1_.LastPartition();
547       t2_part = t2_.LastPartition();
548       break;
549     }
550     case PartitioningType::kNoPartitioning: {
551       t1_part = 0;
552       t2_part = 0;
553       break;
554     }
555   }
556 
557   // Prefer to forward the earliest cursors based on the following
558   // lexiographical ordering:
559   // 1. partition
560   // 2. end timestamp
561   // 3. whether the slice is real or shadow (shadow < real)
562   bool t1_less = std::make_tuple(t1_part, t1_.AdjustedTsEnd(), t1_.IsReal()) <
563                  std::make_tuple(t2_part, t2_.AdjustedTsEnd(), t2_.IsReal());
564   return t1_less ? &t1_ : &t2_;
565 }
566 
Eof()567 int SpanJoinOperatorTable::Cursor::Eof() {
568   return t1_.IsEof() || t2_.IsEof();
569 }
570 
Column(sqlite3_context * context,int N)571 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
572   PERFETTO_DCHECK(t1_.IsReal() || t2_.IsReal());
573 
574   switch (N) {
575     case Column::kTimestamp: {
576       auto max_ts = std::max(t1_.ts(), t2_.ts());
577       sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
578       break;
579     }
580     case Column::kDuration: {
581       auto max_start = std::max(t1_.ts(), t2_.ts());
582       auto min_end = std::min(t1_.raw_ts_end(), t2_.raw_ts_end());
583       auto dur = min_end - max_start;
584       sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
585       break;
586     }
587     case Column::kPartition: {
588       if (table_->partitioning_ != PartitioningType::kNoPartitioning) {
589         int64_t partition;
590         if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
591           partition = last_mixed_partition_;
592         } else {
593           partition = t1_.IsReal() ? t1_.partition() : t2_.partition();
594         }
595         sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
596         break;
597       }
598       [[clang::fallthrough]];
599     }
600     default: {
601       size_t index = static_cast<size_t>(N);
602       const auto& locator = table_->global_index_to_column_locator_[index];
603       if (locator.defn == t1_.definition())
604         t1_.ReportSqliteResult(context, locator.col_index);
605       else
606         t2_.ReportSqliteResult(context, locator.col_index);
607     }
608   }
609   return SQLITE_OK;
610 }
611 
Query(SpanJoinOperatorTable * table,const TableDefinition * definition,sqlite3 * db)612 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
613                                     const TableDefinition* definition,
614                                     sqlite3* db)
615     : defn_(definition), db_(db), table_(table) {
616   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
617                   defn_->partition_idx() < defn_->columns().size());
618 }
619 
620 SpanJoinOperatorTable::Query::~Query() = default;
621 
Initialize(const QueryConstraints & qc,sqlite3_value ** argv,InitialEofBehavior eof_behavior)622 util::Status SpanJoinOperatorTable::Query::Initialize(
623     const QueryConstraints& qc,
624     sqlite3_value** argv,
625     InitialEofBehavior eof_behavior) {
626   *this = Query(table_, definition(), db_);
627   sql_query_ = CreateSqlQuery(
628       table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
629   util::Status status = Rewind();
630   if (!status.ok())
631     return status;
632   if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
633       IsEof()) {
634     state_ = State::kMissingPartitionShadow;
635   }
636   return status;
637 }
638 
Next()639 util::Status SpanJoinOperatorTable::Query::Next() {
640   RETURN_IF_ERROR(NextSliceState());
641   return FindNextValidSlice();
642 }
643 
IsValidSlice()644 bool SpanJoinOperatorTable::Query::IsValidSlice() {
645   // Disallow any single partition shadow slices if the definition doesn't allow
646   // them.
647   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
648     return false;
649 
650   // Disallow any missing partition shadow slices if the definition doesn't
651   // allow them.
652   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
653     return false;
654 
655   // Disallow any "empty" shadows; these are shadows which either have the same
656   // start and end time or missing-partition shadows which have the same start
657   // and end partition.
658   if (IsEmptyShadow())
659     return false;
660 
661   return true;
662 }
663 
FindNextValidSlice()664 util::Status SpanJoinOperatorTable::Query::FindNextValidSlice() {
665   // The basic idea of this function is that |NextSliceState()| always emits
666   // all possible slices (including shadows for any gaps inbetween the real
667   // slices) and we filter out the invalid slices (as defined by the table
668   // definition) using |IsValidSlice()|.
669   //
670   // This has proved to be a lot cleaner to implement than trying to choose
671   // when to emit and not emit shadows directly.
672   while (!IsEof() && !IsValidSlice()) {
673     RETURN_IF_ERROR(NextSliceState());
674   }
675   return util::OkStatus();
676 }
677 
NextSliceState()678 util::Status SpanJoinOperatorTable::Query::NextSliceState() {
679   switch (state_) {
680     case State::kReal: {
681       // Forward the cursor to figure out where the next slice should be.
682       RETURN_IF_ERROR(CursorNext());
683 
684       // Depending on the next slice, we can do two things here:
685       // 1. If the next slice is on the same partition, we can just emit a
686       //    single shadow until the start of the next slice.
687       // 2. If the next slice is on another partition or we hit eof, just emit
688       //    a shadow to the end of the whole partition.
689       bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
690                                            partition_ != CursorPartition());
691       state_ = State::kPresentPartitionShadow;
692       ts_ = AdjustedTsEnd();
693       ts_end_ =
694           shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
695       return util::OkStatus();
696     }
697     case State::kPresentPartitionShadow: {
698       if (ts_end_ == std::numeric_limits<int64_t>::max()) {
699         // If the shadow is to the end of the slice, create a missing partition
700         // shadow to the start of the partition of the next slice or to the max
701         // partition if we hit eof.
702         state_ = State::kMissingPartitionShadow;
703         ts_ = 0;
704         ts_end_ = std::numeric_limits<int64_t>::max();
705 
706         missing_partition_start_ = partition_ + 1;
707         missing_partition_end_ = cursor_eof_
708                                      ? std::numeric_limits<int64_t>::max()
709                                      : CursorPartition();
710       } else {
711         // If the shadow is not to the end, we must have another slice on the
712         // current partition.
713         state_ = State::kReal;
714         ts_ = CursorTs();
715         ts_end_ = ts_ + CursorDur();
716 
717         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
718                         partition_ == CursorPartition());
719       }
720       return util::OkStatus();
721     }
722     case State::kMissingPartitionShadow: {
723       if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
724         PERFETTO_DCHECK(cursor_eof_);
725 
726         // If we have a missing partition to the max partition, we must have hit
727         // eof.
728         state_ = State::kEof;
729       } else {
730         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
731                         CursorPartition() == missing_partition_end_);
732 
733         // Otherwise, setup a single partition slice on the end partition to the
734         // start of the next slice.
735         state_ = State::kPresentPartitionShadow;
736         ts_ = 0;
737         ts_end_ = CursorTs();
738         partition_ = missing_partition_end_;
739       }
740       return util::OkStatus();
741     }
742     case State::kEof: {
743       PERFETTO_DFATAL("Called Next when EOF");
744       return util::ErrStatus("Called Next when EOF");
745     }
746   }
747   PERFETTO_FATAL("For GCC");
748 }
749 
Rewind()750 util::Status SpanJoinOperatorTable::Query::Rewind() {
751   sqlite3_stmt* stmt = nullptr;
752   int res =
753       sqlite3_prepare_v2(db_, sql_query_.c_str(),
754                          static_cast<int>(sql_query_.size()), &stmt, nullptr);
755   stmt_.reset(stmt);
756 
757   cursor_eof_ = res != SQLITE_OK;
758   if (res != SQLITE_OK)
759     return util::ErrStatus("%s", sqlite3_errmsg(db_));
760 
761   RETURN_IF_ERROR(CursorNext());
762 
763   // Setup the first slice as a missing partition shadow from the lowest
764   // partition until the first slice partition. We will handle finding the real
765   // slice in |FindNextValidSlice()|.
766   state_ = State::kMissingPartitionShadow;
767   ts_ = 0;
768   ts_end_ = std::numeric_limits<int64_t>::max();
769   missing_partition_start_ = std::numeric_limits<int64_t>::min();
770 
771   if (cursor_eof_) {
772     missing_partition_end_ = std::numeric_limits<int64_t>::max();
773   } else if (defn_->IsPartitioned()) {
774     missing_partition_end_ = CursorPartition();
775   } else {
776     missing_partition_end_ = std::numeric_limits<int64_t>::min();
777   }
778 
779   // Actually compute the first valid slice.
780   return FindNextValidSlice();
781 }
782 
CursorNext()783 util::Status SpanJoinOperatorTable::Query::CursorNext() {
784   auto* stmt = stmt_.get();
785   int res;
786   if (defn_->IsPartitioned()) {
787     auto partition_idx = static_cast<int>(defn_->partition_idx());
788     // Fastforward through any rows with null partition keys.
789     int row_type;
790     do {
791       res = sqlite3_step(stmt);
792       row_type = sqlite3_column_type(stmt, partition_idx);
793     } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
794 
795     if (res == SQLITE_ROW && row_type != SQLITE_INTEGER) {
796       return util::ErrStatus("SPAN_JOIN: partition is not an int");
797     }
798   } else {
799     res = sqlite3_step(stmt);
800   }
801   cursor_eof_ = res != SQLITE_ROW;
802   return res == SQLITE_ROW || res == SQLITE_DONE
803              ? util::OkStatus()
804              : util::ErrStatus("SPAN_JOIN: %s", sqlite3_errmsg(db_));
805 }
806 
CreateSqlQuery(const std::vector<std::string> & cs) const807 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
808     const std::vector<std::string>& cs) const {
809   std::vector<std::string> col_names;
810   for (const SqliteTable::Column& c : defn_->columns()) {
811     col_names.push_back("`" + c.name() + "`");
812   }
813 
814   std::string sql = "SELECT " + base::Join(col_names, ", ");
815   sql += " FROM " + defn_->name();
816   if (!cs.empty()) {
817     sql += " WHERE " + base::Join(cs, " AND ");
818   }
819   sql += " ORDER BY ";
820   sql += defn_->IsPartitioned()
821              ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
822              : "ts";
823   sql += ";";
824   PERFETTO_DLOG("%s", sql.c_str());
825   return sql;
826 }
827 
ReportSqliteResult(sqlite3_context * context,size_t index)828 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
829                                                       size_t index) {
830   if (state_ != State::kReal) {
831     sqlite3_result_null(context);
832     return;
833   }
834 
835   sqlite3_stmt* stmt = stmt_.get();
836   int idx = static_cast<int>(index);
837   switch (sqlite3_column_type(stmt, idx)) {
838     case SQLITE_INTEGER:
839       sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
840       break;
841     case SQLITE_FLOAT:
842       sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
843       break;
844     case SQLITE_TEXT: {
845       // TODO(lalitm): note for future optimizations: if we knew the addresses
846       // of the string intern pool, we could check if the string returned here
847       // comes from the pool, and pass it as non-transient.
848       const auto kSqliteTransient =
849           reinterpret_cast<sqlite3_destructor_type>(-1);
850       auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
851       sqlite3_result_text(context, ptr, -1, kSqliteTransient);
852       break;
853     }
854   }
855 }
856 
TableDefinition(std::string name,std::string partition_col,std::vector<SqliteTable::Column> cols,EmitShadowType emit_shadow_type,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)857 SpanJoinOperatorTable::TableDefinition::TableDefinition(
858     std::string name,
859     std::string partition_col,
860     std::vector<SqliteTable::Column> cols,
861     EmitShadowType emit_shadow_type,
862     uint32_t ts_idx,
863     uint32_t dur_idx,
864     uint32_t partition_idx)
865     : emit_shadow_type_(emit_shadow_type),
866       name_(std::move(name)),
867       partition_col_(std::move(partition_col)),
868       cols_(std::move(cols)),
869       ts_idx_(ts_idx),
870       dur_idx_(dur_idx),
871       partition_idx_(partition_idx) {}
872 
Parse(const std::string & raw_descriptor,SpanJoinOperatorTable::TableDescriptor * descriptor)873 util::Status SpanJoinOperatorTable::TableDescriptor::Parse(
874     const std::string& raw_descriptor,
875     SpanJoinOperatorTable::TableDescriptor* descriptor) {
876   // Descriptors have one of the following forms:
877   // table_name [PARTITIONED column_name]
878 
879   // Find the table name.
880   base::StringSplitter splitter(raw_descriptor, ' ');
881   if (!splitter.Next())
882     return util::ErrStatus("SPAN_JOIN: Missing table name");
883 
884   descriptor->name = splitter.cur_token();
885   if (!splitter.Next())
886     return util::OkStatus();
887 
888   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
889     return util::ErrStatus("SPAN_JOIN: Invalid token");
890 
891   if (!splitter.Next())
892     return util::ErrStatus("SPAN_JOIN: Missing partitioning column");
893 
894   descriptor->partition_col = splitter.cur_token();
895   return util::OkStatus();
896 }
897 
898 }  // namespace trace_processor
899 }  // namespace perfetto
900