• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "RdbPerfStat"
17 #include "rdb_perfStat.h"
18 
19 #include <thread>
20 
21 #include "concurrent_map.h"
22 #include "logger.h"
23 #include "rdb_errno.h"
24 #include "rdb_platform.h"
25 #include "sqlite_utils.h"
26 #include "task_executor.h"
27 namespace OHOS::DistributedRdb {
28 using namespace OHOS::Rdb;
29 using namespace OHOS::NativeRdb;
30 using SqlExecInfo = SqlObserver::SqlExecutionInfo;
31 std::shared_mutex PerfStat::mutex_;
32 std::map<uint64_t, PerfStat::ThreadParam> PerfStat::threadParams_;
33 ConcurrentMap<std::string, std::set<std::shared_ptr<SqlObserver>>> PerfStat::observers_;
34 ConcurrentMap<uint64_t, std::shared_ptr<SqlExecInfo>> PerfStat::execInfos_;
35 bool PerfStat::enabled_ = false;
36 std::atomic_uint32_t PerfStat::seqId_ = 0;
Subscribe(const std::string & storeId,std::shared_ptr<SqlObserver> observer)37 int PerfStat::Subscribe(const std::string &storeId, std::shared_ptr<SqlObserver> observer)
38 {
39     observers_.Compute(storeId, [observer](const auto &, auto &observers) {
40         observers.insert(observer);
41         enabled_ = true;
42         return true;
43     });
44     return E_OK;
45 }
46 
Unsubscribe(const std::string & storeId,std::shared_ptr<SqlObserver> observer)47 int PerfStat::Unsubscribe(const std::string &storeId, std::shared_ptr<SqlObserver> observer)
48 {
49     observers_.ComputeIfPresent(storeId, [observer](const auto &key, auto &observers) {
50         observers.erase(observer);
51         return !observers.empty();
52     });
53     observers_.DoActionIfEmpty([]() {
54         enabled_ = false;
55         execInfos_.Clear();
56         std::unique_lock<decltype(mutex_)> lock(mutex_);
57         threadParams_.clear();
58     });
59     return E_OK;
60 }
61 
GenerateId()62 uint32_t PerfStat::GenerateId()
63 {
64     return ++seqId_;
65 }
66 
PerfStat(const std::string & storeId,const std::string & sql,int32_t step,uint32_t seqId,size_t size)67 PerfStat::PerfStat(const std::string &storeId, const std::string &sql, int32_t step, uint32_t seqId, size_t size)
68 {
69     if (!enabled_ || IsPaused()) {
70         return;
71     }
72     auto storeObservers = observers_.Find(storeId);
73     if (!storeObservers.first) {
74         return;
75     }
76 
77     step_ = step;
78     key_ = (seqId == 0) ? GetThreadId() : uint64_t(seqId);
79     time_ = std::chrono::steady_clock::now();
80 
81     if (step == STEP_TOTAL || step == STEP_TRANS || step == STEP_TRANS_END) {
82         execInfo_ = std::shared_ptr<SqlExecInfo>(new (std::nothrow) SqlExecInfo(), GetRelease(step, seqId, storeId));
83         execInfos_.Insert(GetThreadId(), execInfo_);
84     } else {
85         auto it = execInfos_.Find(key_);
86         if (it.first) {
87             execInfo_ = it.second;
88         }
89     }
90 
91     if (execInfo_ == nullptr && seqId != 0) {
92         auto it = execInfos_.Find(GetThreadId());
93         execInfo_ = it.second;
94     }
95 
96     if (step_ == STEP_TRANS_START && execInfo_ != nullptr) {
97         execInfos_.Insert(seqId, execInfo_);
98     }
99 
100     if ((step_ == STEP_TOTAL || step_ == STEP_TRANS) && size > 0) {
101         SetSize(size);
102     }
103 
104     if (step_ == STEP_PREPARE && !sql.empty() && execInfo_ != nullptr) {
105         FormatSql(sql);
106     }
107 }
108 
~PerfStat()109 PerfStat::~PerfStat()
110 {
111     if (!enabled_ || IsPaused()) {
112         return;
113     }
114     if (execInfo_ == nullptr) {
115         return;
116     }
117 
118     auto interval = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - time_);
119     switch (step_) {
120         case STEP_WAIT:
121             execInfo_->waitTime_ += interval.count();
122             break;
123         case STEP_PREPARE:
124             execInfo_->prepareTime_ += interval.count();
125             break;
126         case STEP_EXECUTE:
127             execInfo_->executeTime_ += interval.count();
128             break;
129         case STEP_TOTAL:
130             SetSize(0);
131             [[fallthrough]];
132         case STEP_TOTAL_RES:
133             execInfo_->totalTime_ += interval.count();
134             execInfos_.Erase(key_);
135             break;
136         case STEP_TRANS:
137             execInfo_->totalTime_ += interval.count();
138             execInfos_.Erase(GetThreadId());
139             SetSize(0);
140             break;
141         case STEP_TRANS_END:
142             execInfo_->totalTime_ += interval.count();
143             execInfos_.Erase(GetThreadId());
144             execInfo_ = nullptr;
145             execInfos_.Erase(key_);
146             break;
147         default:
148             execInfo_->totalTime_ += interval.count();
149             break;
150     }
151 }
152 
GetRelease(int32_t step,uint32_t seqId,const std::string & storeId)153 PerfStat::Release PerfStat::GetRelease(int32_t step, uint32_t seqId, const std::string &storeId)
154 {
155     switch (step) {
156         case STEP_TRANS:
157             [[fallthrough]];
158         case STEP_TRANS_END:
159             return [seqId](SqlExecInfo *execInfo) {
160                 Merge(seqId, execInfo);
161             };
162         default:
163             return [seqId, storeId](SqlExecInfo *execInfo) {
164                 Notify(execInfo, storeId);
165             };
166     }
167 }
168 
Merge(uint32_t seqId,SqlExecInfo * execInfo)169 void PerfStat::Merge(uint32_t seqId, SqlExecInfo *execInfo)
170 {
171     if (execInfo == nullptr) {
172         return;
173     }
174     if (execInfo->sql_.empty()) {
175         delete execInfo;
176         return;
177     }
178     execInfos_.ComputeIfPresent(seqId, [execInfo](const auto &, std::shared_ptr<SqlExecInfo> info) {
179         info->totalTime_ += execInfo->totalTime_;
180         info->waitTime_ += execInfo->waitTime_;
181         info->prepareTime_ += execInfo->prepareTime_;
182         info->executeTime_ += execInfo->executeTime_;
183         info->sql_.insert(info->sql_.end(), execInfo->sql_.begin(), execInfo->sql_.end());
184         return true;
185     });
186     delete execInfo;
187 }
188 
Notify(SqlExecInfo * execInfo,const std::string & storeId)189 void PerfStat::Notify(SqlExecInfo *execInfo, const std::string &storeId)
190 {
191     if (execInfo == nullptr) {
192         return;
193     }
194     if (execInfo->sql_.empty()) {
195         delete execInfo;
196         return;
197     }
198     auto executor = TaskExecutor::GetInstance().GetExecutor();
199     if (executor == nullptr) {
200         delete execInfo;
201         return;
202     }
203     executor->Execute([info = std::move(*execInfo), storeId]() {
204         std::set<std::shared_ptr<SqlObserver>> sqlObservers;
205         observers_.ComputeIfPresent(storeId, [&sqlObservers](const auto &, auto &observers) {
206             sqlObservers = observers;
207             return true;
208         });
209         for (auto &obs : sqlObservers) {
210             if (obs != nullptr) {
211                 obs->OnStatistic(info);
212             }
213         }
214     });
215     delete execInfo;
216 }
Pause(uint32_t seqId)217 void PerfStat::Pause(uint32_t seqId)
218 {
219     std::unique_lock<decltype(mutex_)> lock(mutex_);
220     threadParams_[GetThreadId()].suspenders_ = std::max(1, threadParams_[GetThreadId()].suspenders_ + 1);
221 }
Resume(uint32_t seqId)222 void PerfStat::Resume(uint32_t seqId)
223 {
224     std::unique_lock<decltype(mutex_)> lock(mutex_);
225     threadParams_[GetThreadId()].suspenders_ = std::max(0, threadParams_[GetThreadId()].suspenders_ - 1);
226 }
227 
FormatSql(const std::string & sql)228 void PerfStat::FormatSql(const std::string &sql)
229 {
230     auto size = GetSize();
231     if (size == 0) {
232         execInfo_->sql_.emplace_back(sql);
233         return;
234     }
235     if (size > 0 && execInfo_->sql_.size() > 0) {
236         return;
237     }
238     size_t firstPos = sql.find("),(");
239     if (firstPos != std::string::npos) {
240         std::string newSql = sql.substr(0, firstPos + 1);
241         newSql.append(",...,").append(std::to_string(size));
242         execInfo_->sql_.emplace_back(std::move(newSql));
243         return;
244     }
245     execInfo_->sql_.emplace_back(sql);
246 }
247 
IsPaused()248 bool PerfStat::IsPaused()
249 {
250     std::shared_lock<decltype(mutex_)> lock(mutex_);
251     auto it = threadParams_.find(GetThreadId());
252     return it != threadParams_.end() && it->second.suspenders_ > 0;
253 }
254 
GetSize()255 size_t PerfStat::GetSize()
256 {
257     std::shared_lock<decltype(mutex_)> lock(mutex_);
258     auto it = threadParams_.find(GetThreadId());
259     return it != threadParams_.end() ? it->second.size_ : 0;
260 }
261 
SetSize(size_t size)262 void PerfStat::SetSize(size_t size)
263 {
264     std::unique_lock<decltype(mutex_)> lock(mutex_);
265     threadParams_[GetThreadId()].size_ = size;
266 }
267 } // namespace OHOS::DistributedRdb