1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/span_join_operator_table.h"
18
19 #include <sqlite3.h>
20 #include <string.h>
21 #include <algorithm>
22 #include <set>
23 #include <utility>
24
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/string_splitter.h"
27 #include "perfetto/base/string_utils.h"
28 #include "perfetto/base/string_view.h"
29 #include "src/trace_processor/sqlite_utils.h"
30
31 namespace perfetto {
32 namespace trace_processor {
33
34 namespace {
35
// Names of the two columns which every table taking part in a span join must
// expose.
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";

// Returns true when |name| is one of the required columns (ts or dur).
bool IsRequiredColumn(const std::string& name) {
  const bool is_ts = name == kTsColumnName;
  const bool is_dur = name == kDurColumnName;
  return is_ts || is_dur;
}
42
HasDuplicateColumns(const std::vector<Table::Column> & cols)43 bool HasDuplicateColumns(const std::vector<Table::Column>& cols) {
44 std::set<std::string> names;
45 for (const auto& col : cols) {
46 if (names.count(col.name()) > 0) {
47 PERFETTO_ELOG("Column '%s' present in the output schema more than once.",
48 col.name().c_str());
49 return true;
50 }
51 names.insert(col.name());
52 }
53 return false;
54 }
55 } // namespace
56
SpanJoinOperatorTable(sqlite3 * db,const TraceStorage *)57 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
58 : db_(db) {}
59
RegisterTable(sqlite3 * db,const TraceStorage * storage)60 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
61 const TraceStorage* storage) {
62 Table::Register<SpanJoinOperatorTable>(db, storage, "span_join",
63 /* read_write */ false,
64 /* requires_args */ true);
65
66 Table::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
67 /* read_write */ false,
68 /* requires_args */ true);
69
70 Table::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
71 /* read_write */ false,
72 /* requires_args */ true);
73 }
74
Init(int argc,const char * const * argv)75 base::Optional<Table::Schema> SpanJoinOperatorTable::Init(
76 int argc,
77 const char* const* argv) {
78 // argv[0] - argv[2] are SQLite populated fields which are always present.
79 if (argc < 5) {
80 PERFETTO_ELOG("SPAN JOIN expected at least 2 args, received %d", argc - 3);
81 return base::nullopt;
82 }
83
84 auto maybe_t1_desc = TableDescriptor::Parse(
85 std::string(reinterpret_cast<const char*>(argv[3])));
86 if (!maybe_t1_desc.has_value())
87 return base::nullopt;
88 auto t1_desc = *maybe_t1_desc;
89
90 auto maybe_t2_desc = TableDescriptor::Parse(
91 std::string(reinterpret_cast<const char*>(argv[4])));
92 if (!maybe_t2_desc.has_value())
93 return base::nullopt;
94 auto t2_desc = *maybe_t2_desc;
95
96 if (t1_desc.partition_col == t2_desc.partition_col) {
97 partitioning_ = t1_desc.IsPartitioned()
98 ? PartitioningType::kSamePartitioning
99 : PartitioningType::kNoPartitioning;
100 if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) {
101 PERFETTO_ELOG("Outer join not supported for no partition tables");
102 return base::nullopt;
103 }
104 } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
105 PERFETTO_ELOG("Mismatching partitions (%s, %s)",
106 t1_desc.partition_col.c_str(), t2_desc.partition_col.c_str());
107 return base::nullopt;
108 } else {
109 if (IsOuterJoin()) {
110 PERFETTO_ELOG("Outer join not supported for mixed partitioned tables");
111 return base::nullopt;
112 }
113 partitioning_ = PartitioningType::kMixedPartitioning;
114 }
115
116 auto maybe_t1_defn = CreateTableDefinition(t1_desc, IsOuterJoin());
117 if (!maybe_t1_defn.has_value())
118 return base::nullopt;
119 t1_defn_ = maybe_t1_defn.value();
120
121 auto maybe_t2_defn =
122 CreateTableDefinition(t2_desc, IsOuterJoin() || IsLeftJoin());
123 if (!maybe_t2_defn.has_value())
124 return base::nullopt;
125 t2_defn_ = maybe_t2_defn.value();
126
127 std::vector<Table::Column> cols;
128 // Ensure the shared columns are consistently ordered and are not
129 // present twice in the final schema
130 cols.emplace_back(Column::kTimestamp, kTsColumnName, ColumnType::kLong);
131 cols.emplace_back(Column::kDuration, kDurColumnName, ColumnType::kLong);
132 if (partitioning_ != PartitioningType::kNoPartitioning)
133 cols.emplace_back(Column::kPartition, partition_col(), ColumnType::kLong);
134
135 CreateSchemaColsForDefn(t1_defn_, &cols);
136 CreateSchemaColsForDefn(t2_defn_, &cols);
137
138 if (HasDuplicateColumns(cols)) {
139 return base::nullopt;
140 }
141 std::vector<size_t> primary_keys = {Column::kTimestamp};
142 if (partitioning_ != PartitioningType::kNoPartitioning)
143 primary_keys.push_back(Column::kPartition);
144 return Schema(cols, primary_keys);
145 }
146
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<Table::Column> * cols)147 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
148 const TableDefinition& defn,
149 std::vector<Table::Column>* cols) {
150 for (size_t i = 0; i < defn.columns().size(); i++) {
151 const auto& n = defn.columns()[i].name();
152 if (IsRequiredColumn(n) || n == defn.partition_col())
153 continue;
154
155 ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
156 locator->defn = &defn;
157 locator->col_index = i;
158
159 cols->emplace_back(cols->size(), n, defn.columns()[i].type());
160 }
161 }
162
CreateCursor()163 std::unique_ptr<Table::Cursor> SpanJoinOperatorTable::CreateCursor() {
164 return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
165 }
166
BestIndex(const QueryConstraints &,BestIndexInfo *)167 int SpanJoinOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
168 // TODO(lalitm): figure out cost estimation.
169 return SQLITE_OK;
170 }
171
172 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)173 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
174 const TableDefinition& defn,
175 const QueryConstraints& qc,
176 sqlite3_value** argv) {
177 std::vector<std::string> constraints;
178 for (size_t i = 0; i < qc.constraints().size(); i++) {
179 const auto& cs = qc.constraints()[i];
180 auto col_name = GetNameForGlobalColumnIndex(defn, cs.iColumn);
181 if (col_name == "")
182 continue;
183
184 if (col_name == kTsColumnName || col_name == kDurColumnName) {
185 // We don't support constraints on ts or duration in the child tables.
186 PERFETTO_DFATAL("ts or duration constraints on child tables");
187 continue;
188 }
189 auto op = sqlite_utils::OpToString(cs.op);
190 auto value = sqlite_utils::SqliteValueAsString(argv[i]);
191
192 constraints.emplace_back("`" + col_name + "`" + op + value);
193 }
194 return constraints;
195 }
196
197 base::Optional<SpanJoinOperatorTable::TableDefinition>
CreateTableDefinition(const TableDescriptor & desc,bool emit_shadow_slices)198 SpanJoinOperatorTable::CreateTableDefinition(const TableDescriptor& desc,
199 bool emit_shadow_slices) {
200 auto cols = sqlite_utils::GetColumnsForTable(db_, desc.name);
201
202 uint32_t required_columns_found = 0;
203 uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
204 uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
205 uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
206 for (uint32_t i = 0; i < cols.size(); i++) {
207 auto col = cols[i];
208 if (IsRequiredColumn(col.name())) {
209 ++required_columns_found;
210 if (col.type() != Table::ColumnType::kLong &&
211 col.type() != Table::ColumnType::kUnknown) {
212 PERFETTO_ELOG("Invalid column type for %s", col.name().c_str());
213 return base::nullopt;
214 }
215 }
216
217 if (col.name() == kTsColumnName) {
218 ts_idx = i;
219 } else if (col.name() == kDurColumnName) {
220 dur_idx = i;
221 } else if (col.name() == desc.partition_col) {
222 partition_idx = i;
223 }
224 }
225 if (required_columns_found != 2) {
226 PERFETTO_ELOG("Required columns not found (found %d)",
227 required_columns_found);
228 return base::nullopt;
229 }
230
231 PERFETTO_DCHECK(ts_idx < cols.size());
232 PERFETTO_DCHECK(dur_idx < cols.size());
233 PERFETTO_DCHECK(desc.partition_col.empty() || partition_idx < cols.size());
234
235 return TableDefinition(desc.name, desc.partition_col, std::move(cols),
236 emit_shadow_slices, ts_idx, dur_idx, partition_idx);
237 }
238
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)239 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
240 const TableDefinition& defn,
241 int global_column) {
242 size_t col_idx = static_cast<size_t>(global_column);
243 if (col_idx == Column::kTimestamp)
244 return kTsColumnName;
245 else if (col_idx == Column::kDuration)
246 return kDurColumnName;
247 else if (col_idx == Column::kPartition &&
248 partitioning_ != PartitioningType::kNoPartitioning)
249 return defn.partition_col().c_str();
250
251 const auto& locator = global_index_to_column_locator_[col_idx];
252 if (locator.defn != &defn)
253 return "";
254 return defn.columns()[locator.col_index].name().c_str();
255 }
256
Cursor(SpanJoinOperatorTable * table,sqlite3 * db)257 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
258 : Table::Cursor(table),
259 t1_(table, &table->t1_defn_, db),
260 t2_(table, &table->t2_defn_, db),
261 table_(table) {}
262
Filter(const QueryConstraints & qc,sqlite3_value ** argv)263 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
264 sqlite3_value** argv) {
265 int err = t1_.Initialize(qc, argv);
266 if (err != SQLITE_OK)
267 return err;
268
269 err = t2_.Initialize(qc, argv);
270 if (err != SQLITE_OK)
271 return err;
272
273 // Step the partitioned table to allow for us to look into it below.
274 Query* step_now = t1_.IsPartitioned() ? &t1_ : &t2_;
275 next_stepped_ = step_now == &t1_ ? &t2_ : &t1_;
276
277 auto res = step_now->Step();
278 if (PERFETTO_UNLIKELY(res.is_err()))
279 return res.err_code;
280
281 // Forward the unpartitioned table to reflect the partition of the partitoined
282 // table.
283 if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
284 PERFETTO_DCHECK(step_now->IsPartitioned());
285
286 // If we emit shadow slices, we need to step because the first slice will
287 // be a full partition shadow slice that we need to skip.
288 if (step_now->definition()->emit_shadow_slices()) {
289 PERFETTO_DCHECK(step_now->IsFullPartitionShadowSlice());
290 res = step_now->StepToNextPartition();
291 if (PERFETTO_UNLIKELY(res.is_err()))
292 return res.err_code;
293 }
294
295 res = next_stepped_->StepToPartition(step_now->partition());
296 if (PERFETTO_UNLIKELY(res.is_err()))
297 return res.err_code;
298 }
299
300 // Otherwise, find an overlapping span.
301 return Next();
302 }
303
IsOverlappingSpan()304 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
305 if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) {
306 return false;
307 } else if (t1_.partition() != t2_.partition()) {
308 return false;
309 } else if (t1_.ts_end() <= t2_.ts_start() || t2_.ts_end() <= t1_.ts_start()) {
310 return false;
311 }
312 return true;
313 }
314
Next()315 int SpanJoinOperatorTable::Cursor::Next() {
316 // TODO: Propagate error msg to the table.
317 auto res = next_stepped_->Step();
318 if (res.is_err())
319 return res.err_code;
320
321 while (true) {
322 if (t1_.Eof() || t2_.Eof()) {
323 if (table_->partitioning_ != PartitioningType::kMixedPartitioning)
324 return SQLITE_OK;
325
326 auto* partitioned = t1_.IsPartitioned() ? &t1_ : &t2_;
327 auto* unpartitioned = t1_.IsPartitioned() ? &t2_ : &t1_;
328 if (partitioned->Eof())
329 return SQLITE_OK;
330
331 res = partitioned->StepToNextPartition();
332 if (PERFETTO_UNLIKELY(res.is_err()))
333 return res.err_code;
334 else if (PERFETTO_UNLIKELY(res.is_eof()))
335 continue;
336
337 res = unpartitioned->StepToPartition(partitioned->partition());
338 if (PERFETTO_UNLIKELY(res.is_err()))
339 return res.err_code;
340 else if (PERFETTO_UNLIKELY(res.is_eof()))
341 continue;
342 }
343
344 int64_t partition = std::max(t1_.partition(), t2_.partition());
345 res = t1_.StepToPartition(partition);
346 if (PERFETTO_UNLIKELY(res.is_err()))
347 return res.err_code;
348 else if (PERFETTO_UNLIKELY(res.is_eof()))
349 continue;
350
351 res = t2_.StepToPartition(t1_.partition());
352 if (PERFETTO_UNLIKELY(res.is_err()))
353 return res.err_code;
354 else if (PERFETTO_UNLIKELY(res.is_eof()))
355 continue;
356
357 if (t1_.partition() != t2_.partition())
358 continue;
359
360 auto ts = std::max(t1_.ts_start(), t2_.ts_start());
361 res = t1_.StepUntil(ts);
362 if (PERFETTO_UNLIKELY(res.is_err()))
363 return res.err_code;
364 else if (PERFETTO_UNLIKELY(res.is_eof()))
365 continue;
366
367 res = t2_.StepUntil(t1_.ts_start());
368 if (PERFETTO_UNLIKELY(res.is_err()))
369 return res.err_code;
370 else if (PERFETTO_UNLIKELY(res.is_eof()))
371 continue;
372
373 // If we're in the case where we have shadow slices on both tables, try
374 // and forward the earliest table and see what happens. IsOverlappingSpan()
375 // will double check that we have at least one non-real slice now.
376 // Note: if we don't do this, we end up in an infinite loop because all
377 // the code above will not change anything because these shadow slices will
378 // be overlapping.
379 if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) {
380 PERFETTO_DCHECK(t1_.partition() == t2_.partition());
381
382 // If the table is not partitioned, partition() will return the partition
383 // the table was set to have by StepToPartition().
384 auto t1_partition =
385 t1_.IsPartitioned() ? t1_.CursorPartition() : t1_.partition();
386 auto t2_partition =
387 t2_.IsPartitioned() ? t2_.CursorPartition() : t2_.partition();
388
389 Query* stepped;
390 if (t1_partition == t2_partition) {
391 stepped = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_;
392 } else {
393 stepped = t1_partition <= t2_partition ? &t1_ : &t2_;
394 }
395
396 res = stepped->Step();
397 if (PERFETTO_UNLIKELY(res.is_err()))
398 return res.err_code;
399 else if (PERFETTO_UNLIKELY(res.is_eof()))
400 continue;
401 }
402
403 if (IsOverlappingSpan())
404 break;
405 }
406 next_stepped_ = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_;
407
408 return SQLITE_OK;
409 }
410
Eof()411 int SpanJoinOperatorTable::Cursor::Eof() {
412 return t1_.Eof() || t2_.Eof();
413 }
414
Column(sqlite3_context * context,int N)415 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
416 PERFETTO_DCHECK(!t1_.Eof());
417 PERFETTO_DCHECK(!t2_.Eof());
418
419 if (N == Column::kTimestamp) {
420 auto max_ts = std::max(t1_.ts_start(), t2_.ts_start());
421 sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
422 } else if (N == Column::kDuration) {
423 auto max_start = std::max(t1_.ts_start(), t2_.ts_start());
424 auto min_end = std::min(t1_.ts_end(), t2_.ts_end());
425 PERFETTO_DCHECK(min_end > max_start);
426 auto dur = min_end - max_start;
427 sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
428 } else if (N == Column::kPartition &&
429 table_->partitioning_ != PartitioningType::kNoPartitioning) {
430 PERFETTO_DCHECK(table_->partitioning_ ==
431 PartitioningType::kMixedPartitioning ||
432 t1_.partition() == t2_.partition());
433 auto partition = t1_.IsPartitioned() ? t1_.partition() : t2_.partition();
434 sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
435 } else {
436 size_t index = static_cast<size_t>(N);
437 const auto& locator = table_->global_index_to_column_locator_[index];
438 if (locator.defn == t1_.definition())
439 t1_.ReportSqliteResult(context, locator.col_index);
440 else
441 t2_.ReportSqliteResult(context, locator.col_index);
442 }
443 return SQLITE_OK;
444 }
445
Query(SpanJoinOperatorTable * table,const TableDefinition * definition,sqlite3 * db)446 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
447 const TableDefinition* definition,
448 sqlite3* db)
449 : defn_(definition), db_(db), table_(table) {
450 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
451 defn_->partition_idx() < defn_->columns().size());
452 }
453
454 SpanJoinOperatorTable::Query::~Query() = default;
455
Initialize(const QueryConstraints & qc,sqlite3_value ** argv)456 int SpanJoinOperatorTable::Query::Initialize(const QueryConstraints& qc,
457 sqlite3_value** argv) {
458 *this = Query(table_, definition(), db_);
459 sql_query_ = CreateSqlQuery(
460 table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
461 return PrepareRawStmt();
462 }
463
Step()464 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::Step() {
465 PERFETTO_DCHECK(!Eof());
466 sqlite3_stmt* stmt = stmt_.get();
467
468 // In this loop, we will try and find the slice to from the cursor.
469 // Terminology: "Shadow slices" are slices which fill in the gaps between real
470 // slices from the underlying cursor in each partition (if any).
471 // For queries which don't need "shadow slices", we simply return non-zero
472 // duration slices from the underlying cursor.
473 do {
474 if (mode_ == Mode::kShadowSlice) {
475 PERFETTO_DCHECK(defn_->emit_shadow_slices());
476
477 // If we're out of slices in the cursor, this shadow slice will be the
478 // final slice.
479 if (cursor_eof_) {
480 mode_ = Mode::kRealSlice;
481 return StepRet(StepRet::Code::kEof);
482 }
483
484 // Look ahead to see if the cursor changes partition (if the cursor is
485 // partitioned). If so, then we need to fill the gap between the ts == 0
486 // and the start of that slice. Otherwise after this slice, we have the
487 // real slice from the cursor.
488 if (!defn_->IsPartitioned() || partition_ == CursorPartition()) {
489 mode_ = Mode::kRealSlice;
490 ts_start_ = CursorTs();
491 ts_end_ = ts_start_ + CursorDur();
492 } else if (IsFullPartitionShadowSlice()) {
493 mode_ = Mode::kShadowSlice;
494 ts_start_ = 0;
495 ts_end_ = CursorTs();
496 partition_ = CursorPartition();
497 } else {
498 mode_ = Mode::kShadowSlice;
499 ts_start_ = 0;
500 ts_end_ = std::numeric_limits<int64_t>::max();
501 }
502 continue;
503 }
504
505 int res;
506 if (defn_->IsPartitioned()) {
507 auto partition_idx = static_cast<int>(defn_->partition_idx());
508 // Fastforward through any rows with null partition keys.
509 int row_type;
510 do {
511 res = sqlite3_step(stmt);
512 row_type = sqlite3_column_type(stmt, partition_idx);
513 } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
514 } else {
515 res = sqlite3_step(stmt);
516 }
517
518 if (res == SQLITE_ROW) {
519 // After every row, there will be a shadow slice so emit that if we need
520 // to do so. Otherwise, just emit the underlying slice.
521 if (defn_->emit_shadow_slices()) {
522 mode_ = Mode::kShadowSlice;
523 ts_start_ = ts_end_;
524 ts_end_ = !defn_->IsPartitioned() || partition_ == CursorPartition()
525 ? CursorTs()
526 : std::numeric_limits<int64_t>::max();
527 } else {
528 mode_ = Mode::kRealSlice;
529 ts_start_ = CursorTs();
530 ts_end_ = ts_start_ + CursorDur();
531 if (defn_->IsPartitioned())
532 partition_ = CursorPartition();
533 }
534 } else if (res == SQLITE_DONE) {
535 cursor_eof_ = true;
536 if (!defn_->emit_shadow_slices())
537 return StepRet(StepRet::Code::kEof);
538
539 // Close off the remainder of this partition with a shadow slice.
540 mode_ = Mode::kShadowSlice;
541 ts_start_ = ts_end_;
542 ts_end_ = std::numeric_limits<int64_t>::max();
543 } else {
544 return StepRet(StepRet::Code::kError, res);
545 }
546 } while (ts_start_ == ts_end_);
547
548 return StepRet(StepRet::Code::kRow);
549 }
550
551 SpanJoinOperatorTable::Query::StepRet
StepToNextPartition()552 SpanJoinOperatorTable::Query::StepToNextPartition() {
553 PERFETTO_DCHECK(defn_->IsPartitioned());
554 PERFETTO_DCHECK(!Eof());
555
556 auto current_partition = partition_;
557 while (partition_ <= current_partition) {
558 auto res = Step();
559 if (!res.is_row())
560 return res;
561 }
562 return StepRet(StepRet::Code::kRow);
563 }
564
565 SpanJoinOperatorTable::Query::StepRet
StepToPartition(int64_t target_partition)566 SpanJoinOperatorTable::Query::StepToPartition(int64_t target_partition) {
567 PERFETTO_DCHECK(partition_ <= target_partition);
568 if (defn_->IsPartitioned()) {
569 while (partition_ < target_partition) {
570 if (IsFullPartitionShadowSlice() &&
571 target_partition < CursorPartition()) {
572 partition_ = target_partition;
573 return StepRet(StepRet::Code::kRow);
574 }
575
576 auto res = Step();
577 if (!res.is_row())
578 return res;
579 }
580 } else if (/* !defn_->IsPartitioned() && */ partition_ < target_partition) {
581 int res = PrepareRawStmt();
582 if (res != SQLITE_OK)
583 return StepRet(StepRet::Code::kError, res);
584 partition_ = target_partition;
585 }
586 return StepRet(StepRet::Code::kRow);
587 }
588
StepUntil(int64_t timestamp)589 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::StepUntil(
590 int64_t timestamp) {
591 PERFETTO_DCHECK(!Eof());
592 auto partition = partition_;
593 while (partition_ == partition && ts_end_ <= timestamp) {
594 auto res = Step();
595 if (!res.is_row())
596 return res;
597 }
598 return StepRet(StepRet::Code::kRow);
599 }
600
CreateSqlQuery(const std::vector<std::string> & cs) const601 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
602 const std::vector<std::string>& cs) const {
603 std::vector<std::string> col_names;
604 for (const Table::Column& c : defn_->columns()) {
605 col_names.push_back("`" + c.name() + "`");
606 }
607
608 std::string sql = "SELECT " + base::Join(col_names, ", ");
609 sql += " FROM " + defn_->name();
610 if (!cs.empty()) {
611 sql += " WHERE " + base::Join(cs, " AND ");
612 }
613 sql += " ORDER BY ";
614 sql += defn_->IsPartitioned()
615 ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
616 : "ts";
617 sql += ";";
618 PERFETTO_DLOG("%s", sql.c_str());
619 return sql;
620 }
621
PrepareRawStmt()622 int SpanJoinOperatorTable::Query::PrepareRawStmt() {
623 sqlite3_stmt* stmt = nullptr;
624 int err =
625 sqlite3_prepare_v2(db_, sql_query_.c_str(),
626 static_cast<int>(sql_query_.size()), &stmt, nullptr);
627 stmt_.reset(stmt);
628
629 ts_start_ = 0;
630 ts_end_ = 0;
631 partition_ = std::numeric_limits<int64_t>::lowest();
632 cursor_eof_ = false;
633 mode_ = Mode::kRealSlice;
634
635 return err;
636 }
637
ReportSqliteResult(sqlite3_context * context,size_t index)638 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
639 size_t index) {
640 if (mode_ != Mode::kRealSlice) {
641 sqlite3_result_null(context);
642 return;
643 }
644
645 sqlite3_stmt* stmt = stmt_.get();
646 int idx = static_cast<int>(index);
647 switch (sqlite3_column_type(stmt, idx)) {
648 case SQLITE_INTEGER:
649 sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
650 break;
651 case SQLITE_FLOAT:
652 sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
653 break;
654 case SQLITE_TEXT: {
655 // TODO(lalitm): note for future optimizations: if we knew the addresses
656 // of the string intern pool, we could check if the string returned here
657 // comes from the pool, and pass it as non-transient.
658 const auto kSqliteTransient =
659 reinterpret_cast<sqlite3_destructor_type>(-1);
660 auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
661 sqlite3_result_text(context, ptr, -1, kSqliteTransient);
662 break;
663 }
664 }
665 }
666
TableDefinition(std::string name,std::string partition_col,std::vector<Table::Column> cols,bool emit_shadow_slices,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)667 SpanJoinOperatorTable::TableDefinition::TableDefinition(
668 std::string name,
669 std::string partition_col,
670 std::vector<Table::Column> cols,
671 bool emit_shadow_slices,
672 uint32_t ts_idx,
673 uint32_t dur_idx,
674 uint32_t partition_idx)
675 : name_(std::move(name)),
676 partition_col_(std::move(partition_col)),
677 cols_(std::move(cols)),
678 emit_shadow_slices_(emit_shadow_slices),
679 ts_idx_(ts_idx),
680 dur_idx_(dur_idx),
681 partition_idx_(partition_idx) {}
682
683 base::Optional<SpanJoinOperatorTable::TableDescriptor>
Parse(const std::string & raw_descriptor)684 SpanJoinOperatorTable::TableDescriptor::Parse(
685 const std::string& raw_descriptor) {
686 // Descriptors have one of the following forms:
687 // table_name [PARTITIONED column_name]
688
689 // Find the table name.
690 base::StringSplitter splitter(raw_descriptor, ' ');
691 if (!splitter.Next())
692 return base::nullopt;
693
694 TableDescriptor descriptor;
695 descriptor.name = splitter.cur_token();
696 if (!splitter.Next())
697 return std::move(descriptor);
698
699 if (strcasecmp(splitter.cur_token(), "PARTITIONED") != 0) {
700 PERFETTO_ELOG("Invalid SPAN_JOIN token %s", splitter.cur_token());
701 return base::nullopt;
702 }
703 if (!splitter.Next()) {
704 PERFETTO_ELOG("Missing partitioning column");
705 return base::nullopt;
706 }
707
708 descriptor.partition_col = splitter.cur_token();
709 return std::move(descriptor);
710 }
711
712 } // namespace trace_processor
713 } // namespace perfetto
714