1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "thread_table.h"

#include <algorithm>
17
18 namespace SysTuning {
19 namespace TraceStreamer {
// Column positions of the thread table. The values follow the order in which
// the ThreadTable constructor registers its columns.
enum class Index : int32_t {
    ID = 0,
    ITID = 1,
    TYPE = 2,
    TID = 3,
    NAME = 4,
    START_TS = 5,
    END_TS = 6,
    INTERNAL_PID = 7,
    IS_MAIN_THREAD = 8,
    SWITCH_COUNT = 9
};
ThreadTable(const TraceDataCache * dataCache)32 ThreadTable::ThreadTable(const TraceDataCache* dataCache) : TableBase(dataCache)
33 {
34 tableColumn_.push_back(TableBase::ColumnInfo("id", "INTEGER"));
35 tableColumn_.push_back(TableBase::ColumnInfo("itid", "INTEGER"));
36 tableColumn_.push_back(TableBase::ColumnInfo("type", "TEXT"));
37 tableColumn_.push_back(TableBase::ColumnInfo("tid", "INTEGER"));
38 tableColumn_.push_back(TableBase::ColumnInfo("name", "TEXT"));
39 tableColumn_.push_back(TableBase::ColumnInfo("start_ts", "INTEGER"));
40 tableColumn_.push_back(TableBase::ColumnInfo("end_ts", "INTEGER"));
41 tableColumn_.push_back(TableBase::ColumnInfo("ipid", "INTEGER"));
42 tableColumn_.push_back(TableBase::ColumnInfo("is_main_thread", "INTEGER"));
43 tableColumn_.push_back(TableBase::ColumnInfo("switch_count", "INTEGER"));
44 tablePriKey_.push_back("id");
45 }
46
~ThreadTable()47 ThreadTable::~ThreadTable() {}
48
FilterByConstraint(FilterConstraints & threadfc,double & threadfilterCost,size_t threadrowCount,uint32_t threadcurrenti)49 void ThreadTable::FilterByConstraint(FilterConstraints& threadfc,
50 double& threadfilterCost,
51 size_t threadrowCount,
52 uint32_t threadcurrenti)
53 {
54 // To use the EstimateFilterCost function in the TableBase parent class function to calculate the i-value of each
55 // for loop
56 const auto& threadc = threadfc.GetConstraints()[threadcurrenti];
57 switch (static_cast<Index>(threadc.col)) {
58 case Index::ITID:
59 case Index::ID: {
60 if (CanFilterId(threadc.op, threadrowCount)) {
61 threadfc.UpdateConstraint(threadcurrenti, true);
62 threadfilterCost += 1; // id can position by 1 step
63 } else {
64 threadfilterCost += threadrowCount; // scan all rows
65 }
66 break;
67 }
68 default: // other column
69 threadfilterCost += threadrowCount; // scan all rows
70 break;
71 }
72 }
73
CreateCursor()74 std::unique_ptr<TableBase::Cursor> ThreadTable::CreateCursor()
75 {
76 return std::make_unique<Cursor>(dataCache_, this);
77 }
78
Cursor(const TraceDataCache * dataCache,TableBase * table)79 ThreadTable::Cursor::Cursor(const TraceDataCache* dataCache, TableBase* table)
80 : TableBase::Cursor(dataCache, table, dataCache->ThreadSize())
81 {
82 }
83
~Cursor()84 ThreadTable::Cursor::~Cursor() {}
FilterTid(unsigned char op,uint64_t value)85 void ThreadTable::Cursor::FilterTid(unsigned char op, uint64_t value)
86 {
87 bool remove = false;
88 if (indexMapBack_->HasData()) {
89 indexMapBack_->CovertToIndexMap();
90 remove = true;
91 }
92 const auto& threadQueue = dataCache_->GetConstThreadData();
93 auto size = threadQueue.size();
94 switch (op) {
95 case SQLITE_INDEX_CONSTRAINT_EQ:
96 if (remove) {
97 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
98 if (threadQueue[*i].tid_ != value) {
99 i = indexMapBack_->rowIndex_.erase(i);
100 } else {
101 i++;
102 }
103 }
104 } else {
105 for (auto i = 0; i < size; i++) {
106 if (threadQueue[i].tid_ == value) {
107 indexMapBack_->rowIndex_.push_back(i);
108 }
109 }
110 }
111 indexMapBack_->FixSize();
112 break;
113 case SQLITE_INDEX_CONSTRAINT_ISNOT:
114 case SQLITE_INDEX_CONSTRAINT_NE:
115 if (remove) {
116 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
117 if (threadQueue[*i].tid_ == value) {
118 i = indexMapBack_->rowIndex_.erase(i);
119 } else {
120 i++;
121 }
122 }
123 } else {
124 for (auto i = 0; i < size; i++) {
125 if (threadQueue[i].tid_ != value) {
126 indexMapBack_->rowIndex_.push_back(i);
127 }
128 }
129 }
130 indexMapBack_->FixSize();
131 break;
132 default:
133 break;
134 } // end of switch (op)
135 }
FilterIpid(unsigned char op,uint64_t value)136 void ThreadTable::Cursor::FilterIpid(unsigned char op, uint64_t value)
137 {
138 bool isRemove = false;
139 if (indexMapBack_->HasData()) {
140 indexMapBack_->CovertToIndexMap();
141 isRemove = true;
142 }
143 const auto& threadQueue = dataCache_->GetConstThreadData();
144 auto thdQueueSize = threadQueue.size();
145 rowIndexBak_.clear();
146 bool changed = false;
147 switch (op) {
148 case SQLITE_INDEX_CONSTRAINT_EQ:
149 HandleIpidConstraint(isRemove, changed, value, thdQueueSize, threadQueue);
150 break;
151 case SQLITE_INDEX_CONSTRAINT_ISNULL:
152 HandleIpidConstraint(isRemove, changed, INVALID_UINT32, thdQueueSize, threadQueue);
153 break;
154 case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
155 HandleIpidConstraint(threadQueue, thdQueueSize, isRemove, changed);
156 break;
157 default:
158 break;
159 } // end of switch (op)
160 }
HandleIpidConstraint(const std::deque<SysTuning::TraceStdtype::Thread> & threadQueue,std::size_t size,bool remove,bool changed)161 void ThreadTable::Cursor::HandleIpidConstraint(const std::deque<SysTuning::TraceStdtype::Thread>& threadQueue,
162 std::size_t size,
163 bool remove,
164 bool changed)
165 {
166 if (remove) {
167 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
168 if (threadQueue[*i].internalPid_ == INVALID_UINT32) {
169 i++;
170 } else {
171 changed = true;
172 rowIndexBak_.push_back(*i);
173 i++;
174 }
175 }
176 if (changed) {
177 indexMapBack_->rowIndex_ = rowIndexBak_;
178 }
179 } else {
180 for (auto i = 0; i < size; i++) {
181 if (threadQueue[i].internalPid_ != INVALID_UINT32) {
182 indexMapBack_->rowIndex_.push_back(i);
183 }
184 }
185 }
186 indexMapBack_->FixSize();
187 }
FilterSwitchCount(unsigned char op,uint64_t value)188 void ThreadTable::Cursor::FilterSwitchCount(unsigned char op, uint64_t value)
189 {
190 bool remove = false;
191 if (indexMapBack_->HasData()) {
192 indexMapBack_->CovertToIndexMap();
193 remove = true;
194 }
195 const auto& threadQueue = dataCache_->GetConstThreadData();
196 auto size = threadQueue.size();
197 rowIndexBak_.clear();
198 bool changed = false;
199 switch (op) {
200 case SQLITE_INDEX_CONSTRAINT_EQ:
201 HandleIpidConstraint(remove, changed, value, size, threadQueue);
202 break;
203 case SQLITE_INDEX_CONSTRAINT_ISNULL:
204 HandleIpidConstraint(remove, changed, value, size, threadQueue);
205 break;
206 case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
207 if (remove) {
208 for (auto i = indexMapBack_->rowIndex_.begin(); i != indexMapBack_->rowIndex_.end();) {
209 if (threadQueue[*i].switchCount_ == INVALID_UINT32) {
210 i++;
211 } else {
212 changed = true;
213 rowIndexBak_.push_back(*i);
214 i++;
215 }
216 }
217 if (changed) {
218 indexMapBack_->rowIndex_ = rowIndexBak_;
219 }
220 } else {
221 for (auto i = 0; i < size; i++) {
222 if (threadQueue[i].switchCount_ != INVALID_UINT32) {
223 indexMapBack_->rowIndex_.push_back(i);
224 }
225 }
226 }
227 indexMapBack_->FixSize();
228 break;
229 default:
230 break;
231 } // end of switch (op)
232 }
FilterIndex(int32_t col,unsigned char op,sqlite3_value * argv)233 void ThreadTable::Cursor::FilterIndex(int32_t col, unsigned char op, sqlite3_value* argv)
234 {
235 switch (static_cast<Index>(col)) {
236 case Index::INTERNAL_PID:
237 FilterIpid(op, static_cast<uint64_t>(sqlite3_value_int64(argv)));
238 break;
239 case Index::TID:
240 FilterTid(op, static_cast<uint64_t>(sqlite3_value_int64(argv)));
241 break;
242 case Index::SWITCH_COUNT:
243 FilterSwitchCount(op, static_cast<uint64_t>(sqlite3_value_int64(argv)));
244 break;
245 default:
246 // we can't filter all rows
247 break;
248 }
249 }
Filter(const FilterConstraints & fc,sqlite3_value ** argv)250 int32_t ThreadTable::Cursor::Filter(const FilterConstraints& fc, sqlite3_value** argv)
251 {
252 // reset indexMapBack_
253 if (rowCount_ <= 0) {
254 return SQLITE_OK;
255 }
256 indexMapBack_ = indexMap_.get();
257 if (indexMap_->HasData()) {
258 indexMapBack_ = std::make_unique<IndexMap>(0, rowCount_).get();
259 }
260 auto cs = fc.GetConstraints();
261 std::set<uint32_t> sId = {static_cast<uint32_t>(Index::ID)};
262 SwapIndexFront(cs, sId);
263 for (size_t i = 0; i < cs.size(); i++) {
264 const auto& c = cs[i];
265 switch (static_cast<Index>(c.col)) {
266 case Index::ID:
267 case Index::ITID:
268 FilterId(c.op, argv[i]);
269 break;
270 case Index::TID:
271 case Index::INTERNAL_PID:
272 case Index::SWITCH_COUNT:
273 FilterIndex(c.col, c.op, argv[i]);
274 break;
275 default:
276 break;
277 }
278 }
279 if (indexMap_->HasData()) {
280 indexMap_->Merge(indexMapBack_);
281 }
282
283 auto orderbys = fc.GetOrderBys();
284 for (auto i = orderbys.size(); i > 0;) {
285 i--;
286 switch (static_cast<Index>(orderbys[i].iColumn)) {
287 case Index::ID:
288 case Index::ITID:
289 indexMap_->SortBy(orderbys[i].desc);
290 break;
291 default:
292 break;
293 }
294 }
295
296 return SQLITE_OK;
297 }
298
Column(int32_t col) const299 int32_t ThreadTable::Cursor::Column(int32_t col) const
300 {
301 const auto& thread = dataCache_->GetConstThreadData(CurrentRow());
302 switch (static_cast<Index>(col)) {
303 case Index::ID:
304 case Index::ITID: {
305 sqlite3_result_int64(context_, CurrentRow());
306 break;
307 }
308 case Index::TYPE: {
309 sqlite3_result_text(context_, "thread", strlen("thread"), nullptr);
310 break;
311 }
312 case Index::TID: {
313 SetTypeColumnInt64(thread.tid_, INVALID_UINT32);
314 break;
315 }
316 case Index::NAME: {
317 SetNameColumn(thread);
318 break;
319 }
320 case Index::START_TS:
321 SetTypeColumnInt64NotZero(thread.startT_);
322 break;
323 case Index::END_TS:
324 SetTypeColumnInt64NotZero(thread.endT_);
325
326 break;
327 case Index::INTERNAL_PID:
328 SetTypeColumnInt32(thread.internalPid_, INVALID_UINT32);
329 break;
330 case Index::IS_MAIN_THREAD: {
331 // When it is not clear which process the thread belongs to, is_main_thread should be set to null
332 if (thread.internalPid_ == INVALID_UINT32) {
333 break;
334 }
335 const auto& process = dataCache_->GetConstProcessData(thread.internalPid_);
336 sqlite3_result_int(context_, thread.tid_ == process.pid_);
337 break;
338 }
339 case Index::SWITCH_COUNT: {
340 // When it is not clear which process the thread belongs to, is_main_thread should be set to null
341 sqlite3_result_int(context_, thread.switchCount_);
342 break;
343 }
344
345 default:
346 TS_LOGF("Unregistered column : %d", col);
347 break;
348 }
349 return SQLITE_OK;
350 }
351
SetNameColumn(const Thread & thread) const352 void ThreadTable::Cursor::SetNameColumn(const Thread& thread) const
353 {
354 const auto& name = dataCache_->GetDataFromDict(thread.nameIndex_);
355 if (name.size()) {
356 sqlite3_result_text(context_, name.c_str(), static_cast<int32_t>(name.length()), nullptr);
357 }
358 }
359
Update(int32_t argc,sqlite3_value ** argv,sqlite3_int64 * pRowid)360 int32_t ThreadTable::Update(int32_t argc, sqlite3_value** argv, sqlite3_int64* pRowid)
361 {
362 if (argc <= 1) {
363 return SQLITE_READONLY;
364 }
365 if (sqlite3_value_type(argv[0]) == SQLITE_NULL) {
366 return SQLITE_READONLY;
367 }
368 auto id = sqlite3_value_int64(argv[0]);
369 auto thread = wdataCache_->GetThreadData(static_cast<InternalPid>(id));
370 constexpr int32_t colOffset = 2;
371 for (auto i = colOffset; i < argc; i++) {
372 auto col = i - colOffset;
373 if (static_cast<Index>(col) != Index::INTERNAL_PID) {
374 continue;
375 }
376 auto ipid = static_cast<uint32_t>(sqlite3_value_int(argv[i]));
377 if (ipid) {
378 thread->internalPid_ = ipid;
379 }
380 break;
381 }
382 return SQLITE_OK;
383 }
FilterId(unsigned char op,sqlite3_value * argv)384 void ThreadTable::Cursor::FilterId(unsigned char op, sqlite3_value* argv)
385 {
386 auto type = sqlite3_value_type(argv);
387 if (type != SQLITE_INTEGER) {
388 // other type consider it NULL
389 indexMapBack_->Intersect(0, 0);
390 return;
391 }
392
393 auto threadTabArgv = static_cast<TableRowId>(sqlite3_value_int64(argv));
394 switch (op) {
395 case SQLITE_INDEX_CONSTRAINT_EQ:
396 indexMapBack_->Intersect(threadTabArgv, threadTabArgv + 1);
397 break;
398 case SQLITE_INDEX_CONSTRAINT_GE:
399 indexMapBack_->Intersect(threadTabArgv, rowCount_);
400 break;
401 case SQLITE_INDEX_CONSTRAINT_GT:
402 threadTabArgv++;
403 indexMapBack_->Intersect(threadTabArgv, rowCount_);
404 break;
405 case SQLITE_INDEX_CONSTRAINT_LE:
406 threadTabArgv++;
407 indexMapBack_->Intersect(0, threadTabArgv);
408 break;
409 case SQLITE_INDEX_CONSTRAINT_LT:
410 indexMapBack_->Intersect(0, threadTabArgv);
411 break;
412 default:
413 // can't filter, all rows
414 break;
415 }
416 }
GetOrbyes(FilterConstraints & fc,EstimatedIndexInfo & ei)417 void ThreadTable::GetOrbyes(FilterConstraints& fc, EstimatedIndexInfo& ei)
418 {
419 auto orderbys = fc.GetOrderBys();
420 for (auto i = 0; i < orderbys.size(); i++) {
421 switch (static_cast<Index>(orderbys[i].iColumn)) {
422 case Index::ITID:
423 case Index::ID:
424 break;
425 default: // other columns can be sorted by SQLite
426 ei.isOrdered = false;
427 break;
428 }
429 }
430 }
431 } // namespace TraceStreamer
432 } // namespace SysTuning
433