• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#define LOG_TAG "SchedulerManager"

#include "scheduler_manager.h"

#include <cinttypes>

#include "log_print.h"
#include "timer_info.h"
#include "uri_utils.h"
#include "utils/anonymous.h"
#include "log_debug.h"

namespace OHOS::DataShare {
static constexpr int64_t MAX_MILLISECONDS = 31536000000; // 365 days
static constexpr int32_t DELAYED_MILLISECONDS = 200;
GetInstance()30 SchedulerManager &SchedulerManager::GetInstance()
31 {
32     static SchedulerManager instance;
33     return instance;
34 }
35 
Execute(const std::string & uri,const int32_t userId,DistributedData::StoreMetaData & metaData)36 void SchedulerManager::Execute(const std::string &uri, const int32_t userId, DistributedData::StoreMetaData &metaData)
37 {
38     if (!URIUtils::IsDataProxyURI(uri)) {
39         return;
40     }
41     metaData.user = std::to_string(userId);
42     auto delegate = DBDelegate::Create(metaData);
43     if (delegate == nullptr) {
44         ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(uri).c_str());
45         return;
46     }
47     std::vector<Key> keys = RdbSubscriberManager::GetInstance().GetKeysByUri(uri);
48     for (auto const &key : keys) {
49         ExecuteSchedulerSQL(userId, metaData, key, delegate);
50     }
51 }
52 
Execute(const Key & key,const int32_t userId,const DistributedData::StoreMetaData & metaData)53 void SchedulerManager::Execute(const Key &key, const int32_t userId, const DistributedData::StoreMetaData &metaData)
54 {
55     DistributedData::StoreMetaData meta = metaData;
56     meta.bundleName = key.bundleName;
57     meta.user = std::to_string(userId);
58     auto delegate = DBDelegate::Create(meta);
59     if (delegate == nullptr) {
60         ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(key.uri).c_str());
61         return;
62     }
63     ExecuteSchedulerSQL(userId, meta, key, delegate);
64 }
65 
Start(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)66 void SchedulerManager::Start(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
67 {
68     {
69         std::lock_guard<std::mutex> lock(mutex_);
70         auto it = schedulerStatusCache_.find(key);
71         if (it == schedulerStatusCache_.end()) {
72             schedulerStatusCache_.emplace(key, true);
73         }
74     }
75     Execute(key, userId, metaData);
76 }
77 
Stop(const Key & key)78 void SchedulerManager::Stop(const Key &key)
79 {
80     std::lock_guard<std::mutex> lock(mutex_);
81     RemoveTimer(key);
82     auto it = schedulerStatusCache_.find(key);
83     if (it != schedulerStatusCache_.end()) {
84         schedulerStatusCache_.erase(it);
85     }
86 }
87 
Enable(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)88 void SchedulerManager::Enable(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
89 {
90     bool isTimerStopped = false;
91     {
92         std::lock_guard<std::mutex> lock(mutex_);
93         auto it = schedulerStatusCache_.find(key);
94         if (it != schedulerStatusCache_.end()) {
95             it->second = true;
96         }
97         auto timer = timerCache_.find(key);
98         if (timer == timerCache_.end()) {
99             isTimerStopped = true;
100         }
101     }
102     if (isTimerStopped) {
103         Execute(key, userId, metaData);
104         RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
105     }
106 }
107 
Disable(const Key & key)108 void SchedulerManager::Disable(const Key &key)
109 {
110     std::lock_guard<std::mutex> lock(mutex_);
111     auto it = schedulerStatusCache_.find(key);
112     if (it != schedulerStatusCache_.end()) {
113         it->second = false;
114     }
115 }
116 
SetTimerTask(uint64_t & timerId,const std::function<void ()> & callback,int64_t reminderTime)117 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
118     int64_t reminderTime)
119 {
120     auto timerInfo = std::make_shared<TimerInfo>();
121     timerInfo->SetType(timerInfo->TIMER_TYPE_EXACT);
122     timerInfo->SetRepeat(false);
123     auto wantAgent = std::shared_ptr<AbilityRuntime::WantAgent::WantAgent>();
124     timerInfo->SetWantAgent(wantAgent);
125     timerInfo->SetCallbackInfo(callback);
126     timerId = TimeServiceClient::GetInstance()->CreateTimer(timerInfo);
127     if (timerId == 0) {
128         return false;
129     }
130     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
131     return true;
132 }
133 
DestoryTimerTask(int64_t timerId)134 void SchedulerManager::DestoryTimerTask(int64_t timerId)
135 {
136     if (timerId > 0) {
137         TimeServiceClient::GetInstance()->DestroyTimer(timerId);
138     }
139 }
140 
ResetTimerTask(int64_t timerId,int64_t reminderTime)141 void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
142 {
143     // This start also means reset, new one will replace old one
144     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
145 }
146 
EraseTimerTaskId(const Key & key)147 int64_t SchedulerManager::EraseTimerTaskId(const Key &key)
148 {
149     int64_t timerId = -1;
150     std::lock_guard<std::mutex> lock(mutex_);
151     auto it = timerCache_.find(key);
152     if (it != timerCache_.end()) {
153         timerId = it->second;
154         timerCache_.erase(key);
155     }
156     return timerId;
157 }
158 
GetSchedulerStatus(const Key & key)159 bool SchedulerManager::GetSchedulerStatus(const Key &key)
160 {
161     bool enabled = false;
162     std::lock_guard<std::mutex> lock(mutex_);
163     auto it = schedulerStatusCache_.find(key);
164     if (it != schedulerStatusCache_.end()) {
165         enabled = it->second;
166     }
167     return enabled;
168 }
169 
/**
 * Arms (or re-arms) the one-shot timer that will re-run a template's scheduler
 * SQL and notify its subscribers at reminderTime (wall-clock ms).
 *
 * Rejects trigger times in the past or >= MAX_MILLISECONDS (one year) ahead.
 * If the key already has a pending timer it is reset to the new time; otherwise
 * a fresh timer is created and cached in timerCache_. The whole function runs
 * under mutex_; the timer callback itself takes the lock again later via
 * EraseTimerTaskId/GetSchedulerStatus, which is safe because it fires after
 * this function has returned.
 */
void SchedulerManager::SetTimer(
    const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, int64_t reminderTime)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (executor_ == nullptr) {
        // NOTE(review): executor_ is only checked as a readiness flag here; no
        // task is posted to it in this function — confirm guard is intentional.
        ZLOGE("executor_ is nullptr");
        return;
    }
    int64_t now = 0;
    TimeServiceClient::GetInstance()->GetWallTimeMs(now);
    if (reminderTime <= now || reminderTime - now >= MAX_MILLISECONDS) {
        ZLOGE("invalid args, %{public}" PRId64 ", %{public}" PRId64 ", subId=%{public}" PRId64
            ", bundleName=%{public}s.", reminderTime, now, key.subscriberId, key.bundleName.c_str());
        return;
    }
    auto duration = reminderTime - now;
    ZLOGI("the task will notify in %{public}" PRId64 " ms, %{public}" PRId64 ", %{public}s.",
          duration, key.subscriberId, key.bundleName.c_str());
    auto it = timerCache_.find(key);
    if (it != timerCache_.end()) {
        // A timer is already pending for this key: reset it instead of
        // creating a second one.
        ZLOGD_MACRO("has current taskId: %{private}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
            DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
        auto timerId = it->second;
        ResetTimerTask(timerId, reminderTime);
        return;
    }
    // The callback outlives this scope, so key/metaData/userId are captured by
    // value; only `this` (the singleton) is captured by pointer.
    auto callback = [key, metaData, userId, this]() {
        ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
            "%{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
            key.subscriberId, key.bundleName.c_str());
        // Drop the cache entry and destroy the fired one-shot timer first.
        int64_t timerId = EraseTimerTaskId(key);
        DestoryTimerTask(timerId);
        // Skip the work entirely if the key was disabled in the meantime.
        if (GetSchedulerStatus(key)) {
            Execute(key, userId, metaData);
            RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
        }
    };
    uint64_t timerId = 0;
    if (!SetTimerTask(timerId, callback, reminderTime)) {
        ZLOGE("create timer failed.");
        return;
    }
    ZLOGI("create new task success, uri is %{public}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
        DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
    timerCache_.emplace(key, timerId);
}
216 
ExecuteSchedulerSQL(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::shared_ptr<DBDelegate> delegate)217 void SchedulerManager::ExecuteSchedulerSQL(const int32_t userId, DistributedData::StoreMetaData &metaData,
218     const Key &key, std::shared_ptr<DBDelegate> delegate)
219 {
220     Template tpl;
221     if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
222         ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
223             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
224         return;
225     }
226     if (tpl.scheduler_.empty()) {
227         ZLOGW("template scheduler_ empty, %{public}s, %{public}" PRId64 ", %{public}s",
228             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
229         return;
230     }
231     GenRemindTimerFuncParams(userId, metaData, key, tpl.scheduler_);
232     auto resultSet = delegate->QuerySql(tpl.scheduler_);
233     if (resultSet == nullptr) {
234         ZLOGE("resultSet is nullptr, %{public}s, %{public}" PRId64 ", %{public}s",
235             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
236         return;
237     }
238     int count;
239     int errCode = resultSet->GetRowCount(count);
240     if (errCode != E_OK || count == 0) {
241         ZLOGE("GetRowCount error, %{public}s, %{public}" PRId64 ", %{public}s, errorCode is %{public}d, count is "
242             "%{public}d",
243             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str(), errCode,
244             count);
245         return;
246     }
247 }
248 
GenRemindTimerFuncParams(const int32_t userId,DistributedData::StoreMetaData & metaData,const Key & key,std::string & schedulerSQL)249 void SchedulerManager::GenRemindTimerFuncParams(
250     const int32_t userId, DistributedData::StoreMetaData &metaData, const Key &key, std::string &schedulerSQL)
251 {
252     auto index = schedulerSQL.find(REMIND_TIMER_FUNC);
253     if (index == std::string::npos) {
254         ZLOGW("not find remindTimer, sql is %{public}s", schedulerSQL.c_str());
255         return;
256     }
257     index += REMIND_TIMER_FUNC_LEN;
258     std::string keyStr = "'" + metaData.dataDir + "', " + std::to_string(metaData.tokenId) + ", '" + key.uri + "', " +
259                          std::to_string(key.subscriberId) + ", '" + key.bundleName + "', " + std::to_string(userId) +
260                          ", '" + metaData.storeId + "', " + std::to_string(metaData.haMode) + ", ";
261     schedulerSQL.insert(index, keyStr);
262     return;
263 }
264 
RemoveTimer(const Key & key)265 void SchedulerManager::RemoveTimer(const Key &key)
266 {
267     if (executor_ == nullptr) {
268         ZLOGE("executor_ is nullptr");
269         return;
270     }
271     auto it = timerCache_.find(key);
272     if (it != timerCache_.end()) {
273         ZLOGW("RemoveTimer %{public}s %{public}s %{public}" PRId64,
274             DistributedData::Anonymous::Change(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
275         DestoryTimerTask(it->second);
276         timerCache_.erase(key);
277     }
278 }
279 
ClearTimer()280 void SchedulerManager::ClearTimer()
281 {
282     ZLOGI("Clear all timer");
283     std::lock_guard<std::mutex> lock(mutex_);
284     if (executor_ == nullptr) {
285         ZLOGE("executor_ is nullptr");
286         return;
287     }
288     auto it = timerCache_.begin();
289     while (it != timerCache_.end()) {
290         DestoryTimerTask(it->second);
291         it = timerCache_.erase(it);
292     }
293 }
294 
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)295 void SchedulerManager::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
296 {
297     executor_ = executor;
298 }
299 
ReExecuteAll()300 void SchedulerManager::ReExecuteAll()
301 {
302     std::lock_guard<std::mutex> lock(mutex_);
303     for (const auto &item : timerCache_) {
304         // restart in 200ms
305         auto timerId = item.second;
306         int64_t currentTime = 0;
307         TimeServiceClient::GetInstance()->GetWallTimeMs(currentTime);
308         ResetTimerTask(timerId, currentTime + DELAYED_MILLISECONDS);
309     }
310 }
} // namespace OHOS::DataShare