1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "src/trace_processor/sqlite/span_join_operator_table.h"

#include <sqlite3.h>
#include <stdio.h>
#include <string.h>

#include <algorithm>
#include <set>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/string_splitter.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
#include "src/trace_processor/tp_metatrace.h"
32
33 namespace perfetto {
34 namespace trace_processor {
35
36 namespace {
37
// Names of the two columns treated as mandatory span columns in this file.
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";

// Returns true iff |name| is exactly the timestamp or the duration column
// name (the comparison is case sensitive).
bool IsRequiredColumn(const std::string& name) {
  if (name == kTsColumnName)
    return true;
  return name == kDurColumnName;
}
44
HasDuplicateColumns(const std::vector<SqliteTable::Column> & cols)45 base::Optional<std::string> HasDuplicateColumns(
46 const std::vector<SqliteTable::Column>& cols) {
47 std::set<std::string> names;
48 for (const auto& col : cols) {
49 if (names.count(col.name()) > 0)
50 return col.name();
51 names.insert(col.name());
52 }
53 return base::nullopt;
54 }
55
OpToString(int op)56 std::string OpToString(int op) {
57 switch (op) {
58 case SQLITE_INDEX_CONSTRAINT_EQ:
59 return "=";
60 case SQLITE_INDEX_CONSTRAINT_NE:
61 return "!=";
62 case SQLITE_INDEX_CONSTRAINT_GE:
63 return ">=";
64 case SQLITE_INDEX_CONSTRAINT_GT:
65 return ">";
66 case SQLITE_INDEX_CONSTRAINT_LE:
67 return "<=";
68 case SQLITE_INDEX_CONSTRAINT_LT:
69 return "<";
70 case SQLITE_INDEX_CONSTRAINT_LIKE:
71 return "like";
72 case SQLITE_INDEX_CONSTRAINT_ISNULL:
73 // The "null" will be added below in EscapedSqliteValueAsString.
74 return " is ";
75 case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
76 // The "null" will be added below in EscapedSqliteValueAsString.
77 return " is not";
78 default:
79 PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
80 }
81 }
82
EscapedSqliteValueAsString(sqlite3_value * value)83 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
84 switch (sqlite3_value_type(value)) {
85 case SQLITE_INTEGER:
86 return std::to_string(sqlite3_value_int64(value));
87 case SQLITE_FLOAT:
88 return std::to_string(sqlite3_value_double(value));
89 case SQLITE_TEXT: {
90 // If str itself contains a single quote, we need to escape it with
91 // another single quote.
92 const char* str =
93 reinterpret_cast<const char*>(sqlite3_value_text(value));
94 return "'" + base::ReplaceAll(str, "'", "''") + "'";
95 }
96 case SQLITE_NULL:
97 return " null";
98 default:
99 PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
100 }
101 }
102
103 } // namespace
104
// Stores the SQLite connection handle in |db_|. The TraceStorage argument is
// accepted but not used by this constructor.
SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
    : db_(db) {}
107
RegisterTable(sqlite3 * db,const TraceStorage * storage)108 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
109 const TraceStorage* storage) {
110 SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_join",
111 /* read_write */ false,
112 /* requires_args */ true);
113
114 SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
115 /* read_write */ false,
116 /* requires_args */ true);
117
118 SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
119 /* read_write */ false,
120 /* requires_args */ true);
121 }
122
Init(int argc,const char * const * argv,Schema * schema)123 util::Status SpanJoinOperatorTable::Init(int argc,
124 const char* const* argv,
125 Schema* schema) {
126 // argv[0] - argv[2] are SQLite populated fields which are always present.
127 if (argc < 5)
128 return util::Status("SPAN_JOIN: expected at least 2 args");
129
130 TableDescriptor t1_desc;
131 auto status = TableDescriptor::Parse(
132 std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
133 if (!status.ok())
134 return status;
135
136 TableDescriptor t2_desc;
137 status = TableDescriptor::Parse(
138 std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
139 if (!status.ok())
140 return status;
141
142 // Check that the partition columns match between the two tables.
143 if (t1_desc.partition_col == t2_desc.partition_col) {
144 partitioning_ = t1_desc.IsPartitioned()
145 ? PartitioningType::kSamePartitioning
146 : PartitioningType::kNoPartitioning;
147 if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) {
148 return util::ErrStatus(
149 "SPAN_JOIN: Outer join not supported for no partition tables");
150 }
151 } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
152 return util::ErrStatus(
153 "SPAN_JOIN: mismatching partitions between the two tables; "
154 "(partition %s in table %s, partition %s in table %s)",
155 t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
156 t2_desc.partition_col.c_str(), t2_desc.name.c_str());
157 } else {
158 if (IsOuterJoin()) {
159 return util::ErrStatus(
160 "SPAN_JOIN: Outer join not supported for mixed partitioned tables");
161 }
162 partitioning_ = PartitioningType::kMixedPartitioning;
163 }
164
165 bool t1_part_mixed = t1_desc.IsPartitioned() &&
166 partitioning_ == PartitioningType::kMixedPartitioning;
167 bool t2_part_mixed = t2_desc.IsPartitioned() &&
168 partitioning_ == PartitioningType::kMixedPartitioning;
169
170 EmitShadowType t1_shadow_type;
171 if (IsOuterJoin()) {
172 if (t1_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
173 t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
174 } else {
175 t1_shadow_type = EmitShadowType::kAll;
176 }
177 } else {
178 t1_shadow_type = EmitShadowType::kNone;
179 }
180 status = CreateTableDefinition(t1_desc, t1_shadow_type, &t1_defn_);
181 if (!status.ok())
182 return status;
183
184 EmitShadowType t2_shadow_type;
185 if (IsOuterJoin() || IsLeftJoin()) {
186 if (t2_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
187 t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
188 } else {
189 t2_shadow_type = EmitShadowType::kAll;
190 }
191 } else {
192 t2_shadow_type = EmitShadowType::kNone;
193 }
194 status = CreateTableDefinition(t2_desc, t2_shadow_type, &t2_defn_);
195 if (!status.ok())
196 return status;
197
198 std::vector<SqliteTable::Column> cols;
199 // Ensure the shared columns are consistently ordered and are not
200 // present twice in the final schema
201 cols.emplace_back(Column::kTimestamp, kTsColumnName, SqlValue::Type::kLong);
202 cols.emplace_back(Column::kDuration, kDurColumnName, SqlValue::Type::kLong);
203 if (partitioning_ != PartitioningType::kNoPartitioning)
204 cols.emplace_back(Column::kPartition, partition_col(),
205 SqlValue::Type::kLong);
206
207 CreateSchemaColsForDefn(t1_defn_, &cols);
208 CreateSchemaColsForDefn(t2_defn_, &cols);
209
210 // Check if any column has : in its name. This often happens when SELECT *
211 // is used to create a view with the same column name in two joined tables.
212 for (const auto& col : cols) {
213 if (col.name().find(':') != std::string::npos) {
214 return util::ErrStatus("SPAN_JOIN: column %s has illegal character :",
215 col.name().c_str());
216 }
217 }
218
219 if (auto opt_dupe_col = HasDuplicateColumns(cols)) {
220 return util::ErrStatus(
221 "SPAN_JOIN: column %s present in both tables %s and %s",
222 opt_dupe_col->c_str(), t1_defn_.name().c_str(),
223 t2_defn_.name().c_str());
224 }
225 std::vector<size_t> primary_keys = {Column::kTimestamp};
226 if (partitioning_ != PartitioningType::kNoPartitioning)
227 primary_keys.push_back(Column::kPartition);
228 *schema = Schema(cols, primary_keys);
229
230 return util::OkStatus();
231 }
232
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<SqliteTable::Column> * cols)233 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
234 const TableDefinition& defn,
235 std::vector<SqliteTable::Column>* cols) {
236 for (size_t i = 0; i < defn.columns().size(); i++) {
237 const auto& n = defn.columns()[i].name();
238 if (IsRequiredColumn(n) || n == defn.partition_col())
239 continue;
240
241 ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
242 locator->defn = &defn;
243 locator->col_index = i;
244
245 cols->emplace_back(cols->size(), n, defn.columns()[i].type());
246 }
247 }
248
// Allocates a new Cursor bound to this table and |db_|; ownership of the
// cursor is transferred to the caller through the returned unique_ptr.
std::unique_ptr<SqliteTable::Cursor> SpanJoinOperatorTable::CreateCursor() {
  return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
}
252
BestIndex(const QueryConstraints & qc,BestIndexInfo * info)253 int SpanJoinOperatorTable::BestIndex(const QueryConstraints& qc,
254 BestIndexInfo* info) {
255 // TODO(lalitm): figure out cost estimation.
256 const auto& ob = qc.order_by();
257
258 if (partitioning_ == PartitioningType::kNoPartitioning) {
259 // If both tables are not partitioned and we have a single order by on ts,
260 // we return data in the correct order.
261 info->sqlite_omit_order_by =
262 ob.size() == 1 && ob[0].iColumn == Column::kTimestamp && !ob[0].desc;
263 } else {
264 // If one of the tables is partitioned, and we have an order by on the
265 // partition column followed (optionally) by an order by on timestamp, we
266 // return data in the correct order.
267 bool is_first_ob_partition =
268 ob.size() >= 1 && ob[0].iColumn == Column::kPartition && !ob[0].desc;
269 bool is_second_ob_ts =
270 ob.size() >= 2 && ob[1].iColumn == Column::kTimestamp && !ob[1].desc;
271 info->sqlite_omit_order_by =
272 (ob.size() == 1 && is_first_ob_partition) ||
273 (ob.size() == 2 && is_first_ob_partition && is_second_ob_ts);
274 }
275
276 const auto& cs = qc.constraints();
277 for (uint32_t i = 0; i < cs.size(); ++i) {
278 if (cs[i].op == kSourceGeqOpCode) {
279 info->sqlite_omit_constraint[i] = true;
280 }
281 }
282
283 return SQLITE_OK;
284 }
285
FindFunction(const char * name,FindFunctionFn * fn,void **)286 int SpanJoinOperatorTable::FindFunction(const char* name,
287 FindFunctionFn* fn,
288 void**) {
289 if (base::CaseInsensitiveEqual(name, "source_geq")) {
290 *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
291 sqlite3_result_error(ctx, "Should not be called.", -1);
292 };
293 return kSourceGeqOpCode;
294 }
295 return 0;
296 }
297
298 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)299 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
300 const TableDefinition& defn,
301 const QueryConstraints& qc,
302 sqlite3_value** argv) {
303 std::vector<std::string> constraints;
304 for (size_t i = 0; i < qc.constraints().size(); i++) {
305 const auto& cs = qc.constraints()[i];
306 auto col_name = GetNameForGlobalColumnIndex(defn, cs.column);
307 if (col_name.empty())
308 continue;
309
310 // Le constraints can be passed straight to the child tables as they won't
311 // affect the span join computation. Similarily, source_geq constraints
312 // explicitly request that they are passed as geq constraints to the source
313 // tables.
314 if (col_name == kTsColumnName && !sqlite_utils::IsOpLe(cs.op) &&
315 cs.op != kSourceGeqOpCode)
316 continue;
317
318 // Allow SQLite handle any constraints on duration apart from source_geq
319 // constraints.
320 if (col_name == kDurColumnName && cs.op != kSourceGeqOpCode)
321 continue;
322
323 // If we're emitting shadow slices, don't propogate any constraints
324 // on this table as this will break the shadow slice computation.
325 if (defn.ShouldEmitPresentPartitionShadow())
326 continue;
327
328 auto op = OpToString(cs.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE
329 : cs.op);
330 auto value = EscapedSqliteValueAsString(argv[i]);
331
332 constraints.emplace_back("`" + col_name + "`" + op + value);
333 }
334 return constraints;
335 }
336
CreateTableDefinition(const TableDescriptor & desc,EmitShadowType emit_shadow_type,SpanJoinOperatorTable::TableDefinition * defn)337 util::Status SpanJoinOperatorTable::CreateTableDefinition(
338 const TableDescriptor& desc,
339 EmitShadowType emit_shadow_type,
340 SpanJoinOperatorTable::TableDefinition* defn) {
341 if (desc.partition_col == kTsColumnName ||
342 desc.partition_col == kDurColumnName) {
343 return util::ErrStatus(
344 "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
345 desc.name.c_str());
346 }
347
348 std::vector<SqliteTable::Column> cols;
349 auto status = sqlite_utils::GetColumnsForTable(db_, desc.name, cols);
350 if (!status.ok()) {
351 return status;
352 }
353
354 uint32_t required_columns_found = 0;
355 uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
356 uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
357 uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
358 for (uint32_t i = 0; i < cols.size(); i++) {
359 auto col = cols[i];
360 if (IsRequiredColumn(col.name())) {
361 ++required_columns_found;
362 if (col.type() != SqlValue::Type::kLong &&
363 col.type() != SqlValue::Type::kNull) {
364 return util::ErrStatus(
365 "SPAN_JOIN: Invalid type for column %s in table %s",
366 col.name().c_str(), desc.name.c_str());
367 }
368 }
369
370 if (col.name() == kTsColumnName) {
371 ts_idx = i;
372 } else if (col.name() == kDurColumnName) {
373 dur_idx = i;
374 } else if (col.name() == desc.partition_col) {
375 partition_idx = i;
376 }
377 }
378 if (required_columns_found != 2) {
379 return util::ErrStatus(
380 "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
381 desc.name.c_str());
382 } else if (desc.IsPartitioned() && partition_idx >= cols.size()) {
383 return util::ErrStatus("SPAN_JOIN: Missing partition column %s in table %s",
384 desc.partition_col.c_str(), desc.name.c_str());
385 }
386
387 PERFETTO_DCHECK(ts_idx < cols.size());
388 PERFETTO_DCHECK(dur_idx < cols.size());
389
390 *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
391 emit_shadow_type, ts_idx, dur_idx, partition_idx);
392 return util::OkStatus();
393 }
394
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)395 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
396 const TableDefinition& defn,
397 int global_column) {
398 size_t col_idx = static_cast<size_t>(global_column);
399 if (col_idx == Column::kTimestamp)
400 return kTsColumnName;
401 else if (col_idx == Column::kDuration)
402 return kDurColumnName;
403 else if (col_idx == Column::kPartition &&
404 partitioning_ != PartitioningType::kNoPartitioning)
405 return defn.partition_col().c_str();
406
407 const auto& locator = global_index_to_column_locator_[col_idx];
408 if (locator.defn != &defn)
409 return "";
410 return defn.columns()[locator.col_index].name().c_str();
411 }
412
// Creates a cursor over |table|. The two child queries |t1_| and |t2_| are
// constructed from the table's |t1_defn_| and |t2_defn_| definitions and
// share the |db| connection.
SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
    : SqliteTable::Cursor(table),
      t1_(table, &table->t1_defn_, db),
      t2_(table, &table->t2_defn_, db),
      table_(table) {}
