1 2 /* 3 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef SAPN_JOIN_H 17 #define SAPN_JOIN_H 18 #include "table_base.h" 19 #include "trace_data_cache.h" 20 namespace SysTuning { 21 namespace TraceStreamer { 22 struct TableParse { 23 std::string name; 24 std::string partitionCol; 25 }; 26 27 struct TableDesc { 28 std::string name; 29 std::string partition; 30 std::vector<TableBase::ColumnInfo> cols; 31 int32_t tsIdx; 32 int32_t durIdx; 33 int32_t partitionIdx; 34 }; 35 36 struct TableColumnInfo { 37 TableDesc *tableDesc; 38 int32_t colIdx; 39 }; 40 41 enum class PartitionState : int32_t { 42 TS_REAL = 0, 43 TS_PARTITION, 44 TS_MISSING, 45 TS_EOF, 46 }; 47 48 class SpanJoin : public TableBase { 49 public: 50 explicit SpanJoin(const TraceDataCache *); ~SpanJoin()51 ~SpanJoin() override{}; 52 void Parse(const std::string &tablePartition, TableParse &tableParse); 53 void GetTableField(const TableParse &tableParse, TableDesc &tableDesc); 54 void GetColumns(const TraceDataCache *dataCache, 55 const std::string &tableName, 56 std::vector<TableBase::ColumnInfo> &columns); 57 void CreateCols(TableDesc &tableDesc, std::vector<ColumnInfo> &cols); 58 bool IsTsOrDurCol(const std::string &name); 59 bool DeduplicationForColumn(const std::string &name, std::vector<ColumnInfo> &cols); 60 void Init(int32_t argc, const char *const *argv) override; 61 std::unique_ptr<TableBase::Cursor> CreateCursor() override; 62 63 class CaclSpan { 64 public: 65 CaclSpan(TableBase *tableBase, const TableDesc *tableDesc, sqlite3 *db); 66 virtual ~CaclSpan(); 67 static std::string GetMergeColumns(std::vector<std::string> &columns); 68 int32_t InitQuerySql(sqlite3_value **argv); 69 bool IsQueryNext(); 70 bool GetCursorNext(); 71 bool GetNextState(); 72 bool SearchNextslice(); 73 void Next(); 74 std::string GetSqlQuery(); 75 void setResult(sqlite3_context *context, size_t index) const; 76 int64_t GetPatitonForMiss(); 77 78 public: 79 bool isEof_ = false; 80 int64_t ts_ = 0; 81 int64_t endTs_ = 0; 82 PartitionState partitionState_ = PartitionState::TS_MISSING; 83 int32_t partition_ = 0; 84 int32_t missPartitionStart_ = 0; 85 int32_t missPartitionEnd_ = 0; 86 std::string sqlQuery_; 87 sqlite3_stmt *stmt_ = nullptr; 88 const TableDesc *desc_ = nullptr; 89 sqlite3 *db_ = nullptr; 90 SpanJoin *table_ = nullptr; 91 }; 92 93 class Cursor : public TableBase::Cursor { 94 public: 95 explicit Cursor(const TraceDataCache *dataCache, SpanJoin *table); 96 int32_t Filter(const FilterConstraints &fc, sqlite3_value **argv) override; 97 int32_t Column(int32_t column) const override; Next()98 int32_t Next() override 99 { 100 queryNext_->Next(); 101 auto status = IsFindSpan(); 102 if (!status) { 103 return SQLITE_ERROR; 104 } 105 return SQLITE_OK; 106 } Eof()107 int32_t Eof() override 108 { 109 return tableFirst_.isEof_ || tableSecond_.isEof_; 110 } 111 112 private: 113 bool IsFindSpan(); 114 bool CaclOverLap(); 115 CaclSpan *FindQueryResult(); 116 CaclSpan tableFirst_; 117 CaclSpan tableSecond_; 118 CaclSpan *queryNext_ = nullptr; 119 SpanJoin *spanTable_; 120 }; 121 122 public: 123 bool isSamepartitioning_ = false; 124 125 private: 126 TableDesc tableFirstDesc_ = {}; 127 TableDesc tableSecondDesc_ = {}; 128 std::unordered_map<int32_t, TableColumnInfo> mTableColumnInfo_; 129 }; 130 131 } // namespace TraceStreamer 132 } // namespace SysTuning 133 #endif // SAPN_JOIN_H 134