• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "span_join.h"
17 #include <vector>
18 
19 namespace SysTuning {
20 namespace TraceStreamer {
21 
// Names of the timestamp and duration columns that the joined table exposes and that every
// source table must provide.
const std::string TS_COLUMN_NAME{"ts"};
const std::string DUR_COLUMN_NAME{"dur"};
// Init() reads argv[3] and argv[4], so fewer arguments than this are rejected.
constexpr int MINSIZE{5};
// Upper bound used for fixed-size SQL text buffers.
constexpr int MAXSIZE{1024};
// Distance between adjacent partition values when computing missing-partition ranges.
constexpr int NEXT_NUMBER{1};
// A source table must contain exactly this many special columns (ts and dur).
constexpr int TSANDDUR_COLUMN{2};
// Token count of a table description: "<table> PARTITIONED <column>".
constexpr int PARTITIONED_COUNT{3};

// Positions of the leading output columns of the joined table (see Init()).
enum Index { TS = 0, DUR = 1, PARTITION = 2 };
31 
SpanJoin(const TraceDataCache * dataCache)32 SpanJoin::SpanJoin(const TraceDataCache* dataCache) : TableBase(dataCache)
33 {
34     tableColumn_ = {};
35     tablePriKey_ = {};
36 }
37 
Init(int argc,const char * const * argv)38 void SpanJoin::Init(int argc, const char* const* argv)
39 {
40     if (argc < MINSIZE) {
41         return;
42     }
43     // Parse the fields of the two tables separately
44     TableParse tableFirstParse;
45     Parse(std::string(reinterpret_cast<const char*>(argv[3])), tableFirstParse);
46     TableParse tableSecondParse;
47     Parse(std::string(reinterpret_cast<const char*>(argv[4])), tableSecondParse);
48 
49     // you must ensure that the two partitions exist and are the same when using
50     if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
51         return;
52     }
53     isSamepartitioning_ = true;
54     GetTableField(tableFirstParse, tableFirstDesc_);
55     GetTableField(tableSecondParse, tableSecondDesc_);
56     tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
57     tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
58     tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
59     CreateCols(tableFirstDesc_, tableColumn_);
60     CreateCols(tableSecondDesc_, tableColumn_);
61     std::vector<std::string> primaryKeys = {"ts"};
62     primaryKeys.push_back(tableFirstDesc_.partition);
63     tablePriKey_ = primaryKeys;
64     return;
65 }
66 
CreateCols(TableDesc & tableDesc,std::vector<TableBase::ColumnInfo> & cols)67 void SpanJoin::CreateCols(TableDesc& tableDesc, std::vector<TableBase::ColumnInfo>& cols)
68 {
69     for (int i = 0; i < tableDesc.cols.size(); i++) {
70         auto& n = tableDesc.cols.at(i).name_;
71         if (IsTsOrDurCol(n)) {
72             continue;
73         }
74         auto columnInfo = &mTableColumnInfo_[cols.size()];
75         columnInfo->tableDesc = &tableDesc;
76         columnInfo->colIdx = i;
77         if (!DeduplicationForColumn(n, cols)) {
78             continue;
79         }
80         cols.emplace_back(n, tableDesc.cols.at(i).type_);
81     }
82 }
83 
DeduplicationForColumn(const std::string & name,std::vector<ColumnInfo> & cols)84 bool SpanJoin::DeduplicationForColumn(const std::string& name, std::vector<ColumnInfo>& cols)
85 {
86     for (size_t i = 0; i < cols.size(); i++) {
87         if (name == cols.at(i).name_) {
88             return false;
89         }
90     }
91     return true;
92 }
93 
TableNameSplitToVec(std::string & str,const std::string & pat)94 std::vector<std::string> SpanJoin::TableNameSplitToVec(std::string& str, const std::string& pat)
95 {
96     std::string::size_type pos;
97     std::vector<std::string> result;
98     str += pat;
99     int size = str.size();
100     for (int i = 0; i < size; i++) {
101         pos = str.find(pat, i);
102         if (pos == std::string::npos) {
103             break;
104         }
105         if (pos < size) {
106             std::string s = str.substr(i, pos - i);
107             result.push_back(s);
108             i = pos + pat.size() - 1;
109         }
110     }
111     return result;
112 }
113 
Parse(const std::string & tablePartition,TableParse & tableParse)114 void SpanJoin::Parse(const std::string& tablePartition, TableParse& tableParse)
115 {
116     std::vector<std::string> result = TableNameSplitToVec(const_cast<std::string&>(tablePartition), " ");
117     if (result.size() < PARTITIONED_COUNT) {
118         TS_LOGW("span_join sql is invalid!");
119     }
120     tableParse.name = result.at(0);
121     if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
122         TS_LOGW("sql has not PARTITIONED");
123         return;
124     }
125     tableParse.partitionCol = result.at(2);
126     return;
127 }
128 
IsTsOrDurCol(const std::string & name)129 bool SpanJoin::IsTsOrDurCol(const std::string& name)
130 {
131     if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
132         return true;
133     }
134     return false;
135 }
136 
GetTableField(TableParse & tableParse,TableDesc & tableDesc)137 void SpanJoin::GetTableField(TableParse& tableParse, TableDesc& tableDesc)
138 {
139     std::vector<TableBase::ColumnInfo> cols;
140     GetColumns(dataCache_, tableParse.name, cols);
141     int tsDurCount = 0;
142     for (int i = 0; i < cols.size(); i++) {
143         auto col = cols.at(i);
144         if (IsTsOrDurCol(col.name_)) {
145             tsDurCount++;
146         }
147         if (col.name_ == TS_COLUMN_NAME) {
148             tableDesc.tsIdx = i;
149         } else if (col.name_ == DUR_COLUMN_NAME) {
150             tableDesc.durIdx = i;
151         } else if (col.name_ == tableParse.partitionCol) {
152             tableDesc.partitionIdx = i;
153         }
154     }
155     if (tsDurCount != TSANDDUR_COLUMN) {
156         return;
157     }
158     tableDesc.name = tableParse.name;
159     tableDesc.partition = tableParse.partitionCol;
160     tableDesc.cols = std::move(cols);
161     return;
162 }
163 
GetColumns(const TraceDataCache * dataCache,const std::string & tableName,std::vector<TableBase::ColumnInfo> & columns)164 void SpanJoin::GetColumns(const TraceDataCache* dataCache,
165                           const std::string& tableName,
166                           std::vector<TableBase::ColumnInfo>& columns)
167 {
168     char sql[MAXSIZE];
169     std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
170     int n = snprintf(sql, sizeof(sql), querySql.c_str(), tableName.c_str());
171     sqlite3_stmt* stmt = nullptr;
172     int ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
173     while (!ret) {
174         int err = sqlite3_step(stmt);
175         if (err == SQLITE_ROW) {
176             columns.emplace_back((reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0))),
177                                  reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1)));
178             continue;
179         }
180         if (err == SQLITE_DONE) {
181             break;
182         }
183         ret = err;
184     }
185     return;
186 }
187 
CaclSpan(TableBase * tableBase,const TableDesc * tableDesc,sqlite3 * db)188 SpanJoin::CaclSpan::CaclSpan(TableBase* tableBase, const TableDesc* tableDesc, sqlite3* db)
189     : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin*>(tableBase))
190 {
191 }
192 
193 SpanJoin::CaclSpan::~CaclSpan() = default;
194 
InitQuerySql(sqlite3_value ** argv)195 int SpanJoin::CaclSpan::InitQuerySql(sqlite3_value** argv)
196 {
197     sqlQuery_ = GetSqlQuery();
198     bool status = IsQueryNext();
199     if (!status) {
200         return SQLITE_ERROR;
201     }
202     return SQLITE_OK;
203 }
204 
GetSqlQuery()205 std::string SpanJoin::CaclSpan::GetSqlQuery()
206 {
207     std::vector<std::string> columnNames;
208     for (int i = 0; i < desc_->cols.size(); i++) {
209         columnNames.push_back(desc_->cols.at(i).name_);
210     }
211     std::string str;
212     str = GetMergeColumns(columnNames);
213     std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
214     return sql;
215 }
216 
setResult(sqlite3_context * context,size_t index) const217 void SpanJoin::CaclSpan::setResult(sqlite3_context* context, size_t index) const
218 {
219     if (partitionState_ != PartitionState::TS_REAL) {
220         sqlite3_result_null(context);
221         return;
222     }
223     int sqliteType = sqlite3_column_type(stmt_, index);
224     if (sqliteType == SQLITE_TEXT) {
225         sqlite3_result_text(context, reinterpret_cast<const char*>(sqlite3_column_int64(stmt_, index)), -1,
226                             reinterpret_cast<sqlite3_destructor_type>(-1));
227     } else if (sqliteType == SQLITE_INTEGER) {
228         sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
229     } else if (sqliteType == SQLITE_FLOAT) {
230         sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
231     }
232 }
233 
GetCursorNext()234 bool SpanJoin::CaclSpan::GetCursorNext()
235 {
236     int res;
237     int rowType;
238     do {
239         res = sqlite3_step(stmt_);
240         rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
241     } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
242     if (res != SQLITE_ROW) {
243         isEof_ = true;
244     } else {
245         isEof_ = false;
246     }
247 
248     return res == SQLITE_ROW || res == SQLITE_DONE;
249 }
250 
Next()251 void SpanJoin::CaclSpan::Next()
252 {
253     GetNextState();
254     SearchNextslice();
255 }
256 
IsQueryNext()257 bool SpanJoin::CaclSpan::IsQueryNext()
258 {
259     int res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int>(sqlQuery_.size()), &stmt_, nullptr);
260     isEof_ = res != SQLITE_OK;
261     if (res != SQLITE_OK) {
262         return true;
263     }
264     auto status = GetCursorNext();
265     if (!status) {
266         return false;
267     }
268     missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->partitionIdx));
269     status = SearchNextslice();
270     return status;
271 }
272 
SearchNextslice()273 bool SpanJoin::CaclSpan::SearchNextslice()
274 {
275     while (partitionState_ != TS_REAL) {
276         bool status = GetNextState();
277         if (!status) {
278             return false;
279         }
280     }
281     return true;
282 }
283 
GetNextState()284 bool SpanJoin::CaclSpan::GetNextState()
285 {
286     switch (partitionState_) {
287         case PartitionState::TS_REAL: {
288             GetCursorNext();
289             partitionState_ = PartitionState::TS_PARTITION;
290             ts_ = endTs_;
291             if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int>(desc_->partitionIdx))) {
292                 endTs_ = std::numeric_limits<int64_t>::max();
293             } else {
294                 endTs_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->tsIdx));
295             }
296             return true;
297         }
298         case PartitionState::TS_PARTITION: {
299             if (endTs_ == std::numeric_limits<int64_t>::max()) {
300                 partitionState_ = PartitionState::TS_MISSING;
301                 if (isEof_) {
302                     missPartitionEnd_ = std::numeric_limits<int32_t>::max();
303                 } else {
304                     missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->partitionIdx));
305                 }
306                 missPartitionStart_ = partition_ + NEXT_NUMBER;
307                 ts_ = 0;
308             } else {
309                 partitionState_ = PartitionState::TS_REAL;
310                 ts_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->tsIdx));
311                 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int>(desc_->durIdx));
312             }
313             return true;
314         }
315         case PartitionState::TS_MISSING: {
316             if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
317                 partitionState_ = PartitionState::TS_EOF;
318             } else {
319                 partitionState_ = PartitionState::TS_PARTITION;
320                 ts_ = 0;
321                 endTs_ = sqlite3_column_int64(stmt_, static_cast<int>(desc_->tsIdx));
322                 partition_ = missPartitionEnd_;
323             }
324             return true;
325         }
326         default:
327             return false;
328     }
329 }
330 
GetMergeColumns(std::vector<std::string> & columns)331 std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string>& columns)
332 {
333     std::string str;
334     int size = columns.size();
335     for (int i = 0; i < size; i++) {
336         if (i == size - 1) {
337             str += columns.at(i);
338         } else {
339             str += columns.at(i) + ", ";
340         }
341     }
342     return str;
343 }
344 
GetPatitonForMiss()345 int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
346 {
347     return partitionState_ == TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
348 }
349 
CreateCursor()350 std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
351 {
352     return std::make_unique<Cursor>(dataCache_, this);
353 }
354 
Cursor(const TraceDataCache * dataCache,SpanJoin * table)355 SpanJoin::Cursor::Cursor(const TraceDataCache* dataCache, SpanJoin* table)
356     : TableBase::Cursor(dataCache, table, 0),
357       tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
358       tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
359       table_(table)
360 {
361 }
362 
Filter(const FilterConstraints & fc,sqlite3_value ** argv)363 int SpanJoin::Cursor::Filter(const FilterConstraints& fc, sqlite3_value** argv)
364 {
365     tableFirst_.InitQuerySql(argv);
366     tableSecond_.InitQuerySql(argv);
367     auto status = IsFindSpan();
368     if (!status) {
369         return SQLITE_ERROR;
370     }
371     return SQLITE_OK;
372 }
373 
CaclOverLap()374 bool SpanJoin::Cursor::CaclOverLap()
375 {
376     if (tableFirst_.ts_ >= tableSecond_.ts_) {
377         if (tableFirst_.partitionState_ == PartitionState::TS_REAL &&
378                 tableSecond_.partitionState_ == PartitionState::TS_REAL ||
379             tableFirst_.ts_ < tableSecond_.endTs_) {
380             return true;
381         }
382     } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
383         return true;
384     }
385     return false;
386 }
387 
IsFindSpan()388 bool SpanJoin::Cursor::IsFindSpan()
389 {
390     for (;;) {
391         if (tableFirst_.isEof_ || tableSecond_.isEof_) {
392             break;
393         }
394         queryNext_ = FindQueryResult();
395         if (CaclOverLap()) {
396             break;
397         }
398         queryNext_->Next();
399     }
400     return true;
401 }
402 
FindQueryResult()403 SpanJoin::CaclSpan* SpanJoin::Cursor::FindQueryResult()
404 {
405     if (!table_->isSamepartitioning_) {
406         return nullptr;
407     }
408 
409     auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
410                                             tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
411     auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
412                                              tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
413     if (tableFirstResult < tableSecondResult) {
414         return &tableFirst_;
415     }
416     return &tableSecond_;
417 }
418 
Column(int column) const419 int SpanJoin::Cursor::Column(int column) const
420 {
421     switch (column) {
422         case TS: {
423             sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
424             break;
425         }
426         case DUR: {
427             sqlite3_result_int64(context_,
428                                  static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
429                                                             std::max(tableFirst_.ts_, tableSecond_.ts_)));
430             break;
431         }
432         case PARTITION: {
433             auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
434                                                                                      : tableSecond_.partition_;
435             sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
436             break;
437         }
438         default: {
439             const auto ColumnInfo = table_->mTableColumnInfo_[column];
440             if (ColumnInfo.tableDesc == tableFirst_.desc_) {
441                 tableFirst_.setResult(context_, ColumnInfo.colIdx);
442             } else {
443                 tableSecond_.setResult(context_, ColumnInfo.colIdx);
444             }
445         }
446     }
447     return SQLITE_OK;
448 }
449 
450 } // namespace TraceStreamer
451 } // namespace SysTuning
452