• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "span_join.h"
17 #include <vector>
18 #include "string_help.h"
19 
20 namespace SysTuning {
21 namespace TraceStreamer {
22 
// Column names every span_join input table must provide.
const std::string TS_COLUMN_NAME{"ts"};
const std::string DUR_COLUMN_NAME{"dur"};
// Init() reads argv[3] and argv[4], so at least five arguments are needed.
constexpr int32_t MINSIZE{5};
// Size of the character buffer used when formatting SQL text.
constexpr int32_t MAXSIZE{1024};
// Step between consecutive partition values.
constexpr int32_t NEXT_NUMBER{1};
// Number of ts/dur columns an input table must contain.
constexpr int32_t TSANDDUR_COLUMN{2};
// Token count of a "<table> PARTITIONED <column>" argument.
constexpr int32_t PARTITIONED_COUNT{3};

// Fixed positions of the first three output columns of the span_join table.
enum Index { TS = 0, DUR = 1, PARTITION = 2 };
32 
SpanJoin(const TraceDataCache * dataCache)33 SpanJoin::SpanJoin(const TraceDataCache* dataCache) : TableBase(dataCache)
34 {
35     tableColumn_ = {};
36     tablePriKey_ = {};
37     tableFirstDesc_ = {};
38     tableSecondDesc_ = {};
39 }
40 
Init(int32_t argc,const char * const * argv)41 void SpanJoin::Init(int32_t argc, const char* const* argv)
42 {
43     if (argc < MINSIZE) {
44         return;
45     }
46     // Parse the fields of the two tables separately
47     TableParse tableFirstParse;
48     Parse(std::string(reinterpret_cast<const char*>(argv[3])), tableFirstParse);
49     TableParse tableSecondParse;
50     Parse(std::string(reinterpret_cast<const char*>(argv[4])), tableSecondParse);
51 
52     // you must ensure that the two partitions exist and are the same when using
53     if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
54         return;
55     }
56     isSamepartitioning_ = true;
57     GetTableField(tableFirstParse, tableFirstDesc_);
58     GetTableField(tableSecondParse, tableSecondDesc_);
59     tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
60     tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
61     tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
62     CreateCols(tableFirstDesc_, tableColumn_);
63     CreateCols(tableSecondDesc_, tableColumn_);
64     std::vector<std::string> primaryKeys = {"ts"};
65     primaryKeys.push_back(tableFirstDesc_.partition);
66     tablePriKey_ = primaryKeys;
67     return;
68 }
69 
CreateCols(TableDesc & tableDesc,std::vector<TableBase::ColumnInfo> & cols)70 void SpanJoin::CreateCols(TableDesc& tableDesc, std::vector<TableBase::ColumnInfo>& cols)
71 {
72     for (int32_t i = 0; i < tableDesc.cols.size(); i++) {
73         auto& n = tableDesc.cols.at(i).name_;
74         if (IsTsOrDurCol(n)) {
75             continue;
76         }
77         auto columnInfo = &mTableColumnInfo_[cols.size()];
78         columnInfo->tableDesc = &tableDesc;
79         columnInfo->colIdx = i;
80         if (!DeduplicationForColumn(n, cols)) {
81             continue;
82         }
83         cols.emplace_back(n, tableDesc.cols.at(i).type_);
84     }
85 }
86 
DeduplicationForColumn(const std::string & name,std::vector<ColumnInfo> & cols)87 bool SpanJoin::DeduplicationForColumn(const std::string& name, std::vector<ColumnInfo>& cols)
88 {
89     for (size_t i = 0; i < cols.size(); i++) {
90         if (name == cols.at(i).name_) {
91             return false;
92         }
93     }
94     return true;
95 }
96 
Parse(const std::string & tablePartition,TableParse & tableParse)97 void SpanJoin::Parse(const std::string& tablePartition, TableParse& tableParse)
98 {
99     std::vector<std::string> result = SplitStringToVec(tablePartition, " ");
100     if (result.size() < PARTITIONED_COUNT) {
101         TS_LOGW("span_join sql is invalid!");
102     }
103     tableParse.name = result.at(0);
104     if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
105         TS_LOGW("sql has not PARTITIONED");
106         return;
107     }
108     tableParse.partitionCol = result.at(2);
109     return;
110 }
111 
IsTsOrDurCol(const std::string & name)112 bool SpanJoin::IsTsOrDurCol(const std::string& name)
113 {
114     if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
115         return true;
116     }
117     return false;
118 }
119 
GetTableField(const TableParse & tableParse,TableDesc & tableDesc)120 void SpanJoin::GetTableField(const TableParse& tableParse, TableDesc& tableDesc)
121 {
122     std::vector<TableBase::ColumnInfo> cols;
123     GetColumns(dataCache_, tableParse.name, cols);
124     int32_t tsDurCount = 0;
125     for (int32_t i = 0; i < cols.size(); i++) {
126         auto col = cols.at(i);
127         if (IsTsOrDurCol(col.name_)) {
128             tsDurCount++;
129         }
130         if (col.name_ == TS_COLUMN_NAME) {
131             tableDesc.tsIdx = i;
132         } else if (col.name_ == DUR_COLUMN_NAME) {
133             tableDesc.durIdx = i;
134         } else if (col.name_ == tableParse.partitionCol) {
135             tableDesc.partitionIdx = i;
136         }
137     }
138     if (tsDurCount != TSANDDUR_COLUMN) {
139         return;
140     }
141     tableDesc.name = tableParse.name;
142     tableDesc.partition = tableParse.partitionCol;
143     tableDesc.cols = std::move(cols);
144     return;
145 }
146 
GetColumns(const TraceDataCache * dataCache,const std::string & tableName,std::vector<TableBase::ColumnInfo> & columns)147 void SpanJoin::GetColumns(const TraceDataCache* dataCache,
148                           const std::string& tableName,
149                           std::vector<TableBase::ColumnInfo>& columns)
150 {
151     char sql[MAXSIZE];
152     std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
153     int32_t n = snprintf_s(sql, sizeof(sql), 1, querySql.c_str(), tableName.c_str());
154     sqlite3_stmt* stmt = nullptr;
155     int32_t ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
156     while (!ret) {
157         int32_t err = sqlite3_step(stmt);
158         if (err == SQLITE_ROW) {
159             columns.emplace_back((reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0))),
160                                  reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1)));
161             continue;
162         }
163         if (err == SQLITE_DONE) {
164             break;
165         }
166         ret = err;
167     }
168     return;
169 }
170 
CaclSpan(TableBase * tableBase,const TableDesc * tableDesc,sqlite3 * db)171 SpanJoin::CaclSpan::CaclSpan(TableBase* tableBase, const TableDesc* tableDesc, sqlite3* db)
172     : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin*>(tableBase))
173 {
174 }
175 
176 SpanJoin::CaclSpan::~CaclSpan() = default;
177 
InitQuerySql(sqlite3_value ** argv)178 int32_t SpanJoin::CaclSpan::InitQuerySql(sqlite3_value** argv)
179 {
180     sqlQuery_ = GetSqlQuery();
181     bool status = IsQueryNext();
182     if (!status) {
183         return SQLITE_ERROR;
184     }
185     return SQLITE_OK;
186 }
187 
GetSqlQuery()188 std::string SpanJoin::CaclSpan::GetSqlQuery()
189 {
190     std::vector<std::string> columnNames;
191     for (int32_t i = 0; i < desc_->cols.size(); i++) {
192         columnNames.push_back(desc_->cols.at(i).name_);
193     }
194     std::string str;
195     str = GetMergeColumns(columnNames);
196     std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
197     return sql;
198 }
199 
setResult(sqlite3_context * context,size_t index) const200 void SpanJoin::CaclSpan::setResult(sqlite3_context* context, size_t index) const
201 {
202     if (partitionState_ != PartitionState::TS_REAL) {
203         sqlite3_result_null(context);
204         return;
205     }
206     int32_t sqliteType = sqlite3_column_type(stmt_, index);
207     if (sqliteType == SQLITE_TEXT) {
208         sqlite3_result_text(context, reinterpret_cast<const char*>(sqlite3_column_int64(stmt_, index)), -1,
209                             reinterpret_cast<sqlite3_destructor_type>(-1));
210     } else if (sqliteType == SQLITE_INTEGER) {
211         sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
212     } else if (sqliteType == SQLITE_FLOAT) {
213         sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
214     }
215 }
216 
GetCursorNext()217 bool SpanJoin::CaclSpan::GetCursorNext()
218 {
219     int32_t res;
220     int32_t rowType;
221     do {
222         res = sqlite3_step(stmt_);
223         rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
224     } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
225     if (res != SQLITE_ROW) {
226         isEof_ = true;
227     } else {
228         isEof_ = false;
229     }
230 
231     return res == SQLITE_ROW || res == SQLITE_DONE;
232 }
233 
Next()234 void SpanJoin::CaclSpan::Next()
235 {
236     GetNextState();
237     SearchNextslice();
238 }
239 
IsQueryNext()240 bool SpanJoin::CaclSpan::IsQueryNext()
241 {
242     int32_t res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int32_t>(sqlQuery_.size()), &stmt_, nullptr);
243     isEof_ = res != SQLITE_OK;
244     if (res != SQLITE_OK) {
245         return true;
246     }
247     auto status = GetCursorNext();
248     if (!status) {
249         return false;
250     }
251     missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
252     status = SearchNextslice();
253     return status;
254 }
255 
SearchNextslice()256 bool SpanJoin::CaclSpan::SearchNextslice()
257 {
258     while (partitionState_ != TS_REAL) {
259         bool status = GetNextState();
260         if (!status) {
261             return false;
262         }
263     }
264     return true;
265 }
266 
GetNextState()267 bool SpanJoin::CaclSpan::GetNextState()
268 {
269     switch (partitionState_) {
270         case PartitionState::TS_REAL: {
271             GetCursorNext();
272             partitionState_ = PartitionState::TS_PARTITION;
273             ts_ = endTs_;
274             if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx))) {
275                 endTs_ = std::numeric_limits<int64_t>::max();
276             } else {
277                 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
278             }
279             return true;
280         }
281         case PartitionState::TS_PARTITION: {
282             if (endTs_ == std::numeric_limits<int64_t>::max()) {
283                 partitionState_ = PartitionState::TS_MISSING;
284                 if (isEof_) {
285                     missPartitionEnd_ = std::numeric_limits<int32_t>::max();
286                 } else {
287                     missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
288                 }
289                 missPartitionStart_ = partition_ + NEXT_NUMBER;
290                 ts_ = 0;
291             } else {
292                 partitionState_ = PartitionState::TS_REAL;
293                 ts_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
294                 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->durIdx));
295             }
296             return true;
297         }
298         case PartitionState::TS_MISSING: {
299             if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
300                 partitionState_ = PartitionState::TS_EOF;
301             } else {
302                 partitionState_ = PartitionState::TS_PARTITION;
303                 ts_ = 0;
304                 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
305                 partition_ = missPartitionEnd_;
306             }
307             return true;
308         }
309         default:
310             return false;
311     }
312 }
313 
GetMergeColumns(std::vector<std::string> & columns)314 std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string>& columns)
315 {
316     std::string str;
317     int32_t size = columns.size();
318     for (int32_t i = 0; i < size; i++) {
319         if (i == size - 1) {
320             str += columns.at(i);
321         } else {
322             str += columns.at(i) + ", ";
323         }
324     }
325     return str;
326 }
327 
GetPatitonForMiss()328 int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
329 {
330     return partitionState_ == TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
331 }
332 
CreateCursor()333 std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
334 {
335     return std::make_unique<Cursor>(dataCache_, this);
336 }
337 
Cursor(const TraceDataCache * dataCache,SpanJoin * table)338 SpanJoin::Cursor::Cursor(const TraceDataCache* dataCache, SpanJoin* table)
339     : TableBase::Cursor(dataCache, table, 0),
340       tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
341       tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
342       spanTable_(table)
343 {
344 }
345 
Filter(const FilterConstraints & fc,sqlite3_value ** argv)346 int32_t SpanJoin::Cursor::Filter(const FilterConstraints& fc, sqlite3_value** argv)
347 {
348     tableFirst_.InitQuerySql(argv);
349     tableSecond_.InitQuerySql(argv);
350     auto status = IsFindSpan();
351     if (!status) {
352         return SQLITE_ERROR;
353     }
354     return SQLITE_OK;
355 }
356 
CaclOverLap()357 bool SpanJoin::Cursor::CaclOverLap()
358 {
359     if (tableFirst_.ts_ >= tableSecond_.ts_) {
360         if ((tableFirst_.partitionState_ == PartitionState::TS_REAL &&
361              tableSecond_.partitionState_ == PartitionState::TS_REAL) ||
362             tableFirst_.ts_ < tableSecond_.endTs_) {
363             return true;
364         }
365     } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
366         return true;
367     }
368     return false;
369 }
370 
IsFindSpan()371 bool SpanJoin::Cursor::IsFindSpan()
372 {
373     for (;;) {
374         if (tableFirst_.isEof_ || tableSecond_.isEof_) {
375             break;
376         }
377         queryNext_ = FindQueryResult();
378         if (CaclOverLap()) {
379             break;
380         }
381         queryNext_->Next();
382     }
383     return true;
384 }
385 
FindQueryResult()386 SpanJoin::CaclSpan* SpanJoin::Cursor::FindQueryResult()
387 {
388     if (!spanTable_->isSamepartitioning_) {
389         return nullptr;
390     }
391 
392     auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
393                                             tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
394     auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
395                                              tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
396     if (tableFirstResult < tableSecondResult) {
397         return &tableFirst_;
398     }
399     return &tableSecond_;
400 }
401 
Column(int32_t column) const402 int32_t SpanJoin::Cursor::Column(int32_t column) const
403 {
404     switch (column) {
405         case TS: {
406             sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
407             break;
408         }
409         case DUR: {
410             sqlite3_result_int64(context_,
411                                  static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
412                                                             std::max(tableFirst_.ts_, tableSecond_.ts_)));
413             break;
414         }
415         case PARTITION: {
416             auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
417                                                                                      : tableSecond_.partition_;
418             sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
419             break;
420         }
421         default: {
422             const auto ColumnInfo = spanTable_->mTableColumnInfo_[column];
423             if (ColumnInfo.tableDesc == tableFirst_.desc_) {
424                 tableFirst_.setResult(context_, ColumnInfo.colIdx);
425             } else {
426                 tableSecond_.setResult(context_, ColumnInfo.colIdx);
427             }
428         }
429     }
430     return SQLITE_OK;
431 }
432 
433 } // namespace TraceStreamer
434 } // namespace SysTuning
435