1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SchedulerManager"
16
#include "scheduler_manager.h"

#include <cinttypes>
#include <utility>

#include "log_print.h"
#include "timer_info.h"
#include "uri_utils.h"
#include "utils/anonymous.h"
#include "log_debug.h"
26
27 namespace OHOS::DataShare {
// Upper bound (exclusive) on how far ahead of the current wall time a reminder may be set: 365 days in ms.
static constexpr int64_t MAX_MILLISECONDS = 365LL * 24 * 60 * 60 * 1000;
// Delay, in ms, added to the current wall time when ReExecuteAll() restarts the cached timers.
static constexpr int32_t DELAYED_MILLISECONDS = 200;
GetInstance()30 SchedulerManager &SchedulerManager::GetInstance()
31 {
32 static SchedulerManager instance;
33 return instance;
34 }
35
Execute(const std::string & uri,const int32_t userId,DistributedData::StoreMetaData & metaData)36 void SchedulerManager::Execute(const std::string &uri, const int32_t userId, DistributedData::StoreMetaData &metaData)
37 {
38 if (!URIUtils::IsDataProxyURI(uri)) {
39 return;
40 }
41 metaData.user = std::to_string(userId);
42 auto delegate = DBDelegate::Create(metaData);
43 if (delegate == nullptr) {
44 ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(uri).c_str());
45 return;
46 }
47 std::vector<Key> keys = RdbSubscriberManager::GetInstance().GetKeysByUri(uri);
48 for (auto const &key : keys) {
49 ExecuteSchedulerSQL(userId, metaData, key, delegate);
50 }
51 }
52
Execute(const Key & key,const int32_t userId,const DistributedData::StoreMetaData & metaData)53 void SchedulerManager::Execute(const Key &key, const int32_t userId, const DistributedData::StoreMetaData &metaData)
54 {
55 DistributedData::StoreMetaData meta = metaData;
56 meta.bundleName = key.bundleName;
57 meta.user = std::to_string(userId);
58 auto delegate = DBDelegate::Create(meta);
59 if (delegate == nullptr) {
60 ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(key.uri).c_str());
61 return;
62 }
63 ExecuteSchedulerSQL(userId, meta, key, delegate);
64 }
65
Start(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)66 void SchedulerManager::Start(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
67 {
68 {
69 std::lock_guard<std::mutex> lock(mutex_);
70 auto it = schedulerStatusCache_.find(key);
71 if (it == schedulerStatusCache_.end()) {
72 schedulerStatusCache_.emplace(key, true);
73 }
74 }
75 Execute(key, userId, metaData);
76 }
77
Stop(const Key & key)78 void SchedulerManager::Stop(const Key &key)
79 {
80 std::lock_guard<std::mutex> lock(mutex_);
81 RemoveTimer(key);
82 auto it = schedulerStatusCache_.find(key);
83 if (it != schedulerStatusCache_.end()) {
84 schedulerStatusCache_.erase(it);
85 }
86 }
87
Enable(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)88 void SchedulerManager::Enable(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
89 {
90 bool isTimerStopped = false;
91 {
92 std::lock_guard<std::mutex> lock(mutex_);
93 auto it = schedulerStatusCache_.find(key);
94 if (it != schedulerStatusCache_.end()) {
95 it->second = true;
96 }
97 auto timer = timerCache_.find(key);
98 if (timer == timerCache_.end()) {
99 isTimerStopped = true;
100 }
101 }
102 if (isTimerStopped) {
103 Execute(key, userId, metaData);
104 RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
105 }
106 }
107
Disable(const Key & key)108 void SchedulerManager::Disable(const Key &key)
109 {
110 std::lock_guard<std::mutex> lock(mutex_);
111 auto it = schedulerStatusCache_.find(key);
112 if (it != schedulerStatusCache_.end()) {
113 it->second = false;
114 }
115 }
116
SetTimerTask(uint64_t & timerId,const std::function<void ()> & callback,int64_t reminderTime)117 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
118 int64_t reminderTime)
119 {
120 auto timerInfo = std::make_shared<TimerInfo>();
121 timerInfo->SetType(timerInfo->TIMER_TYPE_EXACT);
122 timerInfo->SetRepeat(false);
123 auto wantAgent = std::shared_ptr<AbilityRuntime::WantAgent::WantAgent>();
124 timerInfo->SetWantAgent(wantAgent);
125 timerInfo->SetCallbackInfo(callback);
126 timerId = TimeServiceClient::GetInstance()->CreateTimer(timerInfo);
127 if (timerId == 0) {
128 return false;
129 }
130 TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
131 return true;
132 }
133
DestoryTimerTask(int64_t timerId)134 void SchedulerManager::DestoryTimerTask(int64_t timerId)
135 {
136 if (timerId > 0) {
137 TimeServiceClient::GetInstance()->DestroyTimer(timerId);
138 }
139 }
140
ResetTimerTask(int64_t timerId,int64_t reminderTime)141 void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
142 {
143 // This start also means reset, new one will replace old one
144 TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
145 }
146
EraseTimerTaskId(const Key & key)147 int64_t SchedulerManager::EraseTimerTaskId(const Key &key)
148 {
149 int64_t timerId = -1;
150 std::lock_guard<std::mutex> lock(mutex_);
151 auto it = timerCache_.find(key);
152 if (it != timerCache_.end()) {
153 timerId = it->second;
154 timerCache_.erase(key);
155 }
156 return timerId;
157 }
158
GetSchedulerStatus(const Key & key)159 bool SchedulerManager::GetSchedulerStatus(const Key &key)
160 {
161 bool enabled = false;
162 std::lock_guard<std::mutex> lock(mutex_);
163 auto it = schedulerStatusCache_.find(key);
164 if (it != schedulerStatusCache_.end()) {
165 enabled = it->second;
166 }
167 return enabled;
168 }
169
SetTimer(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,int64_t reminderTime)170 void SchedulerManager::SetTimer(
171 const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, int64_t reminderTime)
172 {
173 std::lock_guard<std::mutex> lock(mutex_);
174 if (executor_ == nullptr) {
175 ZLOGE("executor_ is nullptr");
176 return;
177 }
178 int64_t now = 0;
179 TimeServiceClient::GetInstance()->GetWallTimeMs(now);
180 if (reminderTime <= now || reminderTime - now >= MAX_MILLISECONDS) {
181 ZLOGE("invalid args, %{public}" PRId64 ", %{public}" PRId64 ", subId=%{public}" PRId64
182 ", bundleName=%{public}s.", reminderTime, now, key.subscriberId, key.bundleName.c_str());
183 return;
184 }
185 auto duration = reminderTime - now;
186 ZLOGI("the task will notify in %{public}" PRId64 " ms, %{public}" PRId64 ", %{public}s.",
187 duration, key.subscriberId, key.bundleName.c_str());
188 auto it = timerCache_.find(key);
189 if (it != timerCache_.end()) {
190 ZLOGD_MACRO("has current taskId: %{private}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
191 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
192 auto timerId = it->second;
193 ResetTimerTask(timerId, reminderTime);
194 return;
195 }
196 auto callback = [key, metaData, userId, this]() {
197 ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
198 "%{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
199 key.subscriberId, key.bundleName.c_str());
200 int64_t timerId = EraseTimerTaskId(key);
201 DestoryTimerTask(timerId);
202 if (GetSchedulerStatus(key)) {
203 Execute(key, userId, metaData);
204 RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
205 }
206 };
207 uint64_t timerId = 0;
208 if (!SetTimerTask(timerId, callback, reminderTime)) {
209 ZLOGE("create timer failed.");
210 return;
211 }
212 ZLOGI("create new task success, uri is %{public}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
213 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
214 timerCache_.emplace(key, timerId);
215 }
216
ExecuteSchedulerSQL(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::shared_ptr<DBDelegate> delegate)217 void SchedulerManager::ExecuteSchedulerSQL(const int32_t userId, DistributedData::StoreMetaData &metaData,
218 const Key &key, std::shared_ptr<DBDelegate> delegate)
219 {
220 Template tpl;
221 if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
222 ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
223 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
224 return;
225 }
226 if (tpl.scheduler_.empty()) {
227 ZLOGW("template scheduler_ empty, %{public}s, %{public}" PRId64 ", %{public}s",
228 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
229 return;
230 }
231 GenRemindTimerFuncParams(userId, metaData, key, tpl.scheduler_);
232 auto resultSet = delegate->QuerySql(tpl.scheduler_);
233 if (resultSet == nullptr) {
234 ZLOGE("resultSet is nullptr, %{public}s, %{public}" PRId64 ", %{public}s",
235 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
236 return;
237 }
238 int count;
239 int errCode = resultSet->GetRowCount(count);
240 if (errCode != E_OK || count == 0) {
241 ZLOGE("GetRowCount error, %{public}s, %{public}" PRId64 ", %{public}s, errorCode is %{public}d, count is "
242 "%{public}d",
243 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str(), errCode,
244 count);
245 return;
246 }
247 }
248
GenRemindTimerFuncParams(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::string & schedulerSQL)249 void SchedulerManager::GenRemindTimerFuncParams(
250 const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, std::string &schedulerSQL)
251 {
252 auto index = schedulerSQL.find(REMIND_TIMER_FUNC);
253 if (index == std::string::npos) {
254 ZLOGW("not find remindTimer, sql is %{public}s", schedulerSQL.c_str());
255 return;
256 }
257 index += REMIND_TIMER_FUNC_LEN;
258 std::string keyStr = "'" + metaData.dataDir + "', " + std::to_string(metaData.tokenId) + ", '" + key.uri + "', " +
259 std::to_string(key.subscriberId) + ", '" + key.bundleName + "', " + std::to_string(userId) +
260 ", '" + metaData.storeId + "', " + std::to_string(metaData.haMode) + ", ";
261 schedulerSQL.insert(index, keyStr);
262 return;
263 }
264
RemoveTimer(const Key & key)265 void SchedulerManager::RemoveTimer(const Key &key)
266 {
267 if (executor_ == nullptr) {
268 ZLOGE("executor_ is nullptr");
269 return;
270 }
271 auto it = timerCache_.find(key);
272 if (it != timerCache_.end()) {
273 ZLOGW("RemoveTimer %{public}s %{public}s %{public}" PRId64,
274 DistributedData::Anonymous::Change(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
275 DestoryTimerTask(it->second);
276 timerCache_.erase(key);
277 }
278 }
279
ClearTimer()280 void SchedulerManager::ClearTimer()
281 {
282 ZLOGI("Clear all timer");
283 std::lock_guard<std::mutex> lock(mutex_);
284 if (executor_ == nullptr) {
285 ZLOGE("executor_ is nullptr");
286 return;
287 }
288 auto it = timerCache_.begin();
289 while (it != timerCache_.end()) {
290 DestoryTimerTask(it->second);
291 it = timerCache_.erase(it);
292 }
293 }
294
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)295 void SchedulerManager::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
296 {
297 executor_ = executor;
298 }
299
ReExecuteAll()300 void SchedulerManager::ReExecuteAll()
301 {
302 std::lock_guard<std::mutex> lock(mutex_);
303 for (const auto &item : timerCache_) {
304 // restart in 200ms
305 auto timerId = item.second;
306 int64_t currentTime = 0;
307 TimeServiceClient::GetInstance()->GetWallTimeMs(currentTime);
308 ResetTimerTask(timerId, currentTime + DELAYED_MILLISECONDS);
309 }
310 }
311 } // namespace OHOS::DataShare
312
313