• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "SchedulerManager"
16 
17 #include "scheduler_manager.h"
18 
19 #include <cinttypes>
20 
21 #include "log_print.h"
22 #include "timer_info.h"
23 #include "utils.h"
24 #include "utils/anonymous.h"
25 #include "log_debug.h"
26 
27 namespace OHOS::DataShare {
28 static constexpr int64_t MAX_MILLISECONDS = 31536000000; // 365 days
29 static constexpr int32_t DELAYED_MILLISECONDS = 200;
GetInstance()30 SchedulerManager &SchedulerManager::GetInstance()
31 {
32     static SchedulerManager instance;
33     return instance;
34 }
35 
Execute(const std::string & uri,const int32_t userId,DistributedData::StoreMetaData & metaData)36 void SchedulerManager::Execute(const std::string &uri, const int32_t userId, DistributedData::StoreMetaData &metaData)
37 {
38     if (!URIUtils::IsDataProxyURI(uri)) {
39         return;
40     }
41     metaData.user = std::to_string(userId);
42     auto delegate = DBDelegate::Create(metaData);
43     if (delegate == nullptr) {
44         ZLOGE("malloc fail %{public}s", URIUtils::Anonymous(uri).c_str());
45         return;
46     }
47     std::vector<Key> keys = RdbSubscriberManager::GetInstance().GetKeysByUri(uri);
48     for (auto const &key : keys) {
49         ExecuteSchedulerSQL(userId, metaData, key, delegate);
50     }
51 }
52 
Execute(const Key & key,const int32_t userId,const DistributedData::StoreMetaData & metaData)53 void SchedulerManager::Execute(const Key &key, const int32_t userId, const DistributedData::StoreMetaData &metaData)
54 {
55     DistributedData::StoreMetaData meta = metaData;
56     meta.bundleName = key.bundleName;
57     meta.user = std::to_string(userId);
58     auto delegate = DBDelegate::Create(meta);
59     if (delegate == nullptr) {
60         ZLOGE("malloc fail %{public}s", URIUtils::Anonymous(key.uri).c_str());
61         return;
62     }
63     ExecuteSchedulerSQL(userId, meta, key, delegate);
64 }
65 
Start(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)66 void SchedulerManager::Start(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
67 {
68     {
69         std::lock_guard<std::mutex> lock(mutex_);
70         auto it = schedulerStatusCache_.find(key);
71         if (it == schedulerStatusCache_.end()) {
72             schedulerStatusCache_.emplace(key, true);
73         }
74     }
75     Execute(key, userId, metaData);
76 }
77 
Stop(const Key & key)78 void SchedulerManager::Stop(const Key &key)
79 {
80     std::lock_guard<std::mutex> lock(mutex_);
81     RemoveTimer(key);
82     auto it = schedulerStatusCache_.find(key);
83     if (it != schedulerStatusCache_.end()) {
84         schedulerStatusCache_.erase(it);
85     }
86 }
87 
Enable(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)88 void SchedulerManager::Enable(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
89 {
90     Template tpl;
91     if (!TemplateManager::GetInstance().Get(key, userId, tpl) ||
92         tpl.scheduler_.empty() || tpl.scheduler_.find(REMIND_TIMER_FUNC) == std::string::npos) {
93         ZLOGE("find template scheduler failed, %{public}s, %{public}" PRId64 ", %{public}s",
94             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
95         return;
96     }
97     bool isTimerStopped = false;
98     {
99         std::lock_guard<std::mutex> lock(mutex_);
100         auto it = schedulerStatusCache_.find(key);
101         if (it != schedulerStatusCache_.end()) {
102             it->second = true;
103         } else {
104             ZLOGW("enable key not found, %{public}s, %{public}" PRId64 ", %{public}s",
105                 URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
106         }
107         auto timer = timerCache_.find(key);
108         if (timer == timerCache_.end()) {
109             isTimerStopped = true;
110         }
111     }
112     if (isTimerStopped) {
113         Execute(key, userId, metaData);
114         RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
115     }
116 }
117 
Disable(const Key & key)118 void SchedulerManager::Disable(const Key &key)
119 {
120     std::lock_guard<std::mutex> lock(mutex_);
121     auto it = schedulerStatusCache_.find(key);
122     if (it != schedulerStatusCache_.end()) {
123         it->second = false;
124     } else {
125         ZLOGW("disable key not found, %{public}s, %{public}" PRId64 ", %{public}s",
126             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
127     }
128 }
129 
SetTimerTask(uint64_t & timerId,const std::function<void ()> & callback,int64_t reminderTime)130 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
131     int64_t reminderTime)
132 {
133     auto timerInfo = std::make_shared<TimerInfo>();
134     timerInfo->SetType(timerInfo->TIMER_TYPE_EXACT);
135     timerInfo->SetRepeat(false);
136     auto wantAgent = std::shared_ptr<AbilityRuntime::WantAgent::WantAgent>();
137     timerInfo->SetWantAgent(wantAgent);
138     timerInfo->SetCallbackInfo(callback);
139     timerId = TimeServiceClient::GetInstance()->CreateTimer(timerInfo);
140     if (timerId == 0) {
141         return false;
142     }
143     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
144     return true;
145 }
146 
DestoryTimerTask(int64_t timerId)147 void SchedulerManager::DestoryTimerTask(int64_t timerId)
148 {
149     if (timerId > 0) {
150         TimeServiceClient::GetInstance()->DestroyTimer(timerId);
151     }
152 }
153 
ResetTimerTask(int64_t timerId,int64_t reminderTime)154 void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
155 {
156     // This start also means reset, new one will replace old one
157     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
158 }
159 
EraseTimerTaskId(const Key & key)160 int64_t SchedulerManager::EraseTimerTaskId(const Key &key)
161 {
162     int64_t timerId = -1;
163     std::lock_guard<std::mutex> lock(mutex_);
164     auto it = timerCache_.find(key);
165     if (it != timerCache_.end()) {
166         timerId = it->second;
167         timerCache_.erase(key);
168     }
169     return timerId;
170 }
171 
GetSchedulerStatus(const Key & key)172 bool SchedulerManager::GetSchedulerStatus(const Key &key)
173 {
174     bool enabled = false;
175     std::lock_guard<std::mutex> lock(mutex_);
176     uint32_t lastSize = lastStatusCacheSize_;
177     uint32_t nowSize = schedulerStatusCache_.size();
178     if (nowSize != lastSize) {
179         lastStatusCacheSize_ = nowSize;
180         ZLOGI("size changed last %{public}d, now %{public}d", lastSize, nowSize);
181     }
182     auto it = schedulerStatusCache_.find(key);
183     if (it != schedulerStatusCache_.end()) {
184         enabled = it->second;
185     } else {
186         ZLOGW("key not found, %{public}s, %{public}" PRId64 ", %{public}s",
187             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
188     }
189     return enabled;
190 }
191 
SetTimer(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,int64_t reminderTime)192 void SchedulerManager::SetTimer(
193     const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, int64_t reminderTime)
194 {
195     std::lock_guard<std::mutex> lock(mutex_);
196     if (executor_ == nullptr) {
197         ZLOGE("executor_ is nullptr");
198         return;
199     }
200     int64_t now = 0;
201     TimeServiceClient::GetInstance()->GetWallTimeMs(now);
202     if (reminderTime <= now || reminderTime - now >= MAX_MILLISECONDS) {
203         ZLOGE("invalid args, %{public}" PRId64 ", %{public}" PRId64 ", subId=%{public}" PRId64
204             ", bundleName=%{public}s.", reminderTime, now, key.subscriberId, key.bundleName.c_str());
205         return;
206     }
207     auto duration = reminderTime - now;
208     ZLOGI("the task will notify in %{public}" PRId64 " ms, %{public}" PRId64 ", %{public}s.",
209           duration, key.subscriberId, key.bundleName.c_str());
210     auto it = timerCache_.find(key);
211     if (it != timerCache_.end()) {
212         ZLOGD_MACRO("has current taskId: %{private}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
213             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
214         auto timerId = it->second;
215         ResetTimerTask(timerId, reminderTime);
216         return;
217     }
218     auto callback = [key, metaData, userId, this]() {
219         ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
220             "%{public}s", URIUtils::Anonymous(key.uri).c_str(),
221             key.subscriberId, key.bundleName.c_str());
222         int64_t timerId = EraseTimerTaskId(key);
223         DestoryTimerTask(timerId);
224         if (GetSchedulerStatus(key)) {
225             Execute(key, userId, metaData);
226             RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
227         }
228     };
229     uint64_t timerId = 0;
230     if (!SetTimerTask(timerId, callback, reminderTime)) {
231         ZLOGE("create timer failed.");
232         return;
233     }
234     ZLOGI("create new task success, uri is %{public}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
235         URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
236     timerCache_.emplace(key, timerId);
237 }
238 
ExecuteSchedulerSQL(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::shared_ptr<DBDelegate> delegate)239 void SchedulerManager::ExecuteSchedulerSQL(const int32_t userId, DistributedData::StoreMetaData &metaData,
240     const Key &key, std::shared_ptr<DBDelegate> delegate)
241 {
242     Template tpl;
243     if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
244         ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
245             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
246         return;
247     }
248     if (tpl.scheduler_.empty()) {
249         ZLOGW("template scheduler_ empty, %{public}s, %{public}" PRId64 ", %{public}s",
250             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
251         return;
252     }
253     GenRemindTimerFuncParams(userId, metaData, key, tpl.scheduler_);
254     auto resultSet = delegate->QuerySql(tpl.scheduler_);
255     if (resultSet == nullptr) {
256         ZLOGE("resultSet is nullptr, %{public}s, %{public}" PRId64 ", %{public}s",
257             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
258         return;
259     }
260     int count;
261     int errCode = resultSet->GetRowCount(count);
262     if (errCode != E_OK || count == 0) {
263         ZLOGE("GetRowCount error, %{public}s, %{public}" PRId64 ", %{public}s, errorCode is %{public}d, count is "
264             "%{public}d",
265             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str(), errCode,
266             count);
267         return;
268     }
269 }
270 
GenRemindTimerFuncParams(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::string & schedulerSQL)271 void SchedulerManager::GenRemindTimerFuncParams(
272     const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, std::string &schedulerSQL)
273 {
274     auto index = schedulerSQL.find(REMIND_TIMER_FUNC);
275     if (index == std::string::npos) {
276         ZLOGW("not find remindTimer, sql is %{public}s", URIUtils::Anonymous(schedulerSQL).c_str());
277         return;
278     }
279     index += REMIND_TIMER_FUNC_LEN;
280     std::string keyStr = "'" + metaData.dataDir + "', " + std::to_string(metaData.tokenId) + ", '" + key.uri + "', " +
281                          std::to_string(key.subscriberId) + ", '" + key.bundleName + "', " + std::to_string(userId) +
282                          ", '" + metaData.storeId + "', " + std::to_string(metaData.haMode) + ", ";
283     schedulerSQL.insert(index, keyStr);
284     return;
285 }
286 
RemoveTimer(const Key & key)287 void SchedulerManager::RemoveTimer(const Key &key)
288 {
289     if (executor_ == nullptr) {
290         ZLOGE("executor_ is nullptr");
291         return;
292     }
293     auto it = timerCache_.find(key);
294     if (it != timerCache_.end()) {
295         ZLOGW("RemoveTimer %{public}s %{public}s %{public}" PRId64,
296             URIUtils::Anonymous(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
297         DestoryTimerTask(it->second);
298         timerCache_.erase(key);
299     }
300 }
301 
ClearTimer()302 void SchedulerManager::ClearTimer()
303 {
304     ZLOGI("Clear all timer");
305     std::lock_guard<std::mutex> lock(mutex_);
306     if (executor_ == nullptr) {
307         ZLOGE("executor_ is nullptr");
308         return;
309     }
310     auto it = timerCache_.begin();
311     while (it != timerCache_.end()) {
312         DestoryTimerTask(it->second);
313         it = timerCache_.erase(it);
314     }
315 }
316 
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)317 void SchedulerManager::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
318 {
319     executor_ = executor;
320 }
321 
ReExecuteAll()322 void SchedulerManager::ReExecuteAll()
323 {
324     std::lock_guard<std::mutex> lock(mutex_);
325     for (const auto &item : timerCache_) {
326         // restart in 200ms
327         auto timerId = item.second;
328         int64_t currentTime = 0;
329         TimeServiceClient::GetInstance()->GetWallTimeMs(currentTime);
330         ResetTimerTask(timerId, currentTime + DELAYED_MILLISECONDS);
331     }
332 }
333 } // namespace OHOS::DataShare
334 
335