1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "src/trace_processor/prelude/operators/span_join_operator.h"

#include <sqlite3.h>
#include <string.h>

#include <algorithm>
#include <cstdio>
#include <set>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/base/status.h"
#include "perfetto/ext/base/string_splitter.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
#include "src/trace_processor/tp_metatrace.h"
#include "src/trace_processor/util/status_macros.h"
34 
35 namespace perfetto {
36 namespace trace_processor {
37 
38 namespace {
39 
// Names of the timestamp and duration columns. Every table handed to
// SPAN_JOIN has to provide both of them.
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";

// Returns true when |name| is exactly "ts" or "dur" (case-sensitive match).
bool IsRequiredColumn(const std::string& name) {
  if (name == kTsColumnName)
    return true;
  return name == kDurColumnName;
}
46 
HasDuplicateColumns(const std::vector<SqliteTable::Column> & cols)47 std::optional<std::string> HasDuplicateColumns(
48     const std::vector<SqliteTable::Column>& cols) {
49   std::set<std::string> names;
50   for (const auto& col : cols) {
51     if (names.count(col.name()) > 0)
52       return col.name();
53     names.insert(col.name());
54   }
55   return std::nullopt;
56 }
57 
OpToString(int op)58 std::string OpToString(int op) {
59   switch (op) {
60     case SQLITE_INDEX_CONSTRAINT_EQ:
61       return "=";
62     case SQLITE_INDEX_CONSTRAINT_NE:
63       return "!=";
64     case SQLITE_INDEX_CONSTRAINT_GE:
65       return ">=";
66     case SQLITE_INDEX_CONSTRAINT_GT:
67       return ">";
68     case SQLITE_INDEX_CONSTRAINT_LE:
69       return "<=";
70     case SQLITE_INDEX_CONSTRAINT_LT:
71       return "<";
72     case SQLITE_INDEX_CONSTRAINT_LIKE:
73       return " like ";
74     case SQLITE_INDEX_CONSTRAINT_GLOB:
75       return " glob ";
76     case SQLITE_INDEX_CONSTRAINT_ISNULL:
77       // The "null" will be added below in EscapedSqliteValueAsString.
78       return " is ";
79     case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
80       // The "null" will be added below in EscapedSqliteValueAsString.
81       return " is not ";
82     default:
83       PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
84   }
85 }
86 
EscapedSqliteValueAsString(sqlite3_value * value)87 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
88   switch (sqlite3_value_type(value)) {
89     case SQLITE_INTEGER:
90       return std::to_string(sqlite3_value_int64(value));
91     case SQLITE_FLOAT:
92       return std::to_string(sqlite3_value_double(value));
93     case SQLITE_TEXT: {
94       // If str itself contains a single quote, we need to escape it with
95       // another single quote.
96       const char* str =
97           reinterpret_cast<const char*>(sqlite3_value_text(value));
98       return "'" + base::ReplaceAll(str, "'", "''") + "'";
99     }
100     case SQLITE_NULL:
101       return " null";
102     default:
103       PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
104   }
105 }
106 
107 }  // namespace
108 
SpanJoinOperatorTable(sqlite3 * db,const void *)109 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const void*)
110     : db_(db) {}
111 SpanJoinOperatorTable::~SpanJoinOperatorTable() = default;
112 
Init(int argc,const char * const * argv,Schema * schema)113 util::Status SpanJoinOperatorTable::Init(int argc,
114                                          const char* const* argv,
115                                          Schema* schema) {
116   // argv[0] - argv[2] are SQLite populated fields which are always present.
117   if (argc < 5)
118     return util::Status("SPAN_JOIN: expected at least 2 args");
119 
120   TableDescriptor t1_desc;
121   auto status = TableDescriptor::Parse(
122       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
123   if (!status.ok())
124     return status;
125 
126   TableDescriptor t2_desc;
127   status = TableDescriptor::Parse(
128       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
129   if (!status.ok())
130     return status;
131 
132   // Check that the partition columns match between the two tables.
133   if (t1_desc.partition_col == t2_desc.partition_col) {
134     partitioning_ = t1_desc.IsPartitioned()
135                         ? PartitioningType::kSamePartitioning
136                         : PartitioningType::kNoPartitioning;
137   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
138     return util::ErrStatus(
139         "SPAN_JOIN: mismatching partitions between the two tables; "
140         "(partition %s in table %s, partition %s in table %s)",
141         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
142         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
143   } else {
144     partitioning_ = PartitioningType::kMixedPartitioning;
145   }
146 
147   bool t1_part_mixed = t1_desc.IsPartitioned() &&
148                        partitioning_ == PartitioningType::kMixedPartitioning;
149   bool t2_part_mixed = t2_desc.IsPartitioned() &&
150                        partitioning_ == PartitioningType::kMixedPartitioning;
151 
152   EmitShadowType t1_shadow_type;
153   if (IsOuterJoin()) {
154     if (t1_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
155       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
156     } else {
157       t1_shadow_type = EmitShadowType::kAll;
158     }
159   } else {
160     t1_shadow_type = EmitShadowType::kNone;
161   }
162   status = CreateTableDefinition(t1_desc, t1_shadow_type, &t1_defn_);
163   if (!status.ok())
164     return status;
165 
166   EmitShadowType t2_shadow_type;
167   if (IsOuterJoin() || IsLeftJoin()) {
168     if (t2_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
169       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
170     } else {
171       t2_shadow_type = EmitShadowType::kAll;
172     }
173   } else {
174     t2_shadow_type = EmitShadowType::kNone;
175   }
176   status = CreateTableDefinition(t2_desc, t2_shadow_type, &t2_defn_);
177   if (!status.ok())
178     return status;
179 
180   std::vector<SqliteTable::Column> cols;
181   // Ensure the shared columns are consistently ordered and are not
182   // present twice in the final schema
183   cols.emplace_back(Column::kTimestamp, kTsColumnName, SqlValue::Type::kLong);
184   cols.emplace_back(Column::kDuration, kDurColumnName, SqlValue::Type::kLong);
185   if (partitioning_ != PartitioningType::kNoPartitioning)
186     cols.emplace_back(Column::kPartition, partition_col(),
187                       SqlValue::Type::kLong);
188 
189   CreateSchemaColsForDefn(t1_defn_, &cols);
190   CreateSchemaColsForDefn(t2_defn_, &cols);
191 
192   // Check if any column has : in its name. This often happens when SELECT *
193   // is used to create a view with the same column name in two joined tables.
194   for (const auto& col : cols) {
195     if (base::Contains(col.name(), ':')) {
196       return util::ErrStatus("SPAN_JOIN: column %s has illegal character :",
197                              col.name().c_str());
198     }
199   }
200 
201   if (auto opt_dupe_col = HasDuplicateColumns(cols)) {
202     return util::ErrStatus(
203         "SPAN_JOIN: column %s present in both tables %s and %s",
204         opt_dupe_col->c_str(), t1_defn_.name().c_str(),
205         t2_defn_.name().c_str());
206   }
207   std::vector<size_t> primary_keys = {Column::kTimestamp};
208   if (partitioning_ != PartitioningType::kNoPartitioning)
209     primary_keys.push_back(Column::kPartition);
210   *schema = Schema(cols, primary_keys);
211 
212   return util::OkStatus();
213 }
214 
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<SqliteTable::Column> * cols)215 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
216     const TableDefinition& defn,
217     std::vector<SqliteTable::Column>* cols) {
218   for (size_t i = 0; i < defn.columns().size(); i++) {
219     const auto& n = defn.columns()[i].name();
220     if (IsRequiredColumn(n) || n == defn.partition_col())
221       continue;
222 
223     ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
224     locator->defn = &defn;
225     locator->col_index = i;
226 
227     cols->emplace_back(cols->size(), n, defn.columns()[i].type());
228   }
229 }
230 
CreateCursor()231 std::unique_ptr<SqliteTable::BaseCursor> SpanJoinOperatorTable::CreateCursor() {
232   return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
233 }
234 
BestIndex(const QueryConstraints & qc,BestIndexInfo * info)235 int SpanJoinOperatorTable::BestIndex(const QueryConstraints& qc,
236                                      BestIndexInfo* info) {
237   // TODO(lalitm): figure out cost estimation.
238   const auto& ob = qc.order_by();
239 
240   if (partitioning_ == PartitioningType::kNoPartitioning) {
241     // If both tables are not partitioned and we have a single order by on ts,
242     // we return data in the correct order.
243     info->sqlite_omit_order_by =
244         ob.size() == 1 && ob[0].iColumn == Column::kTimestamp && !ob[0].desc;
245   } else {
246     // If one of the tables is partitioned, and we have an order by on the
247     // partition column followed (optionally) by an order by on timestamp, we
248     // return data in the correct order.
249     bool is_first_ob_partition =
250         ob.size() >= 1 && ob[0].iColumn == Column::kPartition && !ob[0].desc;
251     bool is_second_ob_ts =
252         ob.size() >= 2 && ob[1].iColumn == Column::kTimestamp && !ob[1].desc;
253     info->sqlite_omit_order_by =
254         (ob.size() == 1 && is_first_ob_partition) ||
255         (ob.size() == 2 && is_first_ob_partition && is_second_ob_ts);
256   }
257 
258   const auto& cs = qc.constraints();
259   for (uint32_t i = 0; i < cs.size(); ++i) {
260     if (cs[i].op == kSourceGeqOpCode) {
261       info->sqlite_omit_constraint[i] = true;
262     }
263   }
264 
265   return SQLITE_OK;
266 }
267 
FindFunction(const char * name,FindFunctionFn * fn,void **)268 int SpanJoinOperatorTable::FindFunction(const char* name,
269                                         FindFunctionFn* fn,
270                                         void**) {
271   if (base::CaseInsensitiveEqual(name, "source_geq")) {
272     *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
273       sqlite3_result_error(ctx, "Should not be called.", -1);
274     };
275     return kSourceGeqOpCode;
276   }
277   return 0;
278 }
279 
280 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)281 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
282     const TableDefinition& defn,
283     const QueryConstraints& qc,
284     sqlite3_value** argv) {
285   std::vector<std::string> constraints;
286   for (size_t i = 0; i < qc.constraints().size(); i++) {
287     const auto& cs = qc.constraints()[i];
288     auto col_name = GetNameForGlobalColumnIndex(defn, cs.column);
289     if (col_name.empty())
290       continue;
291 
292     // Le constraints can be passed straight to the child tables as they won't
293     // affect the span join computation. Similarily, source_geq constraints
294     // explicitly request that they are passed as geq constraints to the source
295     // tables.
296     if (col_name == kTsColumnName && !sqlite_utils::IsOpLe(cs.op) &&
297         cs.op != kSourceGeqOpCode)
298       continue;
299 
300     // Allow SQLite handle any constraints on duration apart from source_geq
301     // constraints.
302     if (col_name == kDurColumnName && cs.op != kSourceGeqOpCode)
303       continue;
304 
305     // If we're emitting shadow slices, don't propogate any constraints
306     // on this table as this will break the shadow slice computation.
307     if (defn.ShouldEmitPresentPartitionShadow())
308       continue;
309 
310     auto op = OpToString(cs.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE
311                                                    : cs.op);
312     auto value = EscapedSqliteValueAsString(argv[i]);
313 
314     constraints.emplace_back("`" + col_name + "`" + op + value);
315   }
316   return constraints;
317 }
318 
CreateTableDefinition(const TableDescriptor & desc,EmitShadowType emit_shadow_type,SpanJoinOperatorTable::TableDefinition * defn)319 util::Status SpanJoinOperatorTable::CreateTableDefinition(
320     const TableDescriptor& desc,
321     EmitShadowType emit_shadow_type,
322     SpanJoinOperatorTable::TableDefinition* defn) {
323   if (desc.partition_col == kTsColumnName ||
324       desc.partition_col == kDurColumnName) {
325     return util::ErrStatus(
326         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
327         desc.name.c_str());
328   }
329 
330   std::vector<SqliteTable::Column> cols;
331   auto status = sqlite_utils::GetColumnsForTable(db_, desc.name, cols);
332   if (!status.ok()) {
333     return status;
334   }
335 
336   uint32_t required_columns_found = 0;
337   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
338   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
339   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
340   for (uint32_t i = 0; i < cols.size(); i++) {
341     auto col = cols[i];
342     if (IsRequiredColumn(col.name())) {
343       ++required_columns_found;
344       if (col.type() != SqlValue::Type::kLong &&
345           col.type() != SqlValue::Type::kNull) {
346         return util::ErrStatus(
347             "SPAN_JOIN: Invalid type for column %s in table %s",
348             col.name().c_str(), desc.name.c_str());
349       }
350     }
351 
352     if (col.name() == kTsColumnName) {
353       ts_idx = i;
354     } else if (col.name() == kDurColumnName) {
355       dur_idx = i;
356     } else if (col.name() == desc.partition_col) {
357       partition_idx = i;
358     }
359   }
360   if (required_columns_found != 2) {
361     return util::ErrStatus(
362         "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
363         desc.name.c_str());
364   } else if (desc.IsPartitioned() && partition_idx >= cols.size()) {
365     return util::ErrStatus("SPAN_JOIN: Missing partition column %s in table %s",
366                            desc.partition_col.c_str(), desc.name.c_str());
367   }
368 
369   PERFETTO_DCHECK(ts_idx < cols.size());
370   PERFETTO_DCHECK(dur_idx < cols.size());
371 
372   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
373                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
374   return util::OkStatus();
375 }
376 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)377 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
378     const TableDefinition& defn,
379     int global_column) {
380   size_t col_idx = static_cast<size_t>(global_column);
381   if (col_idx == Column::kTimestamp)
382     return kTsColumnName;
383   else if (col_idx == Column::kDuration)
384     return kDurColumnName;
385   else if (col_idx == Column::kPartition &&
386            partitioning_ != PartitioningType::kNoPartitioning)
387     return defn.partition_col().c_str();
388 
389   const auto& locator = global_index_to_column_locator_[col_idx];
390   if (locator.defn != &defn)
391     return "";
392   return defn.columns()[locator.col_index].name().c_str();
393 }
394 
Cursor(SpanJoinOperatorTable * table,sqlite3 * db)395 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
396     : SqliteTable::BaseCursor(table),
397       t1_(table, &table->t1_defn_, db),
398       t2_(table, &table->t2_defn_, db),
399       table_(table) {}
400 SpanJoinOperatorTable::Cursor::~Cursor() = default;
401 
Filter(const QueryConstraints & qc,sqlite3_value ** argv,FilterHistory)402 base::Status SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
403                                                    sqlite3_value** argv,
404                                                    FilterHistory) {
405   PERFETTO_TP_TRACE(metatrace::Category::QUERY, "SPAN_JOIN_XFILTER");
406 
407   bool t1_partitioned_mixed =
408       t1_.definition()->IsPartitioned() &&
409       table_->partitioning_ == PartitioningType::kMixedPartitioning;
410   auto t1_eof = table_->IsOuterJoin() && !t1_partitioned_mixed
411                     ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
412                     : Query::InitialEofBehavior::kTreatAsEof;
413   RETURN_IF_ERROR(t1_.Initialize(qc, argv, t1_eof));
414 
415   bool t2_partitioned_mixed =
416       t2_.definition()->IsPartitioned() &&
417       table_->partitioning_ == PartitioningType::kMixedPartitioning;
418   auto t2_eof =
419       (table_->IsLeftJoin() || table_->IsOuterJoin()) && !t2_partitioned_mixed
420           ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
421           : Query::InitialEofBehavior::kTreatAsEof;
422   RETURN_IF_ERROR(t2_.Initialize(qc, argv, t2_eof));
423   return FindOverlappingSpan();
424 }
425 
Next()426 base::Status SpanJoinOperatorTable::Cursor::Next() {
427   RETURN_IF_ERROR(next_query_->Next());
428   return FindOverlappingSpan();
429 }
430 
IsOverlappingSpan()431 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
432   // If either of the tables are eof, then we cannot possibly have an
433   // overlapping span.
434   if (t1_.IsEof() || t2_.IsEof())
435     return false;
436 
437   // One of the tables always needs to have a real span to have a valid
438   // overlapping span.
439   if (!t1_.IsReal() && !t2_.IsReal())
440     return false;
441 
442   if (table_->partitioning_ == PartitioningType::kSamePartitioning) {
443     // If both tables are partitioned, then ensure that the partitions overlap.
444     bool partition_in_bounds = (t1_.FirstPartition() >= t2_.FirstPartition() &&
445                                 t1_.FirstPartition() <= t2_.LastPartition()) ||
446                                (t2_.FirstPartition() >= t1_.FirstPartition() &&
447                                 t2_.FirstPartition() <= t1_.LastPartition());
448     if (!partition_in_bounds)
449       return false;
450   }
451 
452   // We consider all slices to be [start, end) - that is the range of
453   // timestamps has an open interval at the start but a closed interval
454   // at the end. (with the exception of dur == -1 which we treat as if
455   // end == start for the purpose of this function).
456   return (t1_.ts() == t2_.ts() && t1_.IsReal() && t2_.IsReal()) ||
457          (t1_.ts() >= t2_.ts() && t1_.ts() < t2_.AdjustedTsEnd()) ||
458          (t2_.ts() >= t1_.ts() && t2_.ts() < t1_.AdjustedTsEnd());
459 }
460 
FindOverlappingSpan()461 util::Status SpanJoinOperatorTable::Cursor::FindOverlappingSpan() {
462   // We loop until we find a slice which overlaps from the two tables.
463   while (true) {
464     if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
465       // If we have a mixed partition setup, we need to have special checks
466       // for eof and to reset the unpartitioned cursor every time the partition
467       // changes in the partitioned table.
468       auto* partitioned = t1_.definition()->IsPartitioned() ? &t1_ : &t2_;
469       auto* unpartitioned = t1_.definition()->IsPartitioned() ? &t2_ : &t1_;
470 
471       // If the partitioned table reaches eof, then we are really done.
472       if (partitioned->IsEof())
473         break;
474 
475       // If the partition has changed from the previous one, reset the cursor
476       // and keep a lot of the new partition.
477       if (last_mixed_partition_ != partitioned->partition()) {
478         util::Status status = unpartitioned->Rewind();
479         if (!status.ok())
480           return status;
481         last_mixed_partition_ = partitioned->partition();
482       }
483     } else if (t1_.IsEof() || t2_.IsEof()) {
484       // For both no partition and same partition cases, either cursor ending
485       // ends the whole span join.
486       break;
487     }
488 
489     // Find which slice finishes first.
490     next_query_ = FindEarliestFinishQuery();
491 
492     // If the current span is overlapping, just finish there to emit the current
493     // slice.
494     if (IsOverlappingSpan())
495       break;
496 
497     // Otherwise, step to the next row.
498     util::Status status = next_query_->Next();
499     if (!status.ok())
500       return status;
501   }
502   return util::OkStatus();
503 }
504 
505 SpanJoinOperatorTable::Query*
FindEarliestFinishQuery()506 SpanJoinOperatorTable::Cursor::FindEarliestFinishQuery() {
507   int64_t t1_part;
508   int64_t t2_part;
509 
510   switch (table_->partitioning_) {
511     case PartitioningType::kMixedPartitioning: {
512       // If either table is EOF, forward the other table to try and make
513       // the partitions not match anymore.
514       if (t1_.IsEof())
515         return &t2_;
516       if (t2_.IsEof())
517         return &t1_;
518 
519       // Otherwise, just make the partition equal from both tables.
520       t1_part = last_mixed_partition_;
521       t2_part = last_mixed_partition_;
522       break;
523     }
524     case PartitioningType::kSamePartitioning: {
525       // Get the partition values from the cursor.
526       t1_part = t1_.LastPartition();
527       t2_part = t2_.LastPartition();
528       break;
529     }
530     case PartitioningType::kNoPartitioning: {
531       t1_part = 0;
532       t2_part = 0;
533       break;
534     }
535   }
536 
537   // Prefer to forward the earliest cursors based on the following
538   // lexiographical ordering:
539   // 1. partition
540   // 2. end timestamp
541   // 3. whether the slice is real or shadow (shadow < real)
542   bool t1_less = std::make_tuple(t1_part, t1_.AdjustedTsEnd(), t1_.IsReal()) <
543                  std::make_tuple(t2_part, t2_.AdjustedTsEnd(), t2_.IsReal());
544   return t1_less ? &t1_ : &t2_;
545 }
546 
Eof()547 bool SpanJoinOperatorTable::Cursor::Eof() {
548   return t1_.IsEof() || t2_.IsEof();
549 }
550 
Column(sqlite3_context * context,int N)551 base::Status SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context,
552                                                    int N) {
553   PERFETTO_DCHECK(t1_.IsReal() || t2_.IsReal());
554 
555   switch (N) {
556     case Column::kTimestamp: {
557       auto max_ts = std::max(t1_.ts(), t2_.ts());
558       sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
559       break;
560     }
561     case Column::kDuration: {
562       auto max_start = std::max(t1_.ts(), t2_.ts());
563       auto min_end = std::min(t1_.raw_ts_end(), t2_.raw_ts_end());
564       auto dur = min_end - max_start;
565       sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
566       break;
567     }
568     case Column::kPartition: {
569       if (table_->partitioning_ != PartitioningType::kNoPartitioning) {
570         int64_t partition;
571         if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
572           partition = last_mixed_partition_;
573         } else {
574           partition = t1_.IsReal() ? t1_.partition() : t2_.partition();
575         }
576         sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
577         break;
578       }
579       [[clang::fallthrough]];
580     }
581     default: {
582       size_t index = static_cast<size_t>(N);
583       const auto& locator = table_->global_index_to_column_locator_[index];
584       if (locator.defn == t1_.definition())
585         t1_.ReportSqliteResult(context, locator.col_index);
586       else
587         t2_.ReportSqliteResult(context, locator.col_index);
588     }
589   }
590   return base::OkStatus();
591 }
592 
Query(SpanJoinOperatorTable * table,const TableDefinition * definition,sqlite3 * db)593 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
594                                     const TableDefinition* definition,
595                                     sqlite3* db)
596     : defn_(definition), db_(db), table_(table) {
597   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
598                   defn_->partition_idx() < defn_->columns().size());
599 }
600 
601 SpanJoinOperatorTable::Query::~Query() = default;
602 
Initialize(const QueryConstraints & qc,sqlite3_value ** argv,InitialEofBehavior eof_behavior)603 util::Status SpanJoinOperatorTable::Query::Initialize(
604     const QueryConstraints& qc,
605     sqlite3_value** argv,
606     InitialEofBehavior eof_behavior) {
607   *this = Query(table_, definition(), db_);
608   sql_query_ = CreateSqlQuery(
609       table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
610   util::Status status = Rewind();
611   if (!status.ok())
612     return status;
613   if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
614       IsEof()) {
615     state_ = State::kMissingPartitionShadow;
616   }
617   return status;
618 }
619 
Next()620 util::Status SpanJoinOperatorTable::Query::Next() {
621   RETURN_IF_ERROR(NextSliceState());
622   return FindNextValidSlice();
623 }
624 
IsValidSlice()625 bool SpanJoinOperatorTable::Query::IsValidSlice() {
626   // Disallow any single partition shadow slices if the definition doesn't allow
627   // them.
628   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
629     return false;
630 
631   // Disallow any missing partition shadow slices if the definition doesn't
632   // allow them.
633   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
634     return false;
635 
636   // Disallow any "empty" shadows; these are shadows which either have the same
637   // start and end time or missing-partition shadows which have the same start
638   // and end partition.
639   if (IsEmptyShadow())
640     return false;
641 
642   return true;
643 }
644 
FindNextValidSlice()645 util::Status SpanJoinOperatorTable::Query::FindNextValidSlice() {
646   // The basic idea of this function is that |NextSliceState()| always emits
647   // all possible slices (including shadows for any gaps inbetween the real
648   // slices) and we filter out the invalid slices (as defined by the table
649   // definition) using |IsValidSlice()|.
650   //
651   // This has proved to be a lot cleaner to implement than trying to choose
652   // when to emit and not emit shadows directly.
653   while (!IsEof() && !IsValidSlice()) {
654     RETURN_IF_ERROR(NextSliceState());
655   }
656   return util::OkStatus();
657 }
658 
NextSliceState()659 util::Status SpanJoinOperatorTable::Query::NextSliceState() {
660   switch (state_) {
661     case State::kReal: {
662       // Forward the cursor to figure out where the next slice should be.
663       RETURN_IF_ERROR(CursorNext());
664 
665       // Depending on the next slice, we can do two things here:
666       // 1. If the next slice is on the same partition, we can just emit a
667       //    single shadow until the start of the next slice.
668       // 2. If the next slice is on another partition or we hit eof, just emit
669       //    a shadow to the end of the whole partition.
670       bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
671                                            partition_ != CursorPartition());
672       state_ = State::kPresentPartitionShadow;
673       ts_ = AdjustedTsEnd();
674       ts_end_ =
675           shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
676       return util::OkStatus();
677     }
678     case State::kPresentPartitionShadow: {
679       if (ts_end_ == std::numeric_limits<int64_t>::max()) {
680         // If the shadow is to the end of the slice, create a missing partition
681         // shadow to the start of the partition of the next slice or to the max
682         // partition if we hit eof.
683         state_ = State::kMissingPartitionShadow;
684         ts_ = 0;
685         ts_end_ = std::numeric_limits<int64_t>::max();
686 
687         missing_partition_start_ = partition_ + 1;
688         missing_partition_end_ = cursor_eof_
689                                      ? std::numeric_limits<int64_t>::max()
690                                      : CursorPartition();
691       } else {
692         // If the shadow is not to the end, we must have another slice on the
693         // current partition.
694         state_ = State::kReal;
695         ts_ = CursorTs();
696         ts_end_ = ts_ + CursorDur();
697 
698         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
699                         partition_ == CursorPartition());
700       }
701       return util::OkStatus();
702     }
703     case State::kMissingPartitionShadow: {
704       if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
705         PERFETTO_DCHECK(cursor_eof_);
706 
707         // If we have a missing partition to the max partition, we must have hit
708         // eof.
709         state_ = State::kEof;
710       } else {
711         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
712                         CursorPartition() == missing_partition_end_);
713 
714         // Otherwise, setup a single partition slice on the end partition to the
715         // start of the next slice.
716         state_ = State::kPresentPartitionShadow;
717         ts_ = 0;
718         ts_end_ = CursorTs();
719         partition_ = missing_partition_end_;
720       }
721       return util::OkStatus();
722     }
723     case State::kEof: {
724       PERFETTO_DFATAL("Called Next when EOF");
725       return util::ErrStatus("Called Next when EOF");
726     }
727   }
728   PERFETTO_FATAL("For GCC");
729 }
730 
Rewind()731 util::Status SpanJoinOperatorTable::Query::Rewind() {
732   sqlite3_stmt* stmt = nullptr;
733   int res =
734       sqlite3_prepare_v2(db_, sql_query_.c_str(),
735                          static_cast<int>(sql_query_.size()), &stmt, nullptr);
736   stmt_.reset(stmt);
737 
738   cursor_eof_ = res != SQLITE_OK;
739   if (res != SQLITE_OK)
740     return util::ErrStatus(
741         "%s", sqlite_utils::FormatErrorMessage(
742                   stmt_.get(), base::StringView(sql_query_), db_, res)
743                   .c_message());
744 
745   RETURN_IF_ERROR(CursorNext());
746 
747   // Setup the first slice as a missing partition shadow from the lowest
748   // partition until the first slice partition. We will handle finding the real
749   // slice in |FindNextValidSlice()|.
750   state_ = State::kMissingPartitionShadow;
751   ts_ = 0;
752   ts_end_ = std::numeric_limits<int64_t>::max();
753   missing_partition_start_ = std::numeric_limits<int64_t>::min();
754 
755   if (cursor_eof_) {
756     missing_partition_end_ = std::numeric_limits<int64_t>::max();
757   } else if (defn_->IsPartitioned()) {
758     missing_partition_end_ = CursorPartition();
759   } else {
760     missing_partition_end_ = std::numeric_limits<int64_t>::min();
761   }
762 
763   // Actually compute the first valid slice.
764   return FindNextValidSlice();
765 }
766 
CursorNext()767 util::Status SpanJoinOperatorTable::Query::CursorNext() {
768   auto* stmt = stmt_.get();
769   int res;
770   if (defn_->IsPartitioned()) {
771     auto partition_idx = static_cast<int>(defn_->partition_idx());
772     // Fastforward through any rows with null partition keys.
773     int row_type;
774     do {
775       res = sqlite3_step(stmt);
776       row_type = sqlite3_column_type(stmt, partition_idx);
777     } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
778 
779     if (res == SQLITE_ROW && row_type != SQLITE_INTEGER) {
780       return util::ErrStatus("SPAN_JOIN: partition is not an int");
781     }
782   } else {
783     res = sqlite3_step(stmt);
784   }
785   cursor_eof_ = res != SQLITE_ROW;
786   return res == SQLITE_ROW || res == SQLITE_DONE
787              ? util::OkStatus()
788              : util::ErrStatus("SPAN_JOIN: %s", sqlite3_errmsg(db_));
789 }
790 
CreateSqlQuery(const std::vector<std::string> & cs) const791 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
792     const std::vector<std::string>& cs) const {
793   std::vector<std::string> col_names;
794   for (const SqliteTable::Column& c : defn_->columns()) {
795     col_names.push_back("`" + c.name() + "`");
796   }
797 
798   std::string sql = "SELECT " + base::Join(col_names, ", ");
799   sql += " FROM " + defn_->name();
800   if (!cs.empty()) {
801     sql += " WHERE " + base::Join(cs, " AND ");
802   }
803   sql += " ORDER BY ";
804   sql += defn_->IsPartitioned()
805              ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
806              : "ts";
807   sql += ";";
808   PERFETTO_DLOG("%s", sql.c_str());
809   return sql;
810 }
811 
ReportSqliteResult(sqlite3_context * context,size_t index)812 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
813                                                       size_t index) {
814   if (state_ != State::kReal) {
815     sqlite3_result_null(context);
816     return;
817   }
818 
819   sqlite3_stmt* stmt = stmt_.get();
820   int idx = static_cast<int>(index);
821   switch (sqlite3_column_type(stmt, idx)) {
822     case SQLITE_INTEGER:
823       sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
824       break;
825     case SQLITE_FLOAT:
826       sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
827       break;
828     case SQLITE_TEXT: {
829       // TODO(lalitm): note for future optimizations: if we knew the addresses
830       // of the string intern pool, we could check if the string returned here
831       // comes from the pool, and pass it as non-transient.
832       const auto kSqliteTransient =
833           reinterpret_cast<sqlite3_destructor_type>(-1);
834       auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
835       sqlite3_result_text(context, ptr, -1, kSqliteTransient);
836       break;
837     }
838   }
839 }
840 
// Constructs a table definition by storing every argument in the member of
// the same name: |name|, |partition_col| and |cols| are moved into place, the
// remaining arguments are copied.
//
// NOTE(review): |ts_idx|, |dur_idx| and |partition_idx| are presumably the
// positions of the ts, dur and partition columns within |cols| - confirm
// against the class declaration.
SpanJoinOperatorTable::TableDefinition::TableDefinition(
    std::string name,
    std::string partition_col,
    std::vector<SqliteTable::Column> cols,
    EmitShadowType emit_shadow_type,
    uint32_t ts_idx,
    uint32_t dur_idx,
    uint32_t partition_idx)
    : emit_shadow_type_(emit_shadow_type),
      name_(std::move(name)),
      partition_col_(std::move(partition_col)),
      cols_(std::move(cols)),
      ts_idx_(ts_idx),
      dur_idx_(dur_idx),
      partition_idx_(partition_idx) {}
856 
Parse(const std::string & raw_descriptor,SpanJoinOperatorTable::TableDescriptor * descriptor)857 util::Status SpanJoinOperatorTable::TableDescriptor::Parse(
858     const std::string& raw_descriptor,
859     SpanJoinOperatorTable::TableDescriptor* descriptor) {
860   // Descriptors have one of the following forms:
861   // table_name [PARTITIONED column_name]
862 
863   // Find the table name.
864   base::StringSplitter splitter(raw_descriptor, ' ');
865   if (!splitter.Next())
866     return util::ErrStatus("SPAN_JOIN: Missing table name");
867 
868   descriptor->name = splitter.cur_token();
869   if (!splitter.Next())
870     return util::OkStatus();
871 
872   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
873     return util::ErrStatus("SPAN_JOIN: Invalid token");
874 
875   if (!splitter.Next())
876     return util::ErrStatus("SPAN_JOIN: Missing partitioning column");
877 
878   descriptor->partition_col = splitter.cur_token();
879   return util::OkStatus();
880 }
881 
882 }  // namespace trace_processor
883 }  // namespace perfetto
884