1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "RdbPerfStat"
17 #include "rdb_perfStat.h"
18
19 #include <thread>
20
21 #include "concurrent_map.h"
22 #include "logger.h"
23 #include "rdb_errno.h"
24 #include "rdb_platform.h"
25 #include "sqlite_utils.h"
26 #include "task_executor.h"
27 namespace OHOS::DistributedRdb {
28 using namespace OHOS::Rdb;
29 using namespace OHOS::NativeRdb;
30 using SqlExecInfo = SqlObserver::SqlExecutionInfo;
31 std::shared_mutex PerfStat::mutex_;
32 std::map<uint64_t, PerfStat::ThreadParam> PerfStat::threadParams_;
33 ConcurrentMap<std::string, std::set<std::shared_ptr<SqlObserver>>> PerfStat::observers_;
34 ConcurrentMap<uint64_t, std::shared_ptr<SqlExecInfo>> PerfStat::execInfos_;
35 bool PerfStat::enabled_ = false;
36 std::atomic_uint32_t PerfStat::seqId_ = 0;
Subscribe(const std::string & storeId,std::shared_ptr<SqlObserver> observer)37 int PerfStat::Subscribe(const std::string &storeId, std::shared_ptr<SqlObserver> observer)
38 {
39 observers_.Compute(storeId, [observer](const auto &, auto &observers) {
40 observers.insert(observer);
41 enabled_ = true;
42 return true;
43 });
44 return E_OK;
45 }
46
Unsubscribe(const std::string & storeId,std::shared_ptr<SqlObserver> observer)47 int PerfStat::Unsubscribe(const std::string &storeId, std::shared_ptr<SqlObserver> observer)
48 {
49 observers_.ComputeIfPresent(storeId, [observer](const auto &key, auto &observers) {
50 observers.erase(observer);
51 return !observers.empty();
52 });
53 observers_.DoActionIfEmpty([]() {
54 enabled_ = false;
55 execInfos_.Clear();
56 std::unique_lock<decltype(mutex_)> lock(mutex_);
57 threadParams_.clear();
58 });
59 return E_OK;
60 }
61
GenerateId()62 uint32_t PerfStat::GenerateId()
63 {
64 return ++seqId_;
65 }
66
PerfStat(const std::string & storeId,const std::string & sql,int32_t step,uint32_t seqId,size_t size)67 PerfStat::PerfStat(const std::string &storeId, const std::string &sql, int32_t step, uint32_t seqId, size_t size)
68 {
69 if (!enabled_ || IsPaused()) {
70 return;
71 }
72 auto storeObservers = observers_.Find(storeId);
73 if (!storeObservers.first) {
74 return;
75 }
76
77 step_ = step;
78 key_ = (seqId == 0) ? GetThreadId() : uint64_t(seqId);
79 time_ = std::chrono::steady_clock::now();
80
81 if (step == STEP_TOTAL || step == STEP_TRANS || step == STEP_TRANS_END) {
82 execInfo_ = std::shared_ptr<SqlExecInfo>(new (std::nothrow) SqlExecInfo(), GetRelease(step, seqId, storeId));
83 execInfos_.Insert(GetThreadId(), execInfo_);
84 } else {
85 auto it = execInfos_.Find(key_);
86 if (it.first) {
87 execInfo_ = it.second;
88 }
89 }
90
91 if (execInfo_ == nullptr && seqId != 0) {
92 auto it = execInfos_.Find(GetThreadId());
93 execInfo_ = it.second;
94 }
95
96 if (step_ == STEP_TRANS_START && execInfo_ != nullptr) {
97 execInfos_.Insert(seqId, execInfo_);
98 }
99
100 if ((step_ == STEP_TOTAL || step_ == STEP_TRANS) && size > 0) {
101 SetSize(size);
102 }
103
104 if (step_ == STEP_PREPARE && !sql.empty() && execInfo_ != nullptr) {
105 FormatSql(sql);
106 }
107 }
108
~PerfStat()109 PerfStat::~PerfStat()
110 {
111 if (!enabled_ || IsPaused()) {
112 return;
113 }
114 if (execInfo_ == nullptr) {
115 return;
116 }
117
118 auto interval = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - time_);
119 switch (step_) {
120 case STEP_WAIT:
121 execInfo_->waitTime_ += interval.count();
122 break;
123 case STEP_PREPARE:
124 execInfo_->prepareTime_ += interval.count();
125 break;
126 case STEP_EXECUTE:
127 execInfo_->executeTime_ += interval.count();
128 break;
129 case STEP_TOTAL:
130 SetSize(0);
131 [[fallthrough]];
132 case STEP_TOTAL_RES:
133 execInfo_->totalTime_ += interval.count();
134 execInfos_.Erase(key_);
135 break;
136 case STEP_TRANS:
137 execInfo_->totalTime_ += interval.count();
138 execInfos_.Erase(GetThreadId());
139 SetSize(0);
140 break;
141 case STEP_TRANS_END:
142 execInfo_->totalTime_ += interval.count();
143 execInfos_.Erase(GetThreadId());
144 execInfo_ = nullptr;
145 execInfos_.Erase(key_);
146 break;
147 default:
148 execInfo_->totalTime_ += interval.count();
149 break;
150 }
151 }
152
GetRelease(int32_t step,uint32_t seqId,const std::string & storeId)153 PerfStat::Release PerfStat::GetRelease(int32_t step, uint32_t seqId, const std::string &storeId)
154 {
155 switch (step) {
156 case STEP_TRANS:
157 [[fallthrough]];
158 case STEP_TRANS_END:
159 return [seqId](SqlExecInfo *execInfo) {
160 Merge(seqId, execInfo);
161 };
162 default:
163 return [seqId, storeId](SqlExecInfo *execInfo) {
164 Notify(execInfo, storeId);
165 };
166 }
167 }
168
Merge(uint32_t seqId,SqlExecInfo * execInfo)169 void PerfStat::Merge(uint32_t seqId, SqlExecInfo *execInfo)
170 {
171 if (execInfo == nullptr) {
172 return;
173 }
174 if (execInfo->sql_.empty()) {
175 delete execInfo;
176 return;
177 }
178 execInfos_.ComputeIfPresent(seqId, [execInfo](const auto &, std::shared_ptr<SqlExecInfo> info) {
179 info->totalTime_ += execInfo->totalTime_;
180 info->waitTime_ += execInfo->waitTime_;
181 info->prepareTime_ += execInfo->prepareTime_;
182 info->executeTime_ += execInfo->executeTime_;
183 info->sql_.insert(info->sql_.end(), execInfo->sql_.begin(), execInfo->sql_.end());
184 return true;
185 });
186 delete execInfo;
187 }
188
Notify(SqlExecInfo * execInfo,const std::string & storeId)189 void PerfStat::Notify(SqlExecInfo *execInfo, const std::string &storeId)
190 {
191 if (execInfo == nullptr) {
192 return;
193 }
194 if (execInfo->sql_.empty()) {
195 delete execInfo;
196 return;
197 }
198 auto executor = TaskExecutor::GetInstance().GetExecutor();
199 if (executor == nullptr) {
200 delete execInfo;
201 return;
202 }
203 executor->Execute([info = std::move(*execInfo), storeId]() {
204 std::set<std::shared_ptr<SqlObserver>> sqlObservers;
205 observers_.ComputeIfPresent(storeId, [&sqlObservers](const auto &, auto &observers) {
206 sqlObservers = observers;
207 return true;
208 });
209 for (auto &obs : sqlObservers) {
210 if (obs != nullptr) {
211 obs->OnStatistic(info);
212 }
213 }
214 });
215 delete execInfo;
216 }
Pause(uint32_t seqId)217 void PerfStat::Pause(uint32_t seqId)
218 {
219 std::unique_lock<decltype(mutex_)> lock(mutex_);
220 threadParams_[GetThreadId()].suspenders_ = std::max(1, threadParams_[GetThreadId()].suspenders_ + 1);
221 }
Resume(uint32_t seqId)222 void PerfStat::Resume(uint32_t seqId)
223 {
224 std::unique_lock<decltype(mutex_)> lock(mutex_);
225 threadParams_[GetThreadId()].suspenders_ = std::max(0, threadParams_[GetThreadId()].suspenders_ - 1);
226 }
227
FormatSql(const std::string & sql)228 void PerfStat::FormatSql(const std::string &sql)
229 {
230 auto size = GetSize();
231 if (size == 0) {
232 execInfo_->sql_.emplace_back(sql);
233 return;
234 }
235 if (size > 0 && execInfo_->sql_.size() > 0) {
236 return;
237 }
238 size_t firstPos = sql.find("),(");
239 if (firstPos != std::string::npos) {
240 std::string newSql = sql.substr(0, firstPos + 1);
241 newSql.append(",...,").append(std::to_string(size));
242 execInfo_->sql_.emplace_back(std::move(newSql));
243 return;
244 }
245 execInfo_->sql_.emplace_back(sql);
246 }
247
IsPaused()248 bool PerfStat::IsPaused()
249 {
250 std::shared_lock<decltype(mutex_)> lock(mutex_);
251 auto it = threadParams_.find(GetThreadId());
252 return it != threadParams_.end() && it->second.suspenders_ > 0;
253 }
254
GetSize()255 size_t PerfStat::GetSize()
256 {
257 std::shared_lock<decltype(mutex_)> lock(mutex_);
258 auto it = threadParams_.find(GetThreadId());
259 return it != threadParams_.end() ? it->second.size_ : 0;
260 }
261
SetSize(size_t size)262 void PerfStat::SetSize(size_t size)
263 {
264 std::unique_lock<decltype(mutex_)> lock(mutex_);
265 threadParams_[GetThreadId()].size_ = size;
266 }
267 } // namespace OHOS::DistributedRdb