1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "span_join.h"
17 #include <vector>
18 #include "string_help.h"
19
20 namespace SysTuning {
21 namespace TraceStreamer {
22
// Names of the two columns every joined table must provide; the join recomputes both for its output.
const std::string TS_COLUMN_NAME{"ts"};
const std::string DUR_COLUMN_NAME{"dur"};
// Index of the partition-column token in a "<table> PARTITIONED <column>" table definition.
constexpr uint32_t RESULT{2};
// Init() reads the table definitions from argv[3] and argv[4], so at least five arguments are needed.
constexpr int32_t MINSIZE{5};
// Size of the buffer the PRAGMA table_info query is formatted into.
constexpr int32_t MAXSIZE{1024};
// Step between adjacent partition ids.
constexpr int32_t NEXT_NUMBER{1};
// Number of ts/dur columns a joined table must have.
constexpr int32_t TSANDDUR_COLUMN{2};
// Minimum number of space-separated tokens in a table definition.
constexpr int32_t PARTITIONED_COUNT{3};

// Fixed leading columns of the span_join output; later columns come from the joined tables.
enum class Index : int32_t { TS = 0, DUR = 1, PARTITION = 2 };
33
SpanJoin(const TraceDataCache * dataCache)34 SpanJoin::SpanJoin(const TraceDataCache *dataCache) : TableBase(dataCache)
35 {
36 tableColumn_ = {};
37 tablePriKey_ = {};
38 tableFirstDesc_ = {};
39 tableSecondDesc_ = {};
40 }
41
Init(int32_t argc,const char * const * argv)42 void SpanJoin::Init(int32_t argc, const char *const *argv)
43 {
44 if (argc < MINSIZE) {
45 return;
46 }
47 // Parse the fields of the two tables separately
48 TableParse tableFirstParse;
49 Parse(std::string(reinterpret_cast<const char *>(argv[3])), tableFirstParse);
50 TableParse tableSecondParse;
51 Parse(std::string(reinterpret_cast<const char *>(argv[4])), tableSecondParse);
52
53 // you must ensure that the two partitions exist and are the same when using
54 if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
55 return;
56 }
57 isSamepartitioning_ = true;
58 GetTableField(tableFirstParse, tableFirstDesc_);
59 GetTableField(tableSecondParse, tableSecondDesc_);
60 tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
61 tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
62 tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
63 CreateCols(tableFirstDesc_, tableColumn_);
64 CreateCols(tableSecondDesc_, tableColumn_);
65 std::vector<std::string> primaryKeys = {"ts"};
66 primaryKeys.push_back(tableFirstDesc_.partition);
67 tablePriKey_ = primaryKeys;
68 return;
69 }
70
CreateCols(TableDesc & tableDesc,std::vector<TableBase::ColumnInfo> & cols)71 void SpanJoin::CreateCols(TableDesc &tableDesc, std::vector<TableBase::ColumnInfo> &cols)
72 {
73 for (int32_t i = 0; i < tableDesc.cols.size(); i++) {
74 auto &n = tableDesc.cols.at(i).name_;
75 if (IsTsOrDurCol(n)) {
76 continue;
77 }
78 auto columnInfo = &mTableColumnInfo_[cols.size()];
79 columnInfo->tableDesc = &tableDesc;
80 columnInfo->colIdx = i;
81 if (!DeduplicationForColumn(n, cols)) {
82 continue;
83 }
84 cols.emplace_back(n, tableDesc.cols.at(i).type_);
85 }
86 }
87
DeduplicationForColumn(const std::string & name,std::vector<ColumnInfo> & cols)88 bool SpanJoin::DeduplicationForColumn(const std::string &name, std::vector<ColumnInfo> &cols)
89 {
90 for (size_t i = 0; i < cols.size(); i++) {
91 if (name == cols.at(i).name_) {
92 return false;
93 }
94 }
95 return true;
96 }
97
Parse(const std::string & tablePartition,TableParse & tableParse)98 void SpanJoin::Parse(const std::string &tablePartition, TableParse &tableParse)
99 {
100 std::vector<std::string> result = base::SplitStringToVec(tablePartition, " ");
101 if (result.size() < PARTITIONED_COUNT) {
102 TS_LOGW("span_join sql is invalid!");
103 }
104 tableParse.name = result.at(0);
105 if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
106 TS_LOGW("sql has not PARTITIONED");
107 return;
108 }
109 tableParse.partitionCol = result.at(RESULT);
110 return;
111 }
112
IsTsOrDurCol(const std::string & name)113 bool SpanJoin::IsTsOrDurCol(const std::string &name)
114 {
115 if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
116 return true;
117 }
118 return false;
119 }
120
GetTableField(const TableParse & tableParse,TableDesc & tableDesc)121 void SpanJoin::GetTableField(const TableParse &tableParse, TableDesc &tableDesc)
122 {
123 std::vector<TableBase::ColumnInfo> cols;
124 GetColumns(dataCache_, tableParse.name, cols);
125 int32_t tsDurCount = 0;
126 for (int32_t i = 0; i < cols.size(); i++) {
127 auto col = cols.at(i);
128 if (IsTsOrDurCol(col.name_)) {
129 tsDurCount++;
130 }
131 if (col.name_ == TS_COLUMN_NAME) {
132 tableDesc.tsIdx = i;
133 } else if (col.name_ == DUR_COLUMN_NAME) {
134 tableDesc.durIdx = i;
135 } else if (col.name_ == tableParse.partitionCol) {
136 tableDesc.partitionIdx = i;
137 }
138 }
139 if (tsDurCount != TSANDDUR_COLUMN) {
140 return;
141 }
142 tableDesc.name = tableParse.name;
143 tableDesc.partition = tableParse.partitionCol;
144 tableDesc.cols = std::move(cols);
145 return;
146 }
147
GetColumns(const TraceDataCache * dataCache,const std::string & tableName,std::vector<TableBase::ColumnInfo> & columns)148 void SpanJoin::GetColumns(const TraceDataCache *dataCache,
149 const std::string &tableName,
150 std::vector<TableBase::ColumnInfo> &columns)
151 {
152 char sql[MAXSIZE];
153 std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
154 int32_t n = snprintf_s(sql, sizeof(sql), sizeof(sql), querySql.c_str(), tableName.c_str());
155 if (n < 0 || n >= sizeof(sql)) {
156 TS_LOGE(" Failed to format SQL string ");
157 }
158 sqlite3_stmt *stmt = nullptr;
159 int32_t ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
160 while (!ret) {
161 int32_t err = sqlite3_step(stmt);
162 if (err == SQLITE_ROW) {
163 columns.emplace_back((reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0))),
164 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 1)));
165 continue;
166 }
167 if (err == SQLITE_DONE) {
168 break;
169 }
170 ret = err;
171 }
172 return;
173 }
174
CaclSpan(TableBase * tableBase,const TableDesc * tableDesc,sqlite3 * db)175 SpanJoin::CaclSpan::CaclSpan(TableBase *tableBase, const TableDesc *tableDesc, sqlite3 *db)
176 : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin *>(tableBase))
177 {
178 }
179
180 SpanJoin::CaclSpan::~CaclSpan() = default;
181
InitQuerySql(sqlite3_value ** argv)182 int32_t SpanJoin::CaclSpan::InitQuerySql(sqlite3_value **argv)
183 {
184 sqlQuery_ = GetSqlQuery();
185 bool status = IsQueryNext();
186 if (!status) {
187 return SQLITE_ERROR;
188 }
189 return SQLITE_OK;
190 }
191
GetSqlQuery()192 std::string SpanJoin::CaclSpan::GetSqlQuery()
193 {
194 std::vector<std::string> columnNames;
195 for (int32_t i = 0; i < desc_->cols.size(); i++) {
196 columnNames.push_back(desc_->cols.at(i).name_);
197 }
198 auto str = GetMergeColumns(columnNames);
199 std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
200 return sql;
201 }
202
setResult(sqlite3_context * context,size_t index) const203 void SpanJoin::CaclSpan::setResult(sqlite3_context *context, size_t index) const
204 {
205 if (partitionState_ != PartitionState::TS_REAL) {
206 sqlite3_result_null(context);
207 return;
208 }
209 int32_t sqliteType = sqlite3_column_type(stmt_, index);
210 if (sqliteType == SQLITE_TEXT) {
211 sqlite3_result_text(context, reinterpret_cast<const char *>(sqlite3_column_int64(stmt_, index)), -1,
212 reinterpret_cast<sqlite3_destructor_type>(-1));
213 } else if (sqliteType == SQLITE_INTEGER) {
214 sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
215 } else if (sqliteType == SQLITE_FLOAT) {
216 sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
217 }
218 }
219
GetCursorNext()220 bool SpanJoin::CaclSpan::GetCursorNext()
221 {
222 int32_t res;
223 int32_t rowType;
224 do {
225 res = sqlite3_step(stmt_);
226 rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
227 } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
228 if (res != SQLITE_ROW) {
229 isEof_ = true;
230 } else {
231 isEof_ = false;
232 }
233
234 return res == SQLITE_ROW || res == SQLITE_DONE;
235 }
236
Next()237 void SpanJoin::CaclSpan::Next()
238 {
239 GetNextState();
240 SearchNextslice();
241 }
242
IsQueryNext()243 bool SpanJoin::CaclSpan::IsQueryNext()
244 {
245 int32_t res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int32_t>(sqlQuery_.size()), &stmt_, nullptr);
246 isEof_ = res != SQLITE_OK;
247 if (res != SQLITE_OK) {
248 return true;
249 }
250 auto status = GetCursorNext();
251 if (!status) {
252 return false;
253 }
254 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
255 status = SearchNextslice();
256 return status;
257 }
258
SearchNextslice()259 bool SpanJoin::CaclSpan::SearchNextslice()
260 {
261 while (partitionState_ != PartitionState::TS_REAL) {
262 bool status = GetNextState();
263 if (!status) {
264 return false;
265 }
266 }
267 return true;
268 }
269
GetNextState()270 bool SpanJoin::CaclSpan::GetNextState()
271 {
272 switch (partitionState_) {
273 case PartitionState::TS_REAL: {
274 GetCursorNext();
275 partitionState_ = PartitionState::TS_PARTITION;
276 ts_ = endTs_;
277 if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx))) {
278 endTs_ = std::numeric_limits<int64_t>::max();
279 } else {
280 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
281 }
282 return true;
283 }
284 case PartitionState::TS_PARTITION: {
285 if (endTs_ == std::numeric_limits<int64_t>::max()) {
286 partitionState_ = PartitionState::TS_MISSING;
287 if (isEof_) {
288 missPartitionEnd_ = std::numeric_limits<int32_t>::max();
289 } else {
290 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
291 }
292 missPartitionStart_ = partition_ + NEXT_NUMBER;
293 ts_ = 0;
294 } else {
295 partitionState_ = PartitionState::TS_REAL;
296 ts_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
297 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->durIdx));
298 }
299 return true;
300 }
301 case PartitionState::TS_MISSING: {
302 if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
303 partitionState_ = PartitionState::TS_EOF;
304 } else {
305 partitionState_ = PartitionState::TS_PARTITION;
306 ts_ = 0;
307 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
308 partition_ = missPartitionEnd_;
309 }
310 return true;
311 }
312 default:
313 return false;
314 }
315 }
316
GetMergeColumns(std::vector<std::string> & columns)317 std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string> &columns)
318 {
319 std::string str;
320 int32_t size = columns.size();
321 for (int32_t i = 0; i < size; i++) {
322 if (i == size - 1) {
323 str += columns.at(i);
324 } else {
325 str += columns.at(i) + ", ";
326 }
327 }
328 return str;
329 }
330
GetPatitonForMiss()331 int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
332 {
333 return partitionState_ == PartitionState::TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
334 }
335
CreateCursor()336 std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
337 {
338 return std::make_unique<Cursor>(dataCache_, this);
339 }
340
Cursor(const TraceDataCache * dataCache,SpanJoin * table)341 SpanJoin::Cursor::Cursor(const TraceDataCache *dataCache, SpanJoin *table)
342 : TableBase::Cursor(dataCache, table, 0),
343 tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
344 tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
345 spanTable_(table)
346 {
347 }
348
Filter(const FilterConstraints & fc,sqlite3_value ** argv)349 int32_t SpanJoin::Cursor::Filter(const FilterConstraints &fc, sqlite3_value **argv)
350 {
351 tableFirst_.InitQuerySql(argv);
352 tableSecond_.InitQuerySql(argv);
353 auto status = IsFindSpan();
354 if (!status) {
355 return SQLITE_ERROR;
356 }
357 return SQLITE_OK;
358 }
359
CaclOverLap()360 bool SpanJoin::Cursor::CaclOverLap()
361 {
362 if (tableFirst_.ts_ >= tableSecond_.ts_) {
363 if ((tableFirst_.partitionState_ == PartitionState::TS_REAL &&
364 tableSecond_.partitionState_ == PartitionState::TS_REAL) ||
365 tableFirst_.ts_ < tableSecond_.endTs_) {
366 return true;
367 }
368 } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
369 return true;
370 }
371 return false;
372 }
373
IsFindSpan()374 bool SpanJoin::Cursor::IsFindSpan()
375 {
376 for (;;) {
377 if (tableFirst_.isEof_ || tableSecond_.isEof_) {
378 break;
379 }
380 queryNext_ = FindQueryResult();
381 if (CaclOverLap()) {
382 break;
383 }
384 queryNext_->Next();
385 }
386 return true;
387 }
388
FindQueryResult()389 SpanJoin::CaclSpan *SpanJoin::Cursor::FindQueryResult()
390 {
391 if (!spanTable_->isSamepartitioning_) {
392 return nullptr;
393 }
394
395 auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
396 tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
397 auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
398 tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
399 if (tableFirstResult < tableSecondResult) {
400 return &tableFirst_;
401 }
402 return &tableSecond_;
403 }
404
Column(int32_t column) const405 int32_t SpanJoin::Cursor::Column(int32_t column) const
406 {
407 switch (static_cast<Index>(column)) {
408 case Index::TS: {
409 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
410 break;
411 }
412 case Index::DUR: {
413 sqlite3_result_int64(context_,
414 static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
415 std::max(tableFirst_.ts_, tableSecond_.ts_)));
416 break;
417 }
418 case Index::PARTITION: {
419 auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
420 : tableSecond_.partition_;
421 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
422 break;
423 }
424 default: {
425 const auto ColumnInfo = spanTable_->mTableColumnInfo_[column];
426 if (ColumnInfo.tableDesc == tableFirst_.desc_) {
427 tableFirst_.setResult(context_, ColumnInfo.colIdx);
428 } else {
429 tableSecond_.setResult(context_, ColumnInfo.colIdx);
430 }
431 }
432 }
433 return SQLITE_OK;
434 }
435
436 } // namespace TraceStreamer
437 } // namespace SysTuning
438