1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "AutoSyncTimer"
16 #include "auto_sync_timer.h"
17
18 #include "dms_handler.h"
19 #include "ipc_skeleton.h"
20 #include "kvdb_service_client.h"
21 #include "log_print.h"
22
23 namespace OHOS::DistributedKv {
GetInstance()24 AutoSyncTimer &AutoSyncTimer::GetInstance()
25 {
26 static AutoSyncTimer instance;
27 return instance;
28 }
29
StartTimer()30 void AutoSyncTimer::StartTimer()
31 {
32 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
33 if (forceSyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
34 forceSyncTaskId_ =
35 TaskExecutor::GetInstance().Schedule(std::chrono::milliseconds(FORCE_SYNC_INTERVAL), ProcessTask());
36 }
37 if (delaySyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
38 delaySyncTaskId_ =
39 TaskExecutor::GetInstance().Schedule(std::chrono::milliseconds(AUTO_SYNC_INTERVAL), ProcessTask());
40 } else {
41 delaySyncTaskId_ =
42 TaskExecutor::GetInstance().Reset(delaySyncTaskId_, std::chrono::milliseconds(AUTO_SYNC_INTERVAL));
43 }
44 }
45
DoAutoSync(const std::string & appId,std::set<StoreId> storeIds)46 void AutoSyncTimer::DoAutoSync(const std::string &appId, std::set<StoreId> storeIds)
47 {
48 AddSyncStores(appId, std::move(storeIds));
49 StartTimer();
50 }
51
AddSyncStores(const std::string & appId,std::set<StoreId> storeIds)52 void AutoSyncTimer::AddSyncStores(const std::string &appId, std::set<StoreId> storeIds)
53 {
54 stores_.Compute(appId, [&storeIds](const auto &key, std::vector<StoreId> &value) {
55 std::set<StoreId> tempStores(value.begin(), value.end());
56 for (auto it = storeIds.begin(); it != storeIds.end(); it++) {
57 if (tempStores.count(*it) == 0) {
58 value.push_back(*it);
59 }
60 }
61 return !value.empty();
62 });
63 }
64
HasSyncStores()65 bool AutoSyncTimer::HasSyncStores()
66 {
67 return !stores_.Empty();
68 }
69
GetStoreIds()70 std::map<std::string, std::vector<StoreId>> AutoSyncTimer::GetStoreIds()
71 {
72 std::map<std::string, std::vector<StoreId>> stores;
73 int count = SYNC_STORE_NUM;
74 stores_.EraseIf([&stores, &count](const std::string &key, std::vector<StoreId> &value) {
75 int size = value.size();
76 if (size <= count) {
77 stores.insert({ key, std::move(value) });
78 count = count - size;
79 return true;
80 }
81 auto &innerStore = stores[key];
82 auto it = value.begin();
83 while (it != value.end() && count > 0) {
84 innerStore.push_back(*it);
85 it++;
86 count--;
87 }
88 value.erase(value.begin(), it);
89 return value.empty();
90 });
91 return stores;
92 }
93
ProcessTask()94 std::function<void()> AutoSyncTimer::ProcessTask()
95 {
96 return [this]() {
97 StopTimer();
98 auto service = KVDBServiceClient::GetInstance();
99 if (service == nullptr) {
100 StartTimer();
101 return;
102 }
103 auto storeIds = GetStoreIds();
104 for (const auto &id : storeIds) {
105 auto res = HasCollaboration(id.first);
106 if (!res.first) {
107 continue;
108 }
109 KVDBService::SyncInfo syncInfo;
110 syncInfo.devices.push_back(res.second);
111 ZLOGD("DoSync appId:%{public}s store size:%{public}zu", id.first.c_str(), id.second.size());
112 for (const auto &storeId : id.second) {
113 service->Sync({ id.first }, storeId, DEFAULT_USER_ID, syncInfo);
114 }
115 }
116 if (HasSyncStores()) {
117 StartTimer();
118 }
119 };
120 }
121
HasCollaboration(const std::string & appId)122 std::pair<bool, std::string> AutoSyncTimer::HasCollaboration(const std::string &appId)
123 {
124 std::vector<DistributedSchedule::EventNotify> events;
125 auto status = DistributedSchedule::DmsHandler::GetInstance().GetDSchedEventInfo(
126 DistributedSchedule::DMS_COLLABORATION, events);
127 if (status != SUCCESS) {
128 ZLOGE("Get collaboration events failed, status:%{public}d", status);
129 return { false, "" };
130 }
131 for (const auto &event : events) {
132 if (event.srcBundleName_ == appId || event.destBundleName_ == appId) {
133 ZLOGI("The application is collaboration, srcBundleName:%{public}s, destBundleName:%{public}s",
134 event.srcBundleName_.c_str(), event.destBundleName_.c_str());
135 return { true, std::move(event.dstNetworkId_) };
136 }
137 }
138 ZLOGD("The application is not collaboration, appId:%{public}s", appId.c_str());
139 return { false, "" };
140 }
141
StopTimer()142 void AutoSyncTimer::StopTimer()
143 {
144 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
145 TaskExecutor::GetInstance().Remove(forceSyncTaskId_);
146 TaskExecutor::GetInstance().Remove(delaySyncTaskId_);
147 forceSyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
148 delaySyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
149 }
150 } // namespace OHOS::DistributedKv