1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "span_join.h"
17 #include <vector>
18 #include "string_help.h"
19
20 namespace SysTuning {
21 namespace TraceStreamer {
22
// Names of the timestamp and duration columns; GetTableField() only accepts a
// source table that contains both of them.
const std::string TS_COLUMN_NAME = "ts";
const std::string DUR_COLUMN_NAME = "dur";
// Minimum argc accepted by Init(), which reads argv[3] and argv[4].
constexpr int32_t MINSIZE = 5;
// Upper bound on the length of the SQL text built in GetColumns().
constexpr int32_t MAXSIZE = 1024;
// Distance between a partition value and the one that follows it.
constexpr int32_t NEXT_NUMBER = 1;
// Number of ts/dur columns a table must contain to be accepted by GetTableField().
constexpr int32_t TSANDDUR_COLUMN = 2;
// Number of tokens Parse() expects in a table description: "<table> PARTITIONED <column>".
constexpr int32_t PARTITIONED_COUNT = 3;

// Indexes of the first three output columns, in the order Init() registers them.
enum Index { TS, DUR, PARTITION };
32
// Constructs the span-join table on top of TableBase and resets the column
// list, the primary keys and both source-table descriptors to empty values.
// The real configuration is done by Init().
SpanJoin::SpanJoin(const TraceDataCache* dataCache) : TableBase(dataCache)
{
    tableColumn_ = {};
    tablePriKey_ = {};
    tableFirstDesc_ = {};
    tableSecondDesc_ = {};
}
40
Init(int32_t argc,const char * const * argv)41 void SpanJoin::Init(int32_t argc, const char* const* argv)
42 {
43 if (argc < MINSIZE) {
44 return;
45 }
46 // Parse the fields of the two tables separately
47 TableParse tableFirstParse;
48 Parse(std::string(reinterpret_cast<const char*>(argv[3])), tableFirstParse);
49 TableParse tableSecondParse;
50 Parse(std::string(reinterpret_cast<const char*>(argv[4])), tableSecondParse);
51
52 // you must ensure that the two partitions exist and are the same when using
53 if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
54 return;
55 }
56 isSamepartitioning_ = true;
57 GetTableField(tableFirstParse, tableFirstDesc_);
58 GetTableField(tableSecondParse, tableSecondDesc_);
59 tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
60 tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
61 tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
62 CreateCols(tableFirstDesc_, tableColumn_);
63 CreateCols(tableSecondDesc_, tableColumn_);
64 std::vector<std::string> primaryKeys = {"ts"};
65 primaryKeys.push_back(tableFirstDesc_.partition);
66 tablePriKey_ = primaryKeys;
67 return;
68 }
69
CreateCols(TableDesc & tableDesc,std::vector<TableBase::ColumnInfo> & cols)70 void SpanJoin::CreateCols(TableDesc& tableDesc, std::vector<TableBase::ColumnInfo>& cols)
71 {
72 for (int32_t i = 0; i < tableDesc.cols.size(); i++) {
73 auto& n = tableDesc.cols.at(i).name_;
74 if (IsTsOrDurCol(n)) {
75 continue;
76 }
77 auto columnInfo = &mTableColumnInfo_[cols.size()];
78 columnInfo->tableDesc = &tableDesc;
79 columnInfo->colIdx = i;
80 if (!DeduplicationForColumn(n, cols)) {
81 continue;
82 }
83 cols.emplace_back(n, tableDesc.cols.at(i).type_);
84 }
85 }
86
DeduplicationForColumn(const std::string & name,std::vector<ColumnInfo> & cols)87 bool SpanJoin::DeduplicationForColumn(const std::string& name, std::vector<ColumnInfo>& cols)
88 {
89 for (size_t i = 0; i < cols.size(); i++) {
90 if (name == cols.at(i).name_) {
91 return false;
92 }
93 }
94 return true;
95 }
96
Parse(const std::string & tablePartition,TableParse & tableParse)97 void SpanJoin::Parse(const std::string& tablePartition, TableParse& tableParse)
98 {
99 std::vector<std::string> result = SplitStringToVec(tablePartition, " ");
100 if (result.size() < PARTITIONED_COUNT) {
101 TS_LOGW("span_join sql is invalid!");
102 }
103 tableParse.name = result.at(0);
104 if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
105 TS_LOGW("sql has not PARTITIONED");
106 return;
107 }
108 tableParse.partitionCol = result.at(2);
109 return;
110 }
111
IsTsOrDurCol(const std::string & name)112 bool SpanJoin::IsTsOrDurCol(const std::string& name)
113 {
114 if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
115 return true;
116 }
117 return false;
118 }
119
GetTableField(const TableParse & tableParse,TableDesc & tableDesc)120 void SpanJoin::GetTableField(const TableParse& tableParse, TableDesc& tableDesc)
121 {
122 std::vector<TableBase::ColumnInfo> cols;
123 GetColumns(dataCache_, tableParse.name, cols);
124 int32_t tsDurCount = 0;
125 for (int32_t i = 0; i < cols.size(); i++) {
126 auto col = cols.at(i);
127 if (IsTsOrDurCol(col.name_)) {
128 tsDurCount++;
129 }
130 if (col.name_ == TS_COLUMN_NAME) {
131 tableDesc.tsIdx = i;
132 } else if (col.name_ == DUR_COLUMN_NAME) {
133 tableDesc.durIdx = i;
134 } else if (col.name_ == tableParse.partitionCol) {
135 tableDesc.partitionIdx = i;
136 }
137 }
138 if (tsDurCount != TSANDDUR_COLUMN) {
139 return;
140 }
141 tableDesc.name = tableParse.name;
142 tableDesc.partition = tableParse.partitionCol;
143 tableDesc.cols = std::move(cols);
144 return;
145 }
146
GetColumns(const TraceDataCache * dataCache,const std::string & tableName,std::vector<TableBase::ColumnInfo> & columns)147 void SpanJoin::GetColumns(const TraceDataCache* dataCache,
148 const std::string& tableName,
149 std::vector<TableBase::ColumnInfo>& columns)
150 {
151 char sql[MAXSIZE];
152 std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
153 int32_t n = snprintf_s(sql, sizeof(sql), 1, querySql.c_str(), tableName.c_str());
154 sqlite3_stmt* stmt = nullptr;
155 int32_t ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
156 while (!ret) {
157 int32_t err = sqlite3_step(stmt);
158 if (err == SQLITE_ROW) {
159 columns.emplace_back((reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0))),
160 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1)));
161 continue;
162 }
163 if (err == SQLITE_DONE) {
164 break;
165 }
166 ret = err;
167 }
168 return;
169 }
170
CaclSpan(TableBase * tableBase,const TableDesc * tableDesc,sqlite3 * db)171 SpanJoin::CaclSpan::CaclSpan(TableBase* tableBase, const TableDesc* tableDesc, sqlite3* db)
172 : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin*>(tableBase))
173 {
174 }
175
176 SpanJoin::CaclSpan::~CaclSpan() = default;
177
InitQuerySql(sqlite3_value ** argv)178 int32_t SpanJoin::CaclSpan::InitQuerySql(sqlite3_value** argv)
179 {
180 sqlQuery_ = GetSqlQuery();
181 bool status = IsQueryNext();
182 if (!status) {
183 return SQLITE_ERROR;
184 }
185 return SQLITE_OK;
186 }
187
GetSqlQuery()188 std::string SpanJoin::CaclSpan::GetSqlQuery()
189 {
190 std::vector<std::string> columnNames;
191 for (int32_t i = 0; i < desc_->cols.size(); i++) {
192 columnNames.push_back(desc_->cols.at(i).name_);
193 }
194 std::string str;
195 str = GetMergeColumns(columnNames);
196 std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
197 return sql;
198 }
199
setResult(sqlite3_context * context,size_t index) const200 void SpanJoin::CaclSpan::setResult(sqlite3_context* context, size_t index) const
201 {
202 if (partitionState_ != PartitionState::TS_REAL) {
203 sqlite3_result_null(context);
204 return;
205 }
206 int32_t sqliteType = sqlite3_column_type(stmt_, index);
207 if (sqliteType == SQLITE_TEXT) {
208 sqlite3_result_text(context, reinterpret_cast<const char*>(sqlite3_column_int64(stmt_, index)), -1,
209 reinterpret_cast<sqlite3_destructor_type>(-1));
210 } else if (sqliteType == SQLITE_INTEGER) {
211 sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
212 } else if (sqliteType == SQLITE_FLOAT) {
213 sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
214 }
215 }
216
GetCursorNext()217 bool SpanJoin::CaclSpan::GetCursorNext()
218 {
219 int32_t res;
220 int32_t rowType;
221 do {
222 res = sqlite3_step(stmt_);
223 rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
224 } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
225 if (res != SQLITE_ROW) {
226 isEof_ = true;
227 } else {
228 isEof_ = false;
229 }
230
231 return res == SQLITE_ROW || res == SQLITE_DONE;
232 }
233
Next()234 void SpanJoin::CaclSpan::Next()
235 {
236 GetNextState();
237 SearchNextslice();
238 }
239
IsQueryNext()240 bool SpanJoin::CaclSpan::IsQueryNext()
241 {
242 int32_t res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int32_t>(sqlQuery_.size()), &stmt_, nullptr);
243 isEof_ = res != SQLITE_OK;
244 if (res != SQLITE_OK) {
245 return true;
246 }
247 auto status = GetCursorNext();
248 if (!status) {
249 return false;
250 }
251 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
252 status = SearchNextslice();
253 return status;
254 }
255
SearchNextslice()256 bool SpanJoin::CaclSpan::SearchNextslice()
257 {
258 while (partitionState_ != TS_REAL) {
259 bool status = GetNextState();
260 if (!status) {
261 return false;
262 }
263 }
264 return true;
265 }
266
GetNextState()267 bool SpanJoin::CaclSpan::GetNextState()
268 {
269 switch (partitionState_) {
270 case PartitionState::TS_REAL: {
271 GetCursorNext();
272 partitionState_ = PartitionState::TS_PARTITION;
273 ts_ = endTs_;
274 if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx))) {
275 endTs_ = std::numeric_limits<int64_t>::max();
276 } else {
277 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
278 }
279 return true;
280 }
281 case PartitionState::TS_PARTITION: {
282 if (endTs_ == std::numeric_limits<int64_t>::max()) {
283 partitionState_ = PartitionState::TS_MISSING;
284 if (isEof_) {
285 missPartitionEnd_ = std::numeric_limits<int32_t>::max();
286 } else {
287 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
288 }
289 missPartitionStart_ = partition_ + NEXT_NUMBER;
290 ts_ = 0;
291 } else {
292 partitionState_ = PartitionState::TS_REAL;
293 ts_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
294 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->durIdx));
295 }
296 return true;
297 }
298 case PartitionState::TS_MISSING: {
299 if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
300 partitionState_ = PartitionState::TS_EOF;
301 } else {
302 partitionState_ = PartitionState::TS_PARTITION;
303 ts_ = 0;
304 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
305 partition_ = missPartitionEnd_;
306 }
307 return true;
308 }
309 default:
310 return false;
311 }
312 }
313
GetMergeColumns(std::vector<std::string> & columns)314 std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string>& columns)
315 {
316 std::string str;
317 int32_t size = columns.size();
318 for (int32_t i = 0; i < size; i++) {
319 if (i == size - 1) {
320 str += columns.at(i);
321 } else {
322 str += columns.at(i) + ", ";
323 }
324 }
325 return str;
326 }
327
GetPatitonForMiss()328 int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
329 {
330 return partitionState_ == TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
331 }
332
CreateCursor()333 std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
334 {
335 return std::make_unique<Cursor>(dataCache_, this);
336 }
337
// Creates a cursor over the span-join table. Each of the two source tables
// gets its own CaclSpan reader, built from the table's descriptor and the
// database handle reached through dataCache_.
// NOTE(review): dataCache_ is presumably set by the TableBase::Cursor
// constructor that runs first - confirm against the base class.
SpanJoin::Cursor::Cursor(const TraceDataCache* dataCache, SpanJoin* table)
    : TableBase::Cursor(dataCache, table, 0),
      tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
      tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
      spanTable_(table)
{
}
345
Filter(const FilterConstraints & fc,sqlite3_value ** argv)346 int32_t SpanJoin::Cursor::Filter(const FilterConstraints& fc, sqlite3_value** argv)
347 {
348 tableFirst_.InitQuerySql(argv);
349 tableSecond_.InitQuerySql(argv);
350 auto status = IsFindSpan();
351 if (!status) {
352 return SQLITE_ERROR;
353 }
354 return SQLITE_OK;
355 }
356
CaclOverLap()357 bool SpanJoin::Cursor::CaclOverLap()
358 {
359 if (tableFirst_.ts_ >= tableSecond_.ts_) {
360 if ((tableFirst_.partitionState_ == PartitionState::TS_REAL &&
361 tableSecond_.partitionState_ == PartitionState::TS_REAL) ||
362 tableFirst_.ts_ < tableSecond_.endTs_) {
363 return true;
364 }
365 } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
366 return true;
367 }
368 return false;
369 }
370
IsFindSpan()371 bool SpanJoin::Cursor::IsFindSpan()
372 {
373 for (;;) {
374 if (tableFirst_.isEof_ || tableSecond_.isEof_) {
375 break;
376 }
377 queryNext_ = FindQueryResult();
378 if (CaclOverLap()) {
379 break;
380 }
381 queryNext_->Next();
382 }
383 return true;
384 }
385
FindQueryResult()386 SpanJoin::CaclSpan* SpanJoin::Cursor::FindQueryResult()
387 {
388 if (!spanTable_->isSamepartitioning_) {
389 return nullptr;
390 }
391
392 auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
393 tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
394 auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
395 tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
396 if (tableFirstResult < tableSecondResult) {
397 return &tableFirst_;
398 }
399 return &tableSecond_;
400 }
401
Column(int32_t column) const402 int32_t SpanJoin::Cursor::Column(int32_t column) const
403 {
404 switch (column) {
405 case TS: {
406 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
407 break;
408 }
409 case DUR: {
410 sqlite3_result_int64(context_,
411 static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
412 std::max(tableFirst_.ts_, tableSecond_.ts_)));
413 break;
414 }
415 case PARTITION: {
416 auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
417 : tableSecond_.partition_;
418 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
419 break;
420 }
421 default: {
422 const auto ColumnInfo = spanTable_->mTableColumnInfo_[column];
423 if (ColumnInfo.tableDesc == tableFirst_.desc_) {
424 tableFirst_.setResult(context_, ColumnInfo.colIdx);
425 } else {
426 tableSecond_.setResult(context_, ColumnInfo.colIdx);
427 }
428 }
429 }
430 return SQLITE_OK;
431 }
432
433 } // namespace TraceStreamer
434 } // namespace SysTuning
435