418
Filter(const QueryConstraints & qc,sqlite3_value ** argv,FilterHistory)419 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
420 sqlite3_value** argv,
421 FilterHistory) {
422 PERFETTO_TP_TRACE("SPAN_JOIN_XFILTER");
423
424 util::Status status =
425 t1_.Initialize(qc, argv, Query::InitialEofBehavior::kTreatAsEof);
426 if (!status.ok())
427 return SQLITE_ERROR;
428
429 status = t2_.Initialize(
430 qc, argv,
431 table_->IsLeftJoin()
432 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
433 : Query::InitialEofBehavior::kTreatAsEof);
434 if (!status.ok())
435 return SQLITE_ERROR;
436
437 status = FindOverlappingSpan();
438 return status.ok() ? SQLITE_OK : SQLITE_ERROR;
439 }
440
Next()441 int SpanJoinOperatorTable::Cursor::Next() {
442 util::Status status = next_query_->Next();
443 if (!status.ok())
444 return SQLITE_ERROR;
445
446 status = FindOverlappingSpan();
447 return status.ok() ? SQLITE_OK : SQLITE_ERROR;
448 }
449
IsOverlappingSpan()450 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
451 // If either of the tables are eof, then we cannot possibly have an
452 // overlapping span.
453 if (t1_.IsEof() || t2_.IsEof())
454 return false;
455
456 // One of the tables always needs to have a real span to have a valid
457 // overlapping span.
458 if (!t1_.IsReal() && !t2_.IsReal())
459 return false;
460
461 if (table_->partitioning_ == PartitioningType::kSamePartitioning) {
462 // If both tables are partitioned, then ensure that the partitions overlap.
463 bool partition_in_bounds = (t1_.FirstPartition() >= t2_.FirstPartition() &&
464 t1_.FirstPartition() <= t2_.LastPartition()) ||
465 (t2_.FirstPartition() >= t1_.FirstPartition() &&
466 t2_.FirstPartition() <= t1_.LastPartition());
467 if (!partition_in_bounds)
468 return false;
469 }
470
471 // We consider all slices to be [start, end) - that is the range of
472 // timestamps has an open interval at the start but a closed interval
473 // at the end. (with the exception of dur == -1 which we treat as if
474 // end == start for the purpose of this function).
475 return (t1_.ts() == t2_.ts() && t1_.IsReal() && t2_.IsReal()) ||
476 (t1_.ts() >= t2_.ts() && t1_.ts() < t2_.AdjustedTsEnd()) ||
477 (t2_.ts() >= t1_.ts() && t2_.ts() < t1_.AdjustedTsEnd());
478 }
479
// Advances the child queries until their current slices overlap (see
// |IsOverlappingSpan()|) or until the join is exhausted. On each iteration
// |next_query_| is set to the query which should be stepped next. Returns an
// error status if rewinding or stepping a child query fails.
util::Status SpanJoinOperatorTable::Cursor::FindOverlappingSpan() {
  // We loop until we find a slice which overlaps from the two tables.
  while (true) {
    if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
      // If we have a mixed partition setup, we need to have special checks
      // for eof and to reset the unpartitioned cursor every time the partition
      // changes in the partitioned table.
      auto* partitioned = t1_.definition()->IsPartitioned() ? &t1_ : &t2_;
      auto* unpartitioned = t1_.definition()->IsPartitioned() ? &t2_ : &t1_;

      // If the partitioned table reaches eof, then we are really done.
      if (partitioned->IsEof())
        break;

      // If the partition has changed from the previous one, reset the cursor
      // and keep a note of the new partition.
      if (last_mixed_partition_ != partitioned->partition()) {
        util::Status status = unpartitioned->Rewind();
        if (!status.ok())
          return status;
        last_mixed_partition_ = partitioned->partition();
      }
    } else if (t1_.IsEof() || t2_.IsEof()) {
      // For both no partition and same partition cases, either cursor ending
      // ends the whole span join.
      break;
    }

    // Find which slice finishes first.
    next_query_ = FindEarliestFinishQuery();

    // If the current span is overlapping, just finish there to emit the current
    // slice.
    if (IsOverlappingSpan())
      break;

    // Otherwise, step to the next row.
    util::Status status = next_query_->Next();
    if (!status.ok())
      return status;
  }
  return util::OkStatus();
}
523
524 SpanJoinOperatorTable::Query*
FindEarliestFinishQuery()525 SpanJoinOperatorTable::Cursor::FindEarliestFinishQuery() {
526 int64_t t1_part;
527 int64_t t2_part;
528
529 switch (table_->partitioning_) {
530 case PartitioningType::kMixedPartitioning: {
531 // If either table is EOF, forward the other table to try and make
532 // the partitions not match anymore.
533 if (t1_.IsEof())
534 return &t2_;
535 if (t2_.IsEof())
536 return &t1_;
537
538 // Otherwise, just make the partition equal from both tables.
539 t1_part = last_mixed_partition_;
540 t2_part = last_mixed_partition_;
541 break;
542 }
543 case PartitioningType::kSamePartitioning: {
544 // Get the partition values from the cursor.
545 t1_part = t1_.LastPartition();
546 t2_part = t2_.LastPartition();
547 break;
548 }
549 case PartitioningType::kNoPartitioning: {
550 t1_part = 0;
551 t2_part = 0;
552 break;
553 }
554 }
555
556 // Prefer to forward the earliest cursors based on the following
557 // lexiographical ordering:
558 // 1. partition
559 // 2. end timestamp
560 // 3. whether the slice is real or shadow (shadow < real)
561 bool t1_less = std::make_tuple(t1_part, t1_.AdjustedTsEnd(), t1_.IsReal()) <
562 std::make_tuple(t2_part, t2_.AdjustedTsEnd(), t2_.IsReal());
563 return t1_less ? &t1_ : &t2_;
564 }
565
// The cursor is at eof as soon as either of the two child queries is at eof.
int SpanJoinOperatorTable::Cursor::Eof() {
  return t1_.IsEof() || t2_.IsEof();
}
569
Column(sqlite3_context * context,int N)570 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
571 PERFETTO_DCHECK(t1_.IsReal() || t2_.IsReal());
572
573 switch (N) {
574 case Column::kTimestamp: {
575 auto max_ts = std::max(t1_.ts(), t2_.ts());
576 sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
577 break;
578 }
579 case Column::kDuration: {
580 auto max_start = std::max(t1_.ts(), t2_.ts());
581 auto min_end = std::min(t1_.raw_ts_end(), t2_.raw_ts_end());
582 auto dur = min_end - max_start;
583 sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
584 break;
585 }
586 case Column::kPartition: {
587 if (table_->partitioning_ != PartitioningType::kNoPartitioning) {
588 int64_t partition;
589 if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
590 partition = last_mixed_partition_;
591 } else {
592 partition = t1_.IsReal() ? t1_.partition() : t2_.partition();
593 }
594 sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
595 break;
596 }
597 [[clang::fallthrough]];
598 }
599 default: {
600 size_t index = static_cast<size_t>(N);
601 const auto& locator = table_->global_index_to_column_locator_[index];
602 if (locator.defn == t1_.definition())
603 t1_.ReportSqliteResult(context, locator.col_index);
604 else
605 t2_.ReportSqliteResult(context, locator.col_index);
606 }
607 }
608 return SQLITE_OK;
609 }
610
// Creates a query over the table described by |definition|, remembering the
// owning |table| and the |db| connection. In debug builds, checks that a
// partitioned definition has a partition index inside its column list.
SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
                                    const TableDefinition* definition,
                                    sqlite3* db)
    : defn_(definition), db_(db), table_(table) {
  PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                  defn_->partition_idx() < defn_->columns().size());
}

