1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "span_join.h"
17 #include <vector>
18
19 namespace SysTuning {
20 namespace TraceStreamer {
21
// Timestamp / duration column names. Every joined table must provide both (see GetTableField),
// and the span_join table exposes them as its first two output columns.
const std::string TS_COLUMN_NAME("ts");
const std::string DUR_COLUMN_NAME("dur");
// Minimum argc accepted by Init, which reads the two table specs from argv[3] and argv[4].
constexpr int MINSIZE = 5;
// Size limit used by GetColumns when querying a table's column list.
constexpr int MAXSIZE = 1024;
// Offset of one used in the missing-partition bookkeeping (partition_ + 1, missPartitionEnd_ - 1).
constexpr int NEXT_NUMBER = 1;
// Number of ts/dur columns a joined table must contain.
constexpr int TSANDDUR_COLUMN = 2;
// Token count of a "<table> PARTITIONED <column>" spec.
constexpr int PARTITIONED_COUNT = 3;

// Positions of the fixed output columns, in the order Init declares them.
enum Index { TS = 0, DUR = 1, PARTITION = 2 };
31
SpanJoin(const TraceDataCache * dataCache)32 SpanJoin::SpanJoin(const TraceDataCache* dataCache) : TableBase(dataCache)
33 {
34 tableColumn_ = {};
35 tablePriKey_ = {};
36 }
37
Init(int argc,const char * const * argv)38 void SpanJoin::Init(int argc, const char* const* argv)
39 {
40 if (argc < MINSIZE) {
41 return;
42 }
43 // Parse the fields of the two tables separately
44 TableParse tableFirstParse;
45 Parse(std::string(reinterpret_cast<const char*>(argv[3])), tableFirstParse);
46 TableParse tableSecondParse;
47 Parse(std::string(reinterpret_cast<const char*>(argv[4])), tableSecondParse);
48
49 // you must ensure that the two partitions exist and are the same when using
50 if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
51 return;
52 }
53 isSamepartitioning_ = true;
54 GetTableField(tableFirstParse, tableFirstDesc_);
55 GetTableField(tableSecondParse, tableSecondDesc_);
56 tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
57 tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
58 tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
59 CreateCols(tableFirstDesc_, tableColumn_);
60 CreateCols(tableSecondDesc_, tableColumn_);
61 std::vector<std::string> primaryKeys = {"ts"};
62 primaryKeys.push_back(tableFirstDesc_.partition);
63 tablePriKey_ = primaryKeys;
64 return;
65 }
66
CreateCols(TableDesc & tableDesc,std::vector<TableBase::ColumnInfo> & cols)67 void SpanJoin::CreateCols(TableDesc& tableDesc, std::vector<TableBase::ColumnInfo>& cols)
68 {
69 for (int i = 0; i < tableDesc.cols.size(); i++) {
70 auto& n = tableDesc.cols.at(i).name_;
71 if (IsTsOrDurCol(n)) {
72 continue;
73 }
74 auto columnInfo = &mTableColumnInfo_[cols.size()];
75 columnInfo->tableDesc = &tableDesc;
76 columnInfo->colIdx = i;
77 if (!DeduplicationForColumn(n, cols)) {
78 continue;
79 }
80 cols.emplace_back(n, tableDesc.cols.at(i).type_);
81 }
82 }
83
DeduplicationForColumn(const std::string & name,std::vector<ColumnInfo> & cols)84 bool SpanJoin::DeduplicationForColumn(const std::string& name, std::vector<ColumnInfo>& cols)
85 {
86 for (size_t i = 0; i < cols.size(); i++) {
87 if (name == cols.at(i).name_) {
88 return false;
89 }
90 }
91 return true;
92 }
93
TableNameSplitToVec(std::string & str,const std::string & pat)94 std::vector<std::string> SpanJoin::TableNameSplitToVec(std::string& str, const std::string& pat)
95 {
96 std::string::size_type pos;
97 std::vector<std::string> result;
98 str += pat;
99 int size = str.size();
100 for (int i = 0; i < size; i++) {
101 pos = str.find(pat, i);
102 if (pos == std::string::npos) {
103 break;
104 }
105 if (pos < size) {
106 std::string s = str.substr(i, pos - i);
107 result.push_back(s);
108 i = pos + pat.size() - 1;
109 }
110 }
111 return result;
112 }
113
Parse(const std::string & tablePartition,TableParse & tableParse)114 void SpanJoin::Parse(const std::string& tablePartition, TableParse& tableParse)
115 {
116 std::vector<std::string> result = TableNameSplitToVec(const_cast<std::string&>(tablePartition), " ");
117 if (result.size() < PARTITIONED_COUNT) {
118 TS_LOGW("span_join sql is invalid!");
119 }
120 tableParse.name = result.at(0);
121 if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
122 TS_LOGW("sql has not PARTITIONED");
123 return;
124 }
125 tableParse.partitionCol = result.at(2);
126 return;
127 }
128
IsTsOrDurCol(const std::string & name)129 bool SpanJoin::IsTsOrDurCol(const std::string& name)
130 {
131 if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
132 return true;
133 }
134 return false;
135 }
136
GetTableField(TableParse & tableParse,TableDesc & tableDesc)137 void SpanJoin::GetTableField(TableParse& tableParse, TableDesc& tableDesc)
138 {
139 std::vector<TableBase::ColumnInfo> cols;
140 GetColumns(dataCache_, tableParse.name, cols);
141 int tsDurCount = 0;
142 for (int i = 0; i < cols.size(); i++) {
143 auto col = cols.at(i);
144 if (IsTsOrDurCol(col.name_)) {
145 tsDurCount++;
146 }
147 if (col.name_ == TS_COLUMN_NAME) {
148 tableDesc.tsIdx = i;
149 } else if (col.name_ == DUR_COLUMN_NAME) {
150 tableDesc.durIdx = i;
151 } else if (col.name_ == tableParse.partitionCol) {
152 tableDesc.partitionIdx = i;
153 }
154 }
155 if (tsDurCount != TSANDDUR_COLUMN) {
156 return;
157 }
158 tableDesc.name = tableParse.name;
159 tableDesc.partition = tableParse.partitionCol;
160 tableDesc.cols = std::move(cols);
161 return;
162 }
163
GetColumns(const TraceDataCache * dataCache,const std::string & tableName,std::vector<TableBase::ColumnInfo> & columns)164 void SpanJoin::GetColumns(const TraceDataCache* dataCache,
165 const std::string& tableName,
166 std::vector<TableBase::ColumnInfo>& columns)
167 {
168 char sql[MAXSIZE];
169 std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
170 int n = snprintf(sql, sizeof(sql), querySql.c_str(), tableName.c_str());
171 sqlite3_stmt* stmt = nullptr;
172 int ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
173 while (!ret) {
174 int err = sqlite3_step(stmt);
175 if (err == SQLITE_ROW) {
176 columns.emplace_back((reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0))),
177 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1)));
178 continue;
179 }
180 if (err == SQLITE_DONE) {
181 break;
182 }
183 ret = err;
184 }
185 return;
186 }
187
CaclSpan(TableBase * tableBase,const TableDesc * tableDesc,sqlite3 * db)188 SpanJoin::CaclSpan::CaclSpan(TableBase* tableBase, const TableDesc* tableDesc, sqlite3* db)
189 : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin*>(tableBase))
190 {
191 }
192
193 SpanJoin::CaclSpan::~CaclSpan() = default;
194
InitQuerySql(sqlite3_value ** argv)195 int SpanJoin::CaclSpan::InitQuerySql(sqlite3_value** argv)
196 {
197 sqlQuery_ = GetSqlQuery();
198 bool status = IsQueryNext();
199 if (!status) {
200 return SQLITE_ERROR;
201 }
202 return SQLITE_OK;
203 }
204
GetSqlQuery()205 std::string SpanJoin::CaclSpan::GetSqlQuery()
206 {
207 std::vector<std::string> columnNames;
208 for (int i = 0; i < desc_->cols.size(); i++) {
209 columnNames.push_back(desc_->cols.at(i).name_);
210 }
211 std::string str;
212 str = GetMergeColumns(columnNames);
213 std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
214 return sql;
215 }
216
setResult(sqlite3_context * context,size_t index) const217 void SpanJoin::CaclSpan::setResult(sqlite3_context* context, size_t index) const
218 {
219 if (partitionState_ != PartitionState::TS_REAL) {
220 sqlite3_result_null(context);
221 return;
222 }
223 int sqliteType = sqlite3_column_type(stmt_, index);
224 if (sqliteType == SQLITE_TEXT) {
225 sqlite3_result_text(context, reinterpret_cast<const char*>(sqlite3_column_int64(stmt_, index)), -1,
226 reinterpret_cast<sqlite3_destructor_type>(-1));
227 } else if (sqliteType == SQLITE_INTEGER) {
228 sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
229 } else if (sqliteType == SQLITE_FLOAT) {
230 sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
231 }
232 }
233
GetCursorNext()234 bool SpanJoin::CaclSpan::GetCursorNext()
235 {
236 int res;
237 int rowType;
238 do {
239 res = sqlite3_step(stmt_);
240 rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
241 } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
242 if (res != SQLITE_ROW) {
243 isEof_ = true;
244 } else {
245 isEof_ = false;
246 }
247
248 return res == SQLITE_ROW || res == SQLITE_DONE;
249 }
250
Next()251 void SpanJoin::CaclSpan::Next()
252 {
253 GetNextState();
254 SearchNextslice();
255 }
256
IsQueryNext()257 bool SpanJoin::CaclSpan::IsQueryNext()
258 {
259 int res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int>(sqlQuery_.size()), &stmt_, nullptr);
260 isEof_ = res != SQLITE_OK;
261 if (res != SQLITE_OK) {
262 return true;
263 }
264 auto status = GetCursorNext();
265 if (!status) {
266 return false;
267 }
268 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->partitionIdx));
269 status = SearchNextslice();
270 return status;
271 }
272
SearchNextslice()273 bool SpanJoin::CaclSpan::SearchNextslice()
274 {
275 while (partitionState_ != TS_REAL) {
276 bool status = GetNextState();
277 if (!status) {
278 return false;
279 }
280 }
281 return true;
282 }
283
GetNextState()284 bool SpanJoin::CaclSpan::GetNextState()
285 {
286 switch (partitionState_) {
287 case PartitionState::TS_REAL: {
288 GetCursorNext();
289 partitionState_ = PartitionState::TS_PARTITION;
290 ts_ = endTs_;
291 if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int>(desc_->partitionIdx))) {
292 endTs_ = std::numeric_limits<int64_t>::max();
293 } else {
294 endTs_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->tsIdx));
295 }
296 return true;
297 }
298 case PartitionState::TS_PARTITION: {
299 if (endTs_ == std::numeric_limits<int64_t>::max()) {
300 partitionState_ = PartitionState::TS_MISSING;
301 if (isEof_) {
302 missPartitionEnd_ = std::numeric_limits<int32_t>::max();
303 } else {
304 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->partitionIdx));
305 }
306 missPartitionStart_ = partition_ + NEXT_NUMBER;
307 ts_ = 0;
308 } else {
309 partitionState_ = PartitionState::TS_REAL;
310 ts_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->tsIdx));
311 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int>(desc_->durIdx));
312 }
313 return true;
314 }
315 case PartitionState::TS_MISSING: {
316 if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
317 partitionState_ = PartitionState::TS_EOF;
318 } else {
319 partitionState_ = PartitionState::TS_PARTITION;
320 ts_ = 0;
321 endTs_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->tsIdx));
322 partition_ = missPartitionEnd_;
323 }
324 return true;
325 }
326 default:
327 return false;
328 }
329 }
330
GetMergeColumns(std::vector<std::string> & columns)331 std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string>& columns)
332 {
333 std::string str;
334 int size = columns.size();
335 for (int i = 0; i < size; i++) {
336 if (i == size - 1) {
337 str += columns.at(i);
338 } else {
339 str += columns.at(i) + ", ";
340 }
341 }
342 return str;
343 }
344
GetPatitonForMiss()345 int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
346 {
347 return partitionState_ == TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
348 }
349
CreateCursor()350 std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
351 {
352 return std::make_unique<Cursor>(dataCache_, this);
353 }
354
Cursor(const TraceDataCache * dataCache,SpanJoin * table)355 SpanJoin::Cursor::Cursor(const TraceDataCache* dataCache, SpanJoin* table)
356 : TableBase::Cursor(dataCache, table, 0),
357 tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
358 tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
359 table_(table)
360 {
361 }
362
Filter(const FilterConstraints & fc,sqlite3_value ** argv)363 int SpanJoin::Cursor::Filter(const FilterConstraints& fc, sqlite3_value** argv)
364 {
365 tableFirst_.InitQuerySql(argv);
366 tableSecond_.InitQuerySql(argv);
367 auto status = IsFindSpan();
368 if (!status) {
369 return SQLITE_ERROR;
370 }
371 return SQLITE_OK;
372 }
373
CaclOverLap()374 bool SpanJoin::Cursor::CaclOverLap()
375 {
376 if (tableFirst_.ts_ >= tableSecond_.ts_) {
377 if (tableFirst_.partitionState_ == PartitionState::TS_REAL &&
378 tableSecond_.partitionState_ == PartitionState::TS_REAL ||
379 tableFirst_.ts_ < tableSecond_.endTs_) {
380 return true;
381 }
382 } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
383 return true;
384 }
385 return false;
386 }
387
IsFindSpan()388 bool SpanJoin::Cursor::IsFindSpan()
389 {
390 for (;;) {
391 if (tableFirst_.isEof_ || tableSecond_.isEof_) {
392 break;
393 }
394 queryNext_ = FindQueryResult();
395 if (CaclOverLap()) {
396 break;
397 }
398 queryNext_->Next();
399 }
400 return true;
401 }
402
FindQueryResult()403 SpanJoin::CaclSpan* SpanJoin::Cursor::FindQueryResult()
404 {
405 if (!table_->isSamepartitioning_) {
406 return nullptr;
407 }
408
409 auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
410 tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
411 auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
412 tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
413 if (tableFirstResult < tableSecondResult) {
414 return &tableFirst_;
415 }
416 return &tableSecond_;
417 }
418
Column(int column) const419 int SpanJoin::Cursor::Column(int column) const
420 {
421 switch (column) {
422 case TS: {
423 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
424 break;
425 }
426 case DUR: {
427 sqlite3_result_int64(context_,
428 static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
429 std::max(tableFirst_.ts_, tableSecond_.ts_)));
430 break;
431 }
432 case PARTITION: {
433 auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
434 : tableSecond_.partition_;
435 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
436 break;
437 }
438 default: {
439 const auto ColumnInfo = table_->mTableColumnInfo_[column];
440 if (ColumnInfo.tableDesc == tableFirst_.desc_) {
441 tableFirst_.setResult(context_, ColumnInfo.colIdx);
442 } else {
443 tableSecond_.setResult(context_, ColumnInfo.colIdx);
444 }
445 }
446 }
447 return SQLITE_OK;
448 }
449
450 } // namespace TraceStreamer
451 } // namespace SysTuning
452