• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "thread_table.h"
17 
18 namespace SysTuning {
19 namespace TraceStreamer {
20 enum class Index : int32_t {
21     ID = 0,
22     ITID,
23     TYPE,
24     TID,
25     NAME,
26     START_TS,
27     END_TS,
28     INTERNAL_PID,
29     IS_MAIN_THREAD,
30     SWITCH_COUNT
31 };
ThreadTable(const TraceDataCache * dataCache)32 ThreadTable::ThreadTable(const TraceDataCache* dataCache) : TableBase(dataCache)
33 {
34     tableColumn_.push_back(TableBase::ColumnInfo("id", "INTEGER"));
35     tableColumn_.push_back(TableBase::ColumnInfo("itid", "INTEGER"));
36     tableColumn_.push_back(TableBase::ColumnInfo("type", "TEXT"));
37     tableColumn_.push_back(TableBase::ColumnInfo("tid", "INTEGER"));
38     tableColumn_.push_back(TableBase::ColumnInfo("name", "TEXT"));
39     tableColumn_.push_back(TableBase::ColumnInfo("start_ts", "INTEGER"));
40     tableColumn_.push_back(TableBase::ColumnInfo("end_ts", "INTEGER"));
41     tableColumn_.push_back(TableBase::ColumnInfo("ipid", "INTEGER"));
42     tableColumn_.push_back(TableBase::ColumnInfo("is_main_thread", "INTEGER"));
43     tableColumn_.push_back(TableBase::ColumnInfo("switch_count", "INTEGER"));
44     tablePriKey_.push_back("id");
45 }
46 
~ThreadTable()47 ThreadTable::~ThreadTable() {}
48 
FilterByConstraint(FilterConstraints & threadfc,double & threadfilterCost,size_t threadrowCount,uint32_t threadcurrenti)49 void ThreadTable::FilterByConstraint(FilterConstraints& threadfc,
50                                      double& threadfilterCost,
51                                      size_t threadrowCount,
52                                      uint32_t threadcurrenti)
53 {
54     // To use the EstimateFilterCost function in the TableBase parent class function to calculate the i-value of each
55     // for loop
56     const auto& threadc = threadfc.GetConstraints()[threadcurrenti];
57     switch (static_cast<Index>(threadc.col)) {
58         case Index::ITID:
59         case Index::ID: {
60             if (CanFilterId(threadc.op, threadrowCount)) {
61                 threadfc.UpdateConstraint(threadcurrenti, true);
62                 threadfilterCost += 1; // id can position by 1 step
63             } else {
64                 threadfilterCost += threadrowCount; // scan all rows
65             }
66             break;
67         }
68         default:                                // other column
69             threadfilterCost += threadrowCount; // scan all rows
70             break;
71     }
72 }
73 
CreateCursor()74 std::unique_ptr<TableBase::Cursor> ThreadTable::CreateCursor()
75 {
76     return std::make_unique<Cursor>(dataCache_, this);
77 }
78 
Cursor(const TraceDataCache * dataCache,TableBase * table)79 ThreadTable::Cursor::Cursor(const TraceDataCache* dataCache, TableBase* table)
80     : TableBase::Cursor(dataCache, table, dataCache->ThreadSize())
81 {
82 }
83 
~Cursor()84 ThreadTable::Cursor::~Cursor() {}
FilterTid(unsigned char op,uint64_t value)85 void ThreadTable::Cursor::FilterTid(unsigned char op, uint64_t value)
86 {
87     bool remove = false;
88     if (indexMapBack_->HasData()) {
89         indexMapBack_->CovertToIndexMap();
90         remove = true;
91     }
92     const auto& threadQueue = dataCache_->GetConstThreadData();
93     auto size = threadQueue.size();
94     switch (op) {
95         case SQLITE_INDEX_CONSTRAINT_EQ:
96             if (remove) {
97                 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
98                     if (threadQueue[*i].tid_ != value) {
99                         i = indexMapBack_->rowIndex_.erase(i);
100                     } else {
101                         i++;
102                     }
103                 }
104             } else {
105                 for (auto i = 0; i < size; i++) {
106                     if (threadQueue[i].tid_ == value) {
107                         indexMapBack_->rowIndex_.push_back(i);
108                     }
109                 }
110             }
111             indexMapBack_->FixSize();
112             break;
113         case SQLITE_INDEX_CONSTRAINT_ISNOT:
114         case SQLITE_INDEX_CONSTRAINT_NE:
115             if (remove) {
116                 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
117                     if (threadQueue[*i].tid_ == value) {
118                         i = indexMapBack_->rowIndex_.erase(i);
119                     } else {
120                         i++;
121                     }
122                 }
123             } else {
124                 for (auto i = 0; i < size; i++) {
125                     if (threadQueue[i].tid_ != value) {
126                         indexMapBack_->rowIndex_.push_back(i);
127                     }
128                 }
129             }
130             indexMapBack_->FixSize();
131             break;
132         default:
133             break;
134     } // end of switch (op)
135 }
FilterIpid(unsigned char op,uint64_t value)136 void ThreadTable::Cursor::FilterIpid(unsigned char op, uint64_t value)
137 {
138     bool isRemove = false;
139     if (indexMapBack_->HasData()) {
140         indexMapBack_->CovertToIndexMap();
141         isRemove = true;
142     }
143     const auto& threadQueue = dataCache_->GetConstThreadData();
144     auto thdQueueSize = threadQueue.size();
145     rowIndexBak_.clear();
146     bool changed = false;
147     switch (op) {
148         case SQLITE_INDEX_CONSTRAINT_EQ:
149             HandleIpidConstraint(isRemove, changed, value, thdQueueSize, threadQueue);
150             break;
151         case SQLITE_INDEX_CONSTRAINT_ISNULL:
152             HandleIpidConstraint(isRemove, changed, INVALID_UINT32, thdQueueSize, threadQueue);
153             break;
154         case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
155             HandleIpidConstraint(threadQueue, thdQueueSize, isRemove, changed);
156             break;
157         default:
158             break;
159     } // end of switch (op)
160 }
HandleIpidConstraint(const std::deque<SysTuning::TraceStdtype::Thread> & threadQueue,std::size_t size,bool remove,bool changed)161 void ThreadTable::Cursor::HandleIpidConstraint(const std::deque<SysTuning::TraceStdtype::Thread>& threadQueue,
162                                                std::size_t size,
163                                                bool remove,
164                                                bool changed)
165 {
166     if (remove) {
167         for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
168             if (threadQueue[*i].internalPid_ == INVALID_UINT32) {
169                 i++;
170             } else {
171                 changed = true;
172                 rowIndexBak_.push_back(*i);
173                 i++;
174             }
175         }
176         if (changed) {
177             indexMapBack_->rowIndex_ = rowIndexBak_;
178         }
179     } else {
180         for (auto i = 0; i < size; i++) {
181             if (threadQueue[i].internalPid_ != INVALID_UINT32) {
182                 indexMapBack_->rowIndex_.push_back(i);
183             }
184         }
185     }
186     indexMapBack_->FixSize();
187 }
FilterSwitchCount(unsigned char op,uint64_t value)188 void ThreadTable::Cursor::FilterSwitchCount(unsigned char op, uint64_t value)
189 {
190     bool remove = false;
191     if (indexMapBack_->HasData()) {
192         indexMapBack_->CovertToIndexMap();
193         remove = true;
194     }
195     const auto& threadQueue = dataCache_->GetConstThreadData();
196     auto size = threadQueue.size();
197     rowIndexBak_.clear();
198     bool changed = false;
199     switch (op) {
200         case SQLITE_INDEX_CONSTRAINT_EQ:
201             HandleIpidConstraint(remove, changed, value, size, threadQueue);
202             break;
203         case SQLITE_INDEX_CONSTRAINT_ISNULL:
204             HandleIpidConstraint(remove, changed, value, size, threadQueue);
205             break;
206         case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
207             if (remove) {
208                 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
209                     if (threadQueue[*i].switchCount_ == INVALID_UINT32) {
210                         i++;
211                     } else {
212                         changed = true;
213                         rowIndexBak_.push_back(*i);
214                         i++;
215                     }
216                 }
217                 if (changed) {
218                     indexMapBack_->rowIndex_ = rowIndexBak_;
219                 }
220             } else {
221                 for (auto i = 0; i < size; i++) {
222                     if (threadQueue[i].switchCount_ != INVALID_UINT32) {
223                         indexMapBack_->rowIndex_.push_back(i);
224                     }
225                 }
226             }
227             indexMapBack_->FixSize();
228             break;
229         default:
230             break;
231     } // end of switch (op)
232 }
FilterIndex(int32_t col,unsigned char op,sqlite3_value * argv)233 void ThreadTable::Cursor::FilterIndex(int32_t col, unsigned char op, sqlite3_value* argv)
234 {
235     switch (static_cast<Index>(col)) {
236         case Index::INTERNAL_PID:
237             FilterIpid(op, static_cast<uint64_t>(sqlite3_value_int64(argv)));
238             break;
239         case Index::TID:
240             FilterTid(op, static_cast<uint64_t>(sqlite3_value_int64(argv)));
241             break;
242         case Index::SWITCH_COUNT:
243             FilterSwitchCount(op, static_cast<uint64_t>(sqlite3_value_int64(argv)));
244             break;
245         default:
246             // we can't filter all rows
247             break;
248     }
249 }
Filter(const FilterConstraints & fc,sqlite3_value ** argv)250 int32_t ThreadTable::Cursor::Filter(const FilterConstraints& fc, sqlite3_value** argv)
251 {
252     // reset indexMapBack_
253     if (rowCount_ <= 0) {
254         return SQLITE_OK;
255     }
256     indexMapBack_ = indexMap_.get();
257     if (indexMap_->HasData()) {
258         indexMapBack_ = std::make_unique<IndexMap>(0, rowCount_).get();
259     }
260     auto cs = fc.GetConstraints();
261     std::set<uint32_t> sId = {static_cast<uint32_t>(Index::ID)};
262     SwapIndexFront(cs, sId);
263     for (size_t i = 0; i < cs.size(); i++) {
264         const auto& c = cs[i];
265         switch (static_cast<Index>(c.col)) {
266             case Index::ID:
267             case Index::ITID:
268                 FilterId(c.op, argv[i]);
269                 break;
270             case Index::TID:
271             case Index::INTERNAL_PID:
272             case Index::SWITCH_COUNT:
273                 FilterIndex(c.col, c.op, argv[i]);
274                 break;
275             default:
276                 break;
277         }
278     }
279     if (indexMap_->HasData()) {
280         indexMap_->Merge(indexMapBack_);
281     }
282 
283     auto orderbys = fc.GetOrderBys();
284     for (auto i = orderbys.size(); i > 0;) {
285         i--;
286         switch (static_cast<Index>(orderbys[i].iColumn)) {
287             case Index::ID:
288             case Index::ITID:
289                 indexMap_->SortBy(orderbys[i].desc);
290                 break;
291             default:
292                 break;
293         }
294     }
295 
296     return SQLITE_OK;
297 }
298 
Column(int32_t col) const299 int32_t ThreadTable::Cursor::Column(int32_t col) const
300 {
301     const auto& thread = dataCache_->GetConstThreadData(CurrentRow());
302     switch (static_cast<Index>(col)) {
303         case Index::ID:
304         case Index::ITID: {
305             sqlite3_result_int64(context_, CurrentRow());
306             break;
307         }
308         case Index::TYPE: {
309             sqlite3_result_text(context_, "thread", strlen("thread"), nullptr);
310             break;
311         }
312         case Index::TID: {
313             SetTypeColumnInt64(thread.tid_, INVALID_UINT32);
314             break;
315         }
316         case Index::NAME: {
317             SetNameColumn(thread);
318             break;
319         }
320         case Index::START_TS:
321             SetTypeColumnInt64NotZero(thread.startT_);
322             break;
323         case Index::END_TS:
324             SetTypeColumnInt64NotZero(thread.endT_);
325 
326             break;
327         case Index::INTERNAL_PID:
328             SetTypeColumnInt32(thread.internalPid_, INVALID_UINT32);
329             break;
330         case Index::IS_MAIN_THREAD: {
331             // When it is not clear which process the thread belongs to, is_main_thread should be set to null
332             if (thread.internalPid_ == INVALID_UINT32) {
333                 break;
334             }
335             const auto& process = dataCache_->GetConstProcessData(thread.internalPid_);
336             sqlite3_result_int(context_, thread.tid_ == process.pid_);
337             break;
338         }
339         case Index::SWITCH_COUNT: {
340             // When it is not clear which process the thread belongs to, is_main_thread should be set to null
341             sqlite3_result_int(context_, thread.switchCount_);
342             break;
343         }
344 
345         default:
346             TS_LOGF("Unregistered column : %d", col);
347             break;
348     }
349     return SQLITE_OK;
350 }
351 
SetNameColumn(const Thread & thread) const352 void ThreadTable::Cursor::SetNameColumn(const Thread& thread) const
353 {
354     const auto& name = dataCache_->GetDataFromDict(thread.nameIndex_);
355     if (name.size()) {
356         sqlite3_result_text(context_, name.c_str(), static_cast<int32_t>(name.length()), nullptr);
357     }
358 }
359 
Update(int32_t argc,sqlite3_value ** argv,sqlite3_int64 * pRowid)360 int32_t ThreadTable::Update(int32_t argc, sqlite3_value** argv, sqlite3_int64* pRowid)
361 {
362     if (argc <= 1) {
363         return SQLITE_READONLY;
364     }
365     if (sqlite3_value_type(argv[0]) == SQLITE_NULL) {
366         return SQLITE_READONLY;
367     }
368     auto id = sqlite3_value_int64(argv[0]);
369     auto thread = wdataCache_->GetThreadData(static_cast<InternalPid>(id));
370     constexpr int32_t colOffset = 2;
371     for (auto i = colOffset; i < argc; i++) {
372         auto col = i - colOffset;
373         if (static_cast<Index>(col) != Index::INTERNAL_PID) {
374             continue;
375         }
376         auto ipid = static_cast<uint32_t>(sqlite3_value_int(argv[i]));
377         if (ipid) {
378             thread->internalPid_ = ipid;
379         }
380         break;
381     }
382     return SQLITE_OK;
383 }
FilterId(unsigned char op,sqlite3_value * argv)384 void ThreadTable::Cursor::FilterId(unsigned char op, sqlite3_value* argv)
385 {
386     auto type = sqlite3_value_type(argv);
387     if (type != SQLITE_INTEGER) {
388         // other type consider it NULL
389         indexMapBack_->Intersect(0, 0);
390         return;
391     }
392 
393     auto threadTabArgv = static_cast<TableRowId>(sqlite3_value_int64(argv));
394     switch (op) {
395         case SQLITE_INDEX_CONSTRAINT_EQ:
396             indexMapBack_->Intersect(threadTabArgv, threadTabArgv + 1);
397             break;
398         case SQLITE_INDEX_CONSTRAINT_GE:
399             indexMapBack_->Intersect(threadTabArgv, rowCount_);
400             break;
401         case SQLITE_INDEX_CONSTRAINT_GT:
402             threadTabArgv++;
403             indexMapBack_->Intersect(threadTabArgv, rowCount_);
404             break;
405         case SQLITE_INDEX_CONSTRAINT_LE:
406             threadTabArgv++;
407             indexMapBack_->Intersect(0, threadTabArgv);
408             break;
409         case SQLITE_INDEX_CONSTRAINT_LT:
410             indexMapBack_->Intersect(0, threadTabArgv);
411             break;
412         default:
413             // can't filter, all rows
414             break;
415     }
416 }
GetOrbyes(FilterConstraints & fc,EstimatedIndexInfo & ei)417 void ThreadTable::GetOrbyes(FilterConstraints& fc, EstimatedIndexInfo& ei)
418 {
419     auto orderbys = fc.GetOrderBys();
420     for (auto i = 0; i < orderbys.size(); i++) {
421         switch (static_cast<Index>(orderbys[i].iColumn)) {
422             case Index::ITID:
423             case Index::ID:
424                 break;
425             default: // other columns can be sorted by SQLite
426                 ei.isOrdered = false;
427                 break;
428         }
429     }
430 }
431 } // namespace TraceStreamer
432 } // namespace SysTuning
433