// Defaulted here, out of line, rather than in the class definition.
SpanJoinOperatorTable::Query::~Query() = default;
620
Initialize(const QueryConstraints & qc,sqlite3_value ** argv,InitialEofBehavior eof_behavior)621 util::Status SpanJoinOperatorTable::Query::Initialize(
622 const QueryConstraints& qc,
623 sqlite3_value** argv,
624 InitialEofBehavior eof_behavior) {
625 *this = Query(table_, definition(), db_);
626 sql_query_ = CreateSqlQuery(
627 table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
628 util::Status status = Rewind();
629 if (!status.ok())
630 return status;
631 if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
632 IsEof()) {
633 state_ = State::kMissingPartitionShadow;
634 }
635 return status;
636 }
637
Next()638 util::Status SpanJoinOperatorTable::Query::Next() {
639 util::Status status = NextSliceState();
640 if (!status.ok())
641 return status;
642 return FindNextValidSlice();
643 }
644
IsValidSlice()645 bool SpanJoinOperatorTable::Query::IsValidSlice() {
646 // Disallow any single partition shadow slices if the definition doesn't allow
647 // them.
648 if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
649 return false;
650
651 // Disallow any missing partition shadow slices if the definition doesn't
652 // allow them.
653 if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
654 return false;
655
656 // Disallow any "empty" shadows; these are shadows which either have the same
657 // start and end time or missing-partition shadows which have the same start
658 // and end partition.
659 if (IsEmptyShadow())
660 return false;
661
662 return true;
663 }
664
FindNextValidSlice()665 util::Status SpanJoinOperatorTable::Query::FindNextValidSlice() {
666 // The basic idea of this function is that |NextSliceState()| always emits
667 // all possible slices (including shadows for any gaps inbetween the real
668 // slices) and we filter out the invalid slices (as defined by the table
669 // definition) using |IsValidSlice()|.
670 //
671 // This has proved to be a lot cleaner to implement than trying to choose
672 // when to emit and not emit shadows directly.
673 while (!IsEof() && !IsValidSlice()) {
674 util::Status status = NextSliceState();
675 if (!status.ok())
676 return status;
677 }
678 return util::OkStatus();
679 }
680
// Advances the slice state machine by exactly one step, without checking
// whether the resulting slice is valid (see |FindNextValidSlice()|).
//
// Transitions implemented below:
//   kReal                   -> kPresentPartitionShadow
//   kPresentPartitionShadow -> kMissingPartitionShadow (shadow ran to the
//                              end of the partition) or kReal
//   kMissingPartitionShadow -> kEof (shadow ran to the max partition) or
//                              kPresentPartitionShadow
//   kEof                    -> error; calling this function at eof is a bug.
util::Status SpanJoinOperatorTable::Query::NextSliceState() {
  switch (state_) {
    case State::kReal: {
      // Forward the cursor to figure out where the next slice should be.
      util::Status status = CursorNext();
      if (!status.ok())
        return status;

      // Depending on the next slice, we can do two things here:
      // 1. If the next slice is on the same partition, we can just emit a
      //    single shadow until the start of the next slice.
      // 2. If the next slice is on another partition or we hit eof, just emit
      //    a shadow to the end of the whole partition.
      bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
                                           partition_ != CursorPartition());
      state_ = State::kPresentPartitionShadow;
      // The shadow starts where the real slice which just finished ended.
      ts_ = AdjustedTsEnd();
      ts_end_ =
          shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
      return util::OkStatus();
    }
    case State::kPresentPartitionShadow: {
      if (ts_end_ == std::numeric_limits<int64_t>::max()) {
        // If the shadow is to the end of the slice, create a missing partition
        // shadow to the start of the partition of the next slice or to the max
        // partition if we hit eof.
        state_ = State::kMissingPartitionShadow;
        ts_ = 0;
        ts_end_ = std::numeric_limits<int64_t>::max();

        missing_partition_start_ = partition_ + 1;
        missing_partition_end_ = cursor_eof_
                                     ? std::numeric_limits<int64_t>::max()
                                     : CursorPartition();
      } else {
        // If the shadow is not to the end, we must have another slice on the
        // current partition.
        state_ = State::kReal;
        ts_ = CursorTs();
        ts_end_ = ts_ + CursorDur();

        PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                        partition_ == CursorPartition());
      }
      return util::OkStatus();
    }
    case State::kMissingPartitionShadow: {
      if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
        PERFETTO_DCHECK(cursor_eof_);

        // If we have a missing partition to the max partition, we must have hit
        // eof.
        state_ = State::kEof;
      } else {
        PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                        CursorPartition() == missing_partition_end_);

        // Otherwise, setup a single partition slice on the end partition to the
        // start of the next slice.
        state_ = State::kPresentPartitionShadow;
        ts_ = 0;
        ts_end_ = CursorTs();
        partition_ = missing_partition_end_;
      }
      return util::OkStatus();
    }
    case State::kEof: {
      PERFETTO_DFATAL("Called Next when EOF");
      return util::ErrStatus("Called Next when EOF");
    }
  }
  // Unreachable: every case above returns. Per the message, the call is here
  // for GCC's benefit.
  PERFETTO_FATAL("For GCC");
}
754
Rewind()755 util::Status SpanJoinOperatorTable::Query::Rewind() {
756 sqlite3_stmt* stmt = nullptr;
757 int res =
758 sqlite3_prepare_v2(db_, sql_query_.c_str(),
759 static_cast<int>(sql_query_.size()), &stmt, nullptr);
760 stmt_.reset(stmt);
761
762 cursor_eof_ = res != SQLITE_OK;
763 if (res != SQLITE_OK)
764 return util::ErrStatus("%s", sqlite3_errmsg(db_));
765
766 util::Status status = CursorNext();
767 if (!status.ok())
768 return status;
769
770 // Setup the first slice as a missing partition shadow from the lowest
771 // partition until the first slice partition. We will handle finding the real
772 // slice in |FindNextValidSlice()|.
773 state_ = State::kMissingPartitionShadow;
774 ts_ = 0;
775 ts_end_ = std::numeric_limits<int64_t>::max();
776 missing_partition_start_ = std::numeric_limits<int64_t>::min();
777
778 if (cursor_eof_) {
779 missing_partition_end_ = std::numeric_limits<int64_t>::max();
780 } else if (defn_->IsPartitioned()) {
781 missing_partition_end_ = CursorPartition();
782 } else {
783 missing_partition_end_ = std::numeric_limits<int64_t>::min();
784 }
785
786 // Actually compute the first valid slice.
787 return FindNextValidSlice();
788 }
789
CursorNext()790 util::Status SpanJoinOperatorTable::Query::CursorNext() {
791 auto* stmt = stmt_.get();
792 int res;
793 if (defn_->IsPartitioned()) {
794 auto partition_idx = static_cast<int>(defn_->partition_idx());
795 // Fastforward through any rows with null partition keys.
796 int row_type;
797 do {
798 res = sqlite3_step(stmt);
799 row_type = sqlite3_column_type(stmt, partition_idx);
800 } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
801 } else {
802 res = sqlite3_step(stmt);
803 }
804 cursor_eof_ = res != SQLITE_ROW;
805 return res == SQLITE_ROW || res == SQLITE_DONE
806 ? util::OkStatus()
807 : util::ErrStatus("%s", sqlite3_errmsg(db_));
808 }
809
CreateSqlQuery(const std::vector<std::string> & cs) const810 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
811 const std::vector<std::string>& cs) const {
812 std::vector<std::string> col_names;
813 for (const SqliteTable::Column& c : defn_->columns()) {
814 col_names.push_back("`" + c.name() + "`");
815 }
816
817 std::string sql = "SELECT " + base::Join(col_names, ", ");
818 sql += " FROM " + defn_->name();
819 if (!cs.empty()) {
820 sql += " WHERE " + base::Join(cs, " AND ");
821 }
822 sql += " ORDER BY ";
823 sql += defn_->IsPartitioned()
824 ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
825 : "ts";
826 sql += ";";
827 PERFETTO_DLOG("%s", sql.c_str());
828 return sql;
829 }
830
ReportSqliteResult(sqlite3_context * context,size_t index)831 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
832 size_t index) {
833 if (state_ != State::kReal) {
834 sqlite3_result_null(context);
835 return;
836 }
837
838 sqlite3_stmt* stmt = stmt_.get();
839 int idx = static_cast<int>(index);
840 switch (sqlite3_column_type(stmt, idx)) {
841 case SQLITE_INTEGER:
842 sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
843 break;
844 case SQLITE_FLOAT:
845 sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
846 break;
847 case SQLITE_TEXT: {
848 // TODO(lalitm): note for future optimizations: if we knew the addresses
849 // of the string intern pool, we could check if the string returned here
850 // comes from the pool, and pass it as non-transient.
851 const auto kSqliteTransient =
852 reinterpret_cast<sqlite3_destructor_type>(-1);
853 auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
854 sqlite3_result_text(context, ptr, -1, kSqliteTransient);
855 break;
856 }
857 }
858 }
859
// Stores the description of one input table. |name|, |partition_col| and
// |cols| are taken by value and moved into the matching members; the three
// indices are stored as given.
SpanJoinOperatorTable::TableDefinition::TableDefinition(
    std::string name,
    std::string partition_col,
    std::vector<SqliteTable::Column> cols,
    EmitShadowType emit_shadow_type,
    uint32_t ts_idx,
    uint32_t dur_idx,
    uint32_t partition_idx)
    : emit_shadow_type_(emit_shadow_type),
      name_(std::move(name)),
      partition_col_(std::move(partition_col)),
      cols_(std::move(cols)),
      ts_idx_(ts_idx),
      dur_idx_(dur_idx),
      partition_idx_(partition_idx) {}
875
Parse(const std::string & raw_descriptor,SpanJoinOperatorTable::TableDescriptor * descriptor)876 util::Status SpanJoinOperatorTable::TableDescriptor::Parse(
877 const std::string& raw_descriptor,
878 SpanJoinOperatorTable::TableDescriptor* descriptor) {
879 // Descriptors have one of the following forms:
880 // table_name [PARTITIONED column_name]
881
882 // Find the table name.
883 base::StringSplitter splitter(raw_descriptor, ' ');
884 if (!splitter.Next())
885 return util::ErrStatus("SPAN_JOIN: Missing table name");
886
887 descriptor->name = splitter.cur_token();
888 if (!splitter.Next())
889 return util::OkStatus();
890
891 if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
892 return util::ErrStatus("SPAN_JOIN: Invalid token");
893
894 if (!splitter.Next())
895 return util::ErrStatus("SPAN_JOIN: Missing partitioning column");
896
897 descriptor->partition_col = splitter.cur_token();
898 return util::OkStatus();
899 }
900
901 } // namespace trace_processor
902 } // namespace perfetto
903