1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SchedulerManager"
16
17 #include "scheduler_manager.h"
18
19 #include <cinttypes>
20
21 #include "log_print.h"
22 #include "timer_info.h"
23 #include "utils.h"
24 #include "utils/anonymous.h"
25 #include "log_debug.h"
26
27 namespace OHOS::DataShare {
28 static constexpr int64_t MAX_MILLISECONDS = 31536000000; // 365 days
29 static constexpr int32_t DELAYED_MILLISECONDS = 200;
GetInstance()30 SchedulerManager &SchedulerManager::GetInstance()
31 {
32 static SchedulerManager instance;
33 return instance;
34 }
35
Execute(const std::string & uri,const int32_t userId,DistributedData::StoreMetaData & metaData)36 void SchedulerManager::Execute(const std::string &uri, const int32_t userId, DistributedData::StoreMetaData &metaData)
37 {
38 if (!URIUtils::IsDataProxyURI(uri)) {
39 return;
40 }
41 metaData.user = std::to_string(userId);
42 auto delegate = DBDelegate::Create(metaData);
43 if (delegate == nullptr) {
44 ZLOGE("malloc fail %{public}s", URIUtils::Anonymous(uri).c_str());
45 return;
46 }
47 std::vector<Key> keys = RdbSubscriberManager::GetInstance().GetKeysByUri(uri);
48 for (auto const &key : keys) {
49 ExecuteSchedulerSQL(userId, metaData, key, delegate);
50 }
51 }
52
Execute(const Key & key,const int32_t userId,const DistributedData::StoreMetaData & metaData)53 void SchedulerManager::Execute(const Key &key, const int32_t userId, const DistributedData::StoreMetaData &metaData)
54 {
55 DistributedData::StoreMetaData meta = metaData;
56 meta.bundleName = key.bundleName;
57 meta.user = std::to_string(userId);
58 auto delegate = DBDelegate::Create(meta);
59 if (delegate == nullptr) {
60 ZLOGE("malloc fail %{public}s", URIUtils::Anonymous(key.uri).c_str());
61 return;
62 }
63 ExecuteSchedulerSQL(userId, meta, key, delegate);
64 }
65
Start(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)66 void SchedulerManager::Start(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
67 {
68 {
69 std::lock_guard<std::mutex> lock(mutex_);
70 auto it = schedulerStatusCache_.find(key);
71 if (it == schedulerStatusCache_.end()) {
72 schedulerStatusCache_.emplace(key, true);
73 }
74 }
75 Execute(key, userId, metaData);
76 }
77
Stop(const Key & key)78 void SchedulerManager::Stop(const Key &key)
79 {
80 std::lock_guard<std::mutex> lock(mutex_);
81 RemoveTimer(key);
82 auto it = schedulerStatusCache_.find(key);
83 if (it != schedulerStatusCache_.end()) {
84 schedulerStatusCache_.erase(it);
85 }
86 }
87
Enable(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)88 void SchedulerManager::Enable(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
89 {
90 Template tpl;
91 if (!TemplateManager::GetInstance().Get(key, userId, tpl) ||
92 tpl.scheduler_.empty() || tpl.scheduler_.find(REMIND_TIMER_FUNC) == std::string::npos) {
93 ZLOGE("find template scheduler failed, %{public}s, %{public}" PRId64 ", %{public}s",
94 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
95 return;
96 }
97 bool isTimerStopped = false;
98 {
99 std::lock_guard<std::mutex> lock(mutex_);
100 auto it = schedulerStatusCache_.find(key);
101 if (it != schedulerStatusCache_.end()) {
102 it->second = true;
103 } else {
104 ZLOGW("enable key not found, %{public}s, %{public}" PRId64 ", %{public}s",
105 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
106 }
107 auto timer = timerCache_.find(key);
108 if (timer == timerCache_.end()) {
109 isTimerStopped = true;
110 }
111 }
112 if (isTimerStopped) {
113 Execute(key, userId, metaData);
114 RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
115 }
116 }
117
Disable(const Key & key)118 void SchedulerManager::Disable(const Key &key)
119 {
120 std::lock_guard<std::mutex> lock(mutex_);
121 auto it = schedulerStatusCache_.find(key);
122 if (it != schedulerStatusCache_.end()) {
123 it->second = false;
124 } else {
125 ZLOGW("disable key not found, %{public}s, %{public}" PRId64 ", %{public}s",
126 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
127 }
128 }
129
SetTimerTask(uint64_t & timerId,const std::function<void ()> & callback,int64_t reminderTime)130 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
131 int64_t reminderTime)
132 {
133 auto timerInfo = std::make_shared<TimerInfo>();
134 timerInfo->SetType(timerInfo->TIMER_TYPE_EXACT);
135 timerInfo->SetRepeat(false);
136 auto wantAgent = std::shared_ptr<AbilityRuntime::WantAgent::WantAgent>();
137 timerInfo->SetWantAgent(wantAgent);
138 timerInfo->SetCallbackInfo(callback);
139 timerId = TimeServiceClient::GetInstance()->CreateTimer(timerInfo);
140 if (timerId == 0) {
141 return false;
142 }
143 TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
144 return true;
145 }
146
DestoryTimerTask(int64_t timerId)147 void SchedulerManager::DestoryTimerTask(int64_t timerId)
148 {
149 if (timerId > 0) {
150 TimeServiceClient::GetInstance()->DestroyTimer(timerId);
151 }
152 }
153
ResetTimerTask(int64_t timerId,int64_t reminderTime)154 void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
155 {
156 // This start also means reset, new one will replace old one
157 TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
158 }
159
EraseTimerTaskId(const Key & key)160 int64_t SchedulerManager::EraseTimerTaskId(const Key &key)
161 {
162 int64_t timerId = -1;
163 std::lock_guard<std::mutex> lock(mutex_);
164 auto it = timerCache_.find(key);
165 if (it != timerCache_.end()) {
166 timerId = it->second;
167 timerCache_.erase(key);
168 }
169 return timerId;
170 }
171
GetSchedulerStatus(const Key & key)172 bool SchedulerManager::GetSchedulerStatus(const Key &key)
173 {
174 bool enabled = false;
175 std::lock_guard<std::mutex> lock(mutex_);
176 uint32_t lastSize = lastStatusCacheSize_;
177 uint32_t nowSize = schedulerStatusCache_.size();
178 if (nowSize != lastSize) {
179 lastStatusCacheSize_ = nowSize;
180 ZLOGI("size changed last %{public}d, now %{public}d", lastSize, nowSize);
181 }
182 auto it = schedulerStatusCache_.find(key);
183 if (it != schedulerStatusCache_.end()) {
184 enabled = it->second;
185 } else {
186 ZLOGW("key not found, %{public}s, %{public}" PRId64 ", %{public}s",
187 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
188 }
189 return enabled;
190 }
191
SetTimer(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,int64_t reminderTime)192 void SchedulerManager::SetTimer(
193 const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, int64_t reminderTime)
194 {
195 std::lock_guard<std::mutex> lock(mutex_);
196 if (executor_ == nullptr) {
197 ZLOGE("executor_ is nullptr");
198 return;
199 }
200 int64_t now = 0;
201 TimeServiceClient::GetInstance()->GetWallTimeMs(now);
202 if (reminderTime <= now || reminderTime - now >= MAX_MILLISECONDS) {
203 ZLOGE("invalid args, %{public}" PRId64 ", %{public}" PRId64 ", subId=%{public}" PRId64
204 ", bundleName=%{public}s.", reminderTime, now, key.subscriberId, key.bundleName.c_str());
205 return;
206 }
207 auto duration = reminderTime - now;
208 ZLOGI("the task will notify in %{public}" PRId64 " ms, %{public}" PRId64 ", %{public}s.",
209 duration, key.subscriberId, key.bundleName.c_str());
210 auto it = timerCache_.find(key);
211 if (it != timerCache_.end()) {
212 ZLOGD_MACRO("has current taskId: %{private}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
213 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
214 auto timerId = it->second;
215 ResetTimerTask(timerId, reminderTime);
216 return;
217 }
218 auto callback = [key, metaData, userId, this]() {
219 ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
220 "%{public}s", URIUtils::Anonymous(key.uri).c_str(),
221 key.subscriberId, key.bundleName.c_str());
222 int64_t timerId = EraseTimerTaskId(key);
223 DestoryTimerTask(timerId);
224 if (GetSchedulerStatus(key)) {
225 Execute(key, userId, metaData);
226 RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
227 }
228 };
229 uint64_t timerId = 0;
230 if (!SetTimerTask(timerId, callback, reminderTime)) {
231 ZLOGE("create timer failed.");
232 return;
233 }
234 ZLOGI("create new task success, uri is %{public}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
235 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
236 timerCache_.emplace(key, timerId);
237 }
238
ExecuteSchedulerSQL(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::shared_ptr<DBDelegate> delegate)239 void SchedulerManager::ExecuteSchedulerSQL(const int32_t userId, DistributedData::StoreMetaData &metaData,
240 const Key &key, std::shared_ptr<DBDelegate> delegate)
241 {
242 Template tpl;
243 if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
244 ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
245 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
246 return;
247 }
248 if (tpl.scheduler_.empty()) {
249 ZLOGW("template scheduler_ empty, %{public}s, %{public}" PRId64 ", %{public}s",
250 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
251 return;
252 }
253 GenRemindTimerFuncParams(userId, metaData, key, tpl.scheduler_);
254 auto resultSet = delegate->QuerySql(tpl.scheduler_);
255 if (resultSet == nullptr) {
256 ZLOGE("resultSet is nullptr, %{public}s, %{public}" PRId64 ", %{public}s",
257 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
258 return;
259 }
260 int count;
261 int errCode = resultSet->GetRowCount(count);
262 if (errCode != E_OK || count == 0) {
263 ZLOGE("GetRowCount error, %{public}s, %{public}" PRId64 ", %{public}s, errorCode is %{public}d, count is "
264 "%{public}d",
265 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str(), errCode,
266 count);
267 return;
268 }
269 }
270
GenRemindTimerFuncParams(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::string & schedulerSQL)271 void SchedulerManager::GenRemindTimerFuncParams(
272 const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, std::string &schedulerSQL)
273 {
274 auto index = schedulerSQL.find(REMIND_TIMER_FUNC);
275 if (index == std::string::npos) {
276 ZLOGW("not find remindTimer, sql is %{public}s", URIUtils::Anonymous(schedulerSQL).c_str());
277 return;
278 }
279 index += REMIND_TIMER_FUNC_LEN;
280 std::string keyStr = "'" + metaData.dataDir + "', " + std::to_string(metaData.tokenId) + ", '" + key.uri + "', " +
281 std::to_string(key.subscriberId) + ", '" + key.bundleName + "', " + std::to_string(userId) +
282 ", '" + metaData.storeId + "', " + std::to_string(metaData.haMode) + ", ";
283 schedulerSQL.insert(index, keyStr);
284 return;
285 }
286
RemoveTimer(const Key & key)287 void SchedulerManager::RemoveTimer(const Key &key)
288 {
289 if (executor_ == nullptr) {
290 ZLOGE("executor_ is nullptr");
291 return;
292 }
293 auto it = timerCache_.find(key);
294 if (it != timerCache_.end()) {
295 ZLOGW("RemoveTimer %{public}s %{public}s %{public}" PRId64,
296 URIUtils::Anonymous(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
297 DestoryTimerTask(it->second);
298 timerCache_.erase(key);
299 }
300 }
301
ClearTimer()302 void SchedulerManager::ClearTimer()
303 {
304 ZLOGI("Clear all timer");
305 std::lock_guard<std::mutex> lock(mutex_);
306 if (executor_ == nullptr) {
307 ZLOGE("executor_ is nullptr");
308 return;
309 }
310 auto it = timerCache_.begin();
311 while (it != timerCache_.end()) {
312 DestoryTimerTask(it->second);
313 it = timerCache_.erase(it);
314 }
315 }
316
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)317 void SchedulerManager::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
318 {
319 executor_ = executor;
320 }
321
ReExecuteAll()322 void SchedulerManager::ReExecuteAll()
323 {
324 std::lock_guard<std::mutex> lock(mutex_);
325 for (const auto &item : timerCache_) {
326 // restart in 200ms
327 auto timerId = item.second;
328 int64_t currentTime = 0;
329 TimeServiceClient::GetInstance()->GetWallTimeMs(currentTime);
330 ResetTimerTask(timerId, currentTime + DELAYED_MILLISECONDS);
331 }
332 }
333 } // namespace OHOS::DataShare
334
335