1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "db_status_adapter.h"
16
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "param_check_utils.h"
21 #include "runtime_context.h"
22
23 namespace DistributedDB {
DBStatusAdapter()24 DBStatusAdapter::DBStatusAdapter()
25 : dbInfoHandle_(nullptr),
26 localSendLabelExchange_(false),
27 cacheLocalSendLabelExchange_(false)
28 {
29 }
30
SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> & dbInfoHandle)31 void DBStatusAdapter::SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> &dbInfoHandle)
32 {
33 {
34 std::lock_guard<std::mutex> autoLock(handleMutex_);
35 dbInfoHandle_ = dbInfoHandle;
36 }
37 NotifyRemoteOffline();
38 ClearAllCache();
39 LOGI("[DBStatusAdapter] SetDBInfoHandle Finish");
40 }
41
IsSupport(const std::string & devInfo)42 bool DBStatusAdapter::IsSupport(const std::string &devInfo)
43 {
44 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
45 if (dbInfoHandle == nullptr) {
46 LOGD("[DBStatusAdapter] dbInfoHandle not set");
47 return false;
48 }
49 if (IsSendLabelExchange()) {
50 return false;
51 }
52 {
53 std::lock_guard<std::mutex> autoLock(supportMutex_);
54 if (remoteOptimizeInfo_.find(devInfo) != remoteOptimizeInfo_.end()) {
55 return remoteOptimizeInfo_[devInfo];
56 }
57 }
58 std::lock_guard<std::mutex> autoLock(supportMutex_);
59 remoteOptimizeInfo_[devInfo] = true;
60 return true;
61 }
62
GetLocalDBInfos(std::vector<DBInfo> & dbInfos)63 int DBStatusAdapter::GetLocalDBInfos(std::vector<DBInfo> &dbInfos)
64 {
65 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
66 if (dbInfoHandle == nullptr) {
67 LOGD("[DBStatusAdapter][GetLocalDBInfos] handle not set");
68 return -E_NOT_SUPPORT;
69 }
70 if (IsSendLabelExchange()) {
71 return -E_NOT_SUPPORT;
72 }
73 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
74 for (const auto &info: localDBInfos_) {
75 if (info.isNeedSync) {
76 dbInfos.push_back(info);
77 }
78 }
79 return E_OK;
80 }
81
SetDBStatusChangeCallback(const RemoteDBChangeCallback & remote,const LocalDBChangeCallback & local,const RemoteSupportChangeCallback & supportCallback)82 void DBStatusAdapter::SetDBStatusChangeCallback(const RemoteDBChangeCallback &remote,
83 const LocalDBChangeCallback &local, const RemoteSupportChangeCallback &supportCallback)
84 {
85 {
86 std::lock_guard<std::mutex> autoLock(callbackMutex_);
87 remoteCallback_ = remote;
88 localCallback_ = local;
89 supportCallback_ = supportCallback;
90 }
91 if (remote == nullptr || local == nullptr) {
92 return;
93 }
94 // avoid notify before set callback
95 bool triggerNow = false;
96 std::map<std::string, std::vector<DBInfo>> remoteDBInfos;
97 {
98 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
99 remoteDBInfos = remoteDBInfos_;
100 triggerNow = !remoteDBInfos.empty();
101 }
102 bool triggerLocal = false;
103 {
104 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
105 triggerLocal = !localDBInfos_.empty();
106 triggerNow |= triggerLocal;
107 }
108 // trigger callback async
109 if (!triggerNow) {
110 return;
111 }
112 int errCode = RuntimeContext::GetInstance()->ScheduleTask([triggerLocal, remoteDBInfos, remote, local]() {
113 for (const auto &[devInfo, dbInfos]: remoteDBInfos) {
114 remote(devInfo, dbInfos);
115 }
116 if (triggerLocal) {
117 local();
118 }
119 });
120 if (errCode != E_OK) {
121 LOGW("[DBStatusAdapter][SetDBStatusChangeCallback] Schedule Task failed! errCode = %d", errCode);
122 }
123 }
124
NotifyDBInfos(const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)125 void DBStatusAdapter::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
126 {
127 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devInfos, dbInfos]() {
128 if (IsSendLabelExchange()) {
129 LOGW("[DBStatusAdapter][NotifyDBInfos] local dev no support communication optimize");
130 return;
131 }
132 bool isLocal = IsLocalDeviceId(devInfos.identifier);
133 if (!isLocal && !IsSupport(devInfos.identifier)) {
134 LOGW("[DBStatusAdapter][NotifyDBInfos] no support dev %s", STR_MASK(devInfos.identifier));
135 return;
136 }
137 bool isChange = LoadIntoCache(isLocal, devInfos, dbInfos);
138 std::lock_guard<std::mutex> autoLock(callbackMutex_);
139 if (!isLocal && remoteCallback_ != nullptr) {
140 remoteCallback_(devInfos.identifier, dbInfos);
141 } else if (isLocal && localCallback_ != nullptr && isChange) {
142 localCallback_();
143 }
144 });
145 if (errCode != E_OK) {
146 LOGW("[DBStatusAdapter][NotifyDBInfos] Schedule Task failed! errCode = %d", errCode);
147 }
148 }
149
TargetOffline(const std::string & device)150 void DBStatusAdapter::TargetOffline(const std::string &device)
151 {
152 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
153 if (dbInfoHandle == nullptr) {
154 return;
155 }
156 {
157 std::lock_guard<std::mutex> autoLock(supportMutex_);
158 if (remoteOptimizeInfo_.find(device) != remoteOptimizeInfo_.end()) {
159 remoteOptimizeInfo_.erase(device);
160 }
161 }
162 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(device);
163 }
164
IsNeedAutoSync(const std::string & userId,const std::string & appId,const std::string & storeId,const std::string & devInfo)165 bool DBStatusAdapter::IsNeedAutoSync(const std::string &userId, const std::string &appId, const std::string &storeId,
166 const std::string &devInfo)
167 {
168 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
169 if (dbInfoHandle == nullptr || IsSendLabelExchange()) {
170 return true;
171 }
172 return dbInfoHandle->IsNeedAutoSync(userId, appId, storeId, { devInfo });
173 }
174
SetRemoteOptimizeCommunication(const std::string & dev,bool optimize)175 void DBStatusAdapter::SetRemoteOptimizeCommunication(const std::string &dev, bool optimize)
176 {
177 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
178 if (dbInfoHandle == nullptr) {
179 return;
180 }
181 bool triggerLocalCallback = false;
182 {
183 std::lock_guard<std::mutex> autoLock(supportMutex_);
184 if (remoteOptimizeInfo_.find(dev) == remoteOptimizeInfo_.end()) {
185 remoteOptimizeInfo_[dev] = optimize;
186 return;
187 }
188 if (remoteOptimizeInfo_[dev] == optimize) {
189 return;
190 }
191 if (remoteOptimizeInfo_[dev] && !optimize) {
192 triggerLocalCallback = true;
193 }
194 remoteOptimizeInfo_[dev] = optimize;
195 }
196 if (!triggerLocalCallback) {
197 return;
198 }
199 LOGI("[DBStatusAdapter][SetRemoteOptimizeCommunication] remote dev %.3s optimize change!", dev.c_str());
200 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, dev]() {
201 RemoteSupportChangeCallback callback;
202 {
203 std::lock_guard<std::mutex> autoLock(callbackMutex_);
204 callback = supportCallback_;
205 }
206 if (callback) {
207 callback(dev);
208 }
209 });
210 if (errCode != E_OK) {
211 LOGW("[DBStatusAdapter][SetRemoteOptimizeCommunication] Schedule Task failed! errCode = %d", errCode);
212 }
213 }
214
IsSendLabelExchange()215 bool DBStatusAdapter::IsSendLabelExchange()
216 {
217 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
218 if (dbInfoHandle == nullptr) {
219 return true;
220 }
221 {
222 std::lock_guard<std::mutex> autoLock(supportMutex_);
223 if (cacheLocalSendLabelExchange_) {
224 return localSendLabelExchange_;
225 }
226 }
227 bool isSupport = dbInfoHandle->IsSupport();
228 std::lock_guard<std::mutex> autoLock(supportMutex_);
229 LOGD("[DBStatusAdapter][IsSendLabelExchange] local support status is %d", isSupport);
230 localSendLabelExchange_ = !isSupport;
231 cacheLocalSendLabelExchange_ = true;
232 return localSendLabelExchange_;
233 }
234
GetDBInfoHandle() const235 std::shared_ptr<DBInfoHandle> DBStatusAdapter::GetDBInfoHandle() const
236 {
237 std::lock_guard<std::mutex> autoLock(handleMutex_);
238 return dbInfoHandle_;
239 }
240
LoadIntoCache(bool isLocal,const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)241 bool DBStatusAdapter::LoadIntoCache(bool isLocal, const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
242 {
243 if (isLocal) {
244 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
245 return MergeDBInfos(dbInfos, localDBInfos_);
246 }
247 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
248 if (remoteDBInfos_.find(devInfos.identifier) == remoteDBInfos_.end()) {
249 remoteDBInfos_.insert({devInfos.identifier, {}});
250 }
251 return MergeDBInfos(dbInfos, remoteDBInfos_[devInfos.identifier]);
252 }
253
ClearAllCache()254 void DBStatusAdapter::ClearAllCache()
255 {
256 {
257 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
258 localDBInfos_.clear();
259 }
260 {
261 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
262 remoteDBInfos_.clear();
263 }
264 std::lock_guard<std::mutex> autoLock(supportMutex_);
265 remoteOptimizeInfo_.clear();
266 cacheLocalSendLabelExchange_ = false;
267 localSendLabelExchange_ = false;
268 LOGD("[DBStatusAdapter] ClearAllCache ok");
269 }
270
NotifyRemoteOffline()271 void DBStatusAdapter::NotifyRemoteOffline()
272 {
273 std::map<std::string, std::vector<DBInfo>> remoteOnlineDBInfos;
274 {
275 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
276 for (const auto &[dev, dbInfos]: remoteDBInfos_) {
277 for (const auto &dbInfo: dbInfos) {
278 if (dbInfo.isNeedSync) {
279 DBInfo info = dbInfo;
280 info.isNeedSync = false;
281 remoteOnlineDBInfos[dev].push_back(info);
282 }
283 }
284 }
285 }
286 RemoteDBChangeCallback callback;
287 {
288 std::lock_guard<std::mutex> autoLock(callbackMutex_);
289 callback = remoteCallback_;
290 }
291 if (callback == nullptr || remoteOnlineDBInfos.empty()) {
292 LOGD("[DBStatusAdapter] no need to notify offline when reset handle");
293 return;
294 }
295 for (const auto &[dev, dbInfos]: remoteOnlineDBInfos) {
296 callback(dev, dbInfos);
297 }
298 LOGD("[DBStatusAdapter] Notify offline ok");
299 }
300
MergeDBInfos(const std::vector<DBInfo> & srcDbInfos,std::vector<DBInfo> & dstDbInfos)301 bool DBStatusAdapter::MergeDBInfos(const std::vector<DBInfo> &srcDbInfos, std::vector<DBInfo> &dstDbInfos)
302 {
303 bool isDbInfoChange = false;
304 for (const auto &srcInfo: srcDbInfos) {
305 if (!ParamCheckUtils::CheckStoreParameter(srcInfo.storeId, srcInfo.appId, srcInfo.userId)) {
306 continue;
307 }
308 auto res = std::find_if(dstDbInfos.begin(), dstDbInfos.end(), [&srcInfo](const DBInfo &dstInfo) {
309 return srcInfo.appId == dstInfo.appId && srcInfo.userId == dstInfo.userId &&
310 srcInfo.storeId == dstInfo.storeId && srcInfo.syncDualTupleMode == dstInfo.syncDualTupleMode;
311 });
312 if (res == dstDbInfos.end()) {
313 dstDbInfos.push_back(srcInfo);
314 isDbInfoChange = true;
315 } else if (res->isNeedSync != srcInfo.isNeedSync) {
316 res->isNeedSync = srcInfo.isNeedSync;
317 isDbInfoChange = true;
318 }
319 }
320 return isDbInfoChange;
321 }
322
GetLocalDeviceId(std::string & deviceId)323 int DBStatusAdapter::GetLocalDeviceId(std::string &deviceId)
324 {
325 ICommunicatorAggregator *communicatorAggregator = nullptr;
326 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
327 if (errCode != E_OK) {
328 return errCode;
329 }
330 return communicatorAggregator->GetLocalIdentity(deviceId);
331 }
332
IsLocalDeviceId(const std::string & deviceId)333 bool DBStatusAdapter::IsLocalDeviceId(const std::string &deviceId)
334 {
335 std::string localId;
336 if (GetLocalDeviceId(localId) != E_OK) {
337 return false;
338 }
339 return deviceId == localId;
340 }
341 }