• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "span_join.h"
17 #include <vector>
18 #include "string_help.h"
19 
20 namespace SysTuning {
21 namespace TraceStreamer {
22 
// Names of the timestamp and duration columns; IsTsOrDurCol() matches against these.
const std::string TS_COLUMN_NAME = "ts";
const std::string DUR_COLUMN_NAME = "dur";
// Position of the partition column name in the tokens produced by Parse().
constexpr uint32_t RESULT = 2;
// Init() does nothing when fewer than this many arguments are supplied.
constexpr int32_t MINSIZE = 5;
// Size of the stack buffer GetColumns() formats its SQL text into.
constexpr int32_t MAXSIZE = 1024;
// Step used when deriving neighbouring partition values from partition_ / missPartitionEnd_.
constexpr int32_t NEXT_NUMBER = 1;
// GetTableField() only accepts a table that has exactly this many ts/dur columns.
constexpr int32_t TSANDDUR_COLUMN = 2;
// Parse() warns when a table spec splits into fewer tokens than this.
constexpr int32_t PARTITIONED_COUNT = 3;

// The three leading output columns, in the order Init() registers them.
enum class Index : int32_t { TS, DUR, PARTITION };
33 
SpanJoin(const TraceDataCache * dataCache)34 SpanJoin::SpanJoin(const TraceDataCache *dataCache) : TableBase(dataCache)
35 {
36     tableColumn_ = {};
37     tablePriKey_ = {};
38     tableFirstDesc_ = {};
39     tableSecondDesc_ = {};
40 }
41 
Init(int32_t argc,const char * const * argv)42 void SpanJoin::Init(int32_t argc, const char *const *argv)
43 {
44     if (argc < MINSIZE) {
45         return;
46     }
47     // Parse the fields of the two tables separately
48     TableParse tableFirstParse;
49     Parse(std::string(reinterpret_cast<const char *>(argv[3])), tableFirstParse);
50     TableParse tableSecondParse;
51     Parse(std::string(reinterpret_cast<const char *>(argv[4])), tableSecondParse);
52 
53     // you must ensure that the two partitions exist and are the same when using
54     if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
55         return;
56     }
57     isSamepartitioning_ = true;
58     GetTableField(tableFirstParse, tableFirstDesc_);
59     GetTableField(tableSecondParse, tableSecondDesc_);
60     tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
61     tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
62     tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
63     CreateCols(tableFirstDesc_, tableColumn_);
64     CreateCols(tableSecondDesc_, tableColumn_);
65     std::vector<std::string> primaryKeys = {"ts"};
66     primaryKeys.push_back(tableFirstDesc_.partition);
67     tablePriKey_ = primaryKeys;
68     return;
69 }
70 
CreateCols(TableDesc & tableDesc,std::vector<TableBase::ColumnInfo> & cols)71 void SpanJoin::CreateCols(TableDesc &tableDesc, std::vector<TableBase::ColumnInfo> &cols)
72 {
73     for (int32_t i = 0; i < tableDesc.cols.size(); i++) {
74         auto &n = tableDesc.cols.at(i).name_;
75         if (IsTsOrDurCol(n)) {
76             continue;
77         }
78         auto columnInfo = &mTableColumnInfo_[cols.size()];
79         columnInfo->tableDesc = &tableDesc;
80         columnInfo->colIdx = i;
81         if (!DeduplicationForColumn(n, cols)) {
82             continue;
83         }
84         cols.emplace_back(n, tableDesc.cols.at(i).type_);
85     }
86 }
87 
DeduplicationForColumn(const std::string & name,std::vector<ColumnInfo> & cols)88 bool SpanJoin::DeduplicationForColumn(const std::string &name, std::vector<ColumnInfo> &cols)
89 {
90     for (size_t i = 0; i < cols.size(); i++) {
91         if (name == cols.at(i).name_) {
92             return false;
93         }
94     }
95     return true;
96 }
97 
Parse(const std::string & tablePartition,TableParse & tableParse)98 void SpanJoin::Parse(const std::string &tablePartition, TableParse &tableParse)
99 {
100     std::vector<std::string> result = base::SplitStringToVec(tablePartition, " ");
101     if (result.size() < PARTITIONED_COUNT) {
102         TS_LOGW("span_join sql is invalid!");
103     }
104     tableParse.name = result.at(0);
105     if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
106         TS_LOGW("sql has not PARTITIONED");
107         return;
108     }
109     tableParse.partitionCol = result.at(RESULT);
110     return;
111 }
112 
IsTsOrDurCol(const std::string & name)113 bool SpanJoin::IsTsOrDurCol(const std::string &name)
114 {
115     if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
116         return true;
117     }
118     return false;
119 }
120 
GetTableField(const TableParse & tableParse,TableDesc & tableDesc)121 void SpanJoin::GetTableField(const TableParse &tableParse, TableDesc &tableDesc)
122 {
123     std::vector<TableBase::ColumnInfo> cols;
124     GetColumns(dataCache_, tableParse.name, cols);
125     int32_t tsDurCount = 0;
126     for (int32_t i = 0; i < cols.size(); i++) {
127         auto col = cols.at(i);
128         if (IsTsOrDurCol(col.name_)) {
129             tsDurCount++;
130         }
131         if (col.name_ == TS_COLUMN_NAME) {
132             tableDesc.tsIdx = i;
133         } else if (col.name_ == DUR_COLUMN_NAME) {
134             tableDesc.durIdx = i;
135         } else if (col.name_ == tableParse.partitionCol) {
136             tableDesc.partitionIdx = i;
137         }
138     }
139     if (tsDurCount != TSANDDUR_COLUMN) {
140         return;
141     }
142     tableDesc.name = tableParse.name;
143     tableDesc.partition = tableParse.partitionCol;
144     tableDesc.cols = std::move(cols);
145     return;
146 }
147 
GetColumns(const TraceDataCache * dataCache,const std::string & tableName,std::vector<TableBase::ColumnInfo> & columns)148 void SpanJoin::GetColumns(const TraceDataCache *dataCache,
149                           const std::string &tableName,
150                           std::vector<TableBase::ColumnInfo> &columns)
151 {
152     char sql[MAXSIZE];
153     std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
154     int32_t n = snprintf_s(sql, sizeof(sql), sizeof(sql), querySql.c_str(), tableName.c_str());
155     if (n < 0 || n >= sizeof(sql)) {
156         TS_LOGE(" Failed to format SQL string ");
157     }
158     sqlite3_stmt *stmt = nullptr;
159     int32_t ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
160     while (!ret) {
161         int32_t err = sqlite3_step(stmt);
162         if (err == SQLITE_ROW) {
163             columns.emplace_back((reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0))),
164                                  reinterpret_cast<const char *>(sqlite3_column_text(stmt, 1)));
165             continue;
166         }
167         if (err == SQLITE_DONE) {
168             break;
169         }
170         ret = err;
171     }
172     return;
173 }
174 
CaclSpan(TableBase * tableBase,const TableDesc * tableDesc,sqlite3 * db)175 SpanJoin::CaclSpan::CaclSpan(TableBase *tableBase, const TableDesc *tableDesc, sqlite3 *db)
176     : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin *>(tableBase))
177 {
178 }
179 
180 SpanJoin::CaclSpan::~CaclSpan() = default;
181 
InitQuerySql(sqlite3_value ** argv)182 int32_t SpanJoin::CaclSpan::InitQuerySql(sqlite3_value **argv)
183 {
184     sqlQuery_ = GetSqlQuery();
185     bool status = IsQueryNext();
186     if (!status) {
187         return SQLITE_ERROR;
188     }
189     return SQLITE_OK;
190 }
191 
GetSqlQuery()192 std::string SpanJoin::CaclSpan::GetSqlQuery()
193 {
194     std::vector<std::string> columnNames;
195     for (int32_t i = 0; i < desc_->cols.size(); i++) {
196         columnNames.push_back(desc_->cols.at(i).name_);
197     }
198     auto str = GetMergeColumns(columnNames);
199     std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
200     return sql;
201 }
202 
setResult(sqlite3_context * context,size_t index) const203 void SpanJoin::CaclSpan::setResult(sqlite3_context *context, size_t index) const
204 {
205     if (partitionState_ != PartitionState::TS_REAL) {
206         sqlite3_result_null(context);
207         return;
208     }
209     int32_t sqliteType = sqlite3_column_type(stmt_, index);
210     if (sqliteType == SQLITE_TEXT) {
211         sqlite3_result_text(context, reinterpret_cast<const char *>(sqlite3_column_int64(stmt_, index)), -1,
212                             reinterpret_cast<sqlite3_destructor_type>(-1));
213     } else if (sqliteType == SQLITE_INTEGER) {
214         sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
215     } else if (sqliteType == SQLITE_FLOAT) {
216         sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
217     }
218 }
219 
GetCursorNext()220 bool SpanJoin::CaclSpan::GetCursorNext()
221 {
222     int32_t res;
223     int32_t rowType;
224     do {
225         res = sqlite3_step(stmt_);
226         rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
227     } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
228     if (res != SQLITE_ROW) {
229         isEof_ = true;
230     } else {
231         isEof_ = false;
232     }
233 
234     return res == SQLITE_ROW || res == SQLITE_DONE;
235 }
236 
Next()237 void SpanJoin::CaclSpan::Next()
238 {
239     GetNextState();
240     SearchNextslice();
241 }
242 
IsQueryNext()243 bool SpanJoin::CaclSpan::IsQueryNext()
244 {
245     int32_t res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int32_t>(sqlQuery_.size()), &stmt_, nullptr);
246     isEof_ = res != SQLITE_OK;
247     if (res != SQLITE_OK) {
248         return true;
249     }
250     auto status = GetCursorNext();
251     if (!status) {
252         return false;
253     }
254     missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
255     status = SearchNextslice();
256     return status;
257 }
258 
SearchNextslice()259 bool SpanJoin::CaclSpan::SearchNextslice()
260 {
261     while (partitionState_ != PartitionState::TS_REAL) {
262         bool status = GetNextState();
263         if (!status) {
264             return false;
265         }
266     }
267     return true;
268 }
269 
GetNextState()270 bool SpanJoin::CaclSpan::GetNextState()
271 {
272     switch (partitionState_) {
273         case PartitionState::TS_REAL: {
274             GetCursorNext();
275             partitionState_ = PartitionState::TS_PARTITION;
276             ts_ = endTs_;
277             if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx))) {
278                 endTs_ = std::numeric_limits<int64_t>::max();
279             } else {
280                 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
281             }
282             return true;
283         }
284         case PartitionState::TS_PARTITION: {
285             if (endTs_ == std::numeric_limits<int64_t>::max()) {
286                 partitionState_ = PartitionState::TS_MISSING;
287                 if (isEof_) {
288                     missPartitionEnd_ = std::numeric_limits<int32_t>::max();
289                 } else {
290                     missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
291                 }
292                 missPartitionStart_ = partition_ + NEXT_NUMBER;
293                 ts_ = 0;
294             } else {
295                 partitionState_ = PartitionState::TS_REAL;
296                 ts_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
297                 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->durIdx));
298             }
299             return true;
300         }
301         case PartitionState::TS_MISSING: {
302             if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
303                 partitionState_ = PartitionState::TS_EOF;
304             } else {
305                 partitionState_ = PartitionState::TS_PARTITION;
306                 ts_ = 0;
307                 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
308                 partition_ = missPartitionEnd_;
309             }
310             return true;
311         }
312         default:
313             return false;
314     }
315 }
316 
GetMergeColumns(std::vector<std::string> & columns)317 std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string> &columns)
318 {
319     std::string str;
320     int32_t size = columns.size();
321     for (int32_t i = 0; i < size; i++) {
322         if (i == size - 1) {
323             str += columns.at(i);
324         } else {
325             str += columns.at(i) + ", ";
326         }
327     }
328     return str;
329 }
330 
GetPatitonForMiss()331 int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
332 {
333     return partitionState_ == PartitionState::TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
334 }
335 
CreateCursor()336 std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
337 {
338     return std::make_unique<Cursor>(dataCache_, this);
339 }
340 
Cursor(const TraceDataCache * dataCache,SpanJoin * table)341 SpanJoin::Cursor::Cursor(const TraceDataCache *dataCache, SpanJoin *table)
342     : TableBase::Cursor(dataCache, table, 0),
343       tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
344       tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
345       spanTable_(table)
346 {
347 }
348 
Filter(const FilterConstraints & fc,sqlite3_value ** argv)349 int32_t SpanJoin::Cursor::Filter(const FilterConstraints &fc, sqlite3_value **argv)
350 {
351     tableFirst_.InitQuerySql(argv);
352     tableSecond_.InitQuerySql(argv);
353     auto status = IsFindSpan();
354     if (!status) {
355         return SQLITE_ERROR;
356     }
357     return SQLITE_OK;
358 }
359 
CaclOverLap()360 bool SpanJoin::Cursor::CaclOverLap()
361 {
362     if (tableFirst_.ts_ >= tableSecond_.ts_) {
363         if ((tableFirst_.partitionState_ == PartitionState::TS_REAL &&
364              tableSecond_.partitionState_ == PartitionState::TS_REAL) ||
365             tableFirst_.ts_ < tableSecond_.endTs_) {
366             return true;
367         }
368     } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
369         return true;
370     }
371     return false;
372 }
373 
IsFindSpan()374 bool SpanJoin::Cursor::IsFindSpan()
375 {
376     for (;;) {
377         if (tableFirst_.isEof_ || tableSecond_.isEof_) {
378             break;
379         }
380         queryNext_ = FindQueryResult();
381         if (CaclOverLap()) {
382             break;
383         }
384         queryNext_->Next();
385     }
386     return true;
387 }
388 
FindQueryResult()389 SpanJoin::CaclSpan *SpanJoin::Cursor::FindQueryResult()
390 {
391     if (!spanTable_->isSamepartitioning_) {
392         return nullptr;
393     }
394 
395     auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
396                                             tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
397     auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
398                                              tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
399     if (tableFirstResult < tableSecondResult) {
400         return &tableFirst_;
401     }
402     return &tableSecond_;
403 }
404 
Column(int32_t column) const405 int32_t SpanJoin::Cursor::Column(int32_t column) const
406 {
407     switch (static_cast<Index>(column)) {
408         case Index::TS: {
409             sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
410             break;
411         }
412         case Index::DUR: {
413             sqlite3_result_int64(context_,
414                                  static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
415                                                             std::max(tableFirst_.ts_, tableSecond_.ts_)));
416             break;
417         }
418         case Index::PARTITION: {
419             auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
420                                                                                      : tableSecond_.partition_;
421             sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
422             break;
423         }
424         default: {
425             const auto ColumnInfo = spanTable_->mTableColumnInfo_[column];
426             if (ColumnInfo.tableDesc == tableFirst_.desc_) {
427                 tableFirst_.setResult(context_, ColumnInfo.colIdx);
428             } else {
429                 tableSecond_.setResult(context_, ColumnInfo.colIdx);
430             }
431         }
432     }
433     return SQLITE_OK;
434 }
435 
436 } // namespace TraceStreamer
437 } // namespace SysTuning
438