• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "db_status_adapter.h"
16 
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "param_check_utils.h"
21 #include "runtime_context.h"
22 
23 namespace DistributedDB {
DBStatusAdapter()24 DBStatusAdapter::DBStatusAdapter()
25     : dbInfoHandle_(nullptr),
26       localSendLabelExchange_(false),
27       cacheLocalSendLabelExchange_(false)
28 {
29 }
30 
SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> & dbInfoHandle)31 void DBStatusAdapter::SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> &dbInfoHandle)
32 {
33     {
34         std::lock_guard<std::mutex> autoLock(handleMutex_);
35         dbInfoHandle_ = dbInfoHandle;
36     }
37     NotifyRemoteOffline();
38     ClearAllCache();
39     LOGI("[DBStatusAdapter] SetDBInfoHandle Finish");
40 }
41 
IsSupport(const std::string & devInfo)42 bool DBStatusAdapter::IsSupport(const std::string &devInfo)
43 {
44     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
45     if (dbInfoHandle == nullptr) {
46         LOGD("[DBStatusAdapter] dbInfoHandle not set");
47         return false;
48     }
49     if (IsSendLabelExchange()) {
50         return false;
51     }
52     {
53         std::lock_guard<std::mutex> autoLock(supportMutex_);
54         if (remoteOptimizeInfo_.find(devInfo) != remoteOptimizeInfo_.end()) {
55             return remoteOptimizeInfo_[devInfo];
56         }
57     }
58     std::lock_guard<std::mutex> autoLock(supportMutex_);
59     remoteOptimizeInfo_[devInfo] = true;
60     return true;
61 }
62 
GetLocalDBInfos(std::vector<DBInfo> & dbInfos)63 int DBStatusAdapter::GetLocalDBInfos(std::vector<DBInfo> &dbInfos)
64 {
65     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
66     if (dbInfoHandle == nullptr) {
67         LOGD("[DBStatusAdapter][GetLocalDBInfos] handle not set");
68         return -E_NOT_SUPPORT;
69     }
70     if (IsSendLabelExchange()) {
71         return -E_NOT_SUPPORT;
72     }
73     std::lock_guard<std::mutex> autoLock(localInfoMutex_);
74     for (const auto &info: localDBInfos_) {
75         if (info.isNeedSync) {
76             dbInfos.push_back(info);
77         }
78     }
79     return E_OK;
80 }
81 
SetDBStatusChangeCallback(const RemoteDBChangeCallback & remote,const LocalDBChangeCallback & local,const RemoteSupportChangeCallback & supportCallback)82 void DBStatusAdapter::SetDBStatusChangeCallback(const RemoteDBChangeCallback &remote,
83     const LocalDBChangeCallback &local, const RemoteSupportChangeCallback &supportCallback)
84 {
85     {
86         std::lock_guard<std::mutex> autoLock(callbackMutex_);
87         remoteCallback_ = remote;
88         localCallback_ = local;
89         supportCallback_ = supportCallback;
90     }
91     if (remote == nullptr || local == nullptr) {
92         return;
93     }
94     // avoid notify before set callback
95     bool triggerNow = false;
96     std::map<std::string, std::vector<DBInfo>> remoteDBInfos;
97     {
98         std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
99         remoteDBInfos = remoteDBInfos_;
100         triggerNow = !remoteDBInfos.empty();
101     }
102     bool triggerLocal = false;
103     {
104         std::lock_guard<std::mutex> autoLock(localInfoMutex_);
105         triggerLocal = !localDBInfos_.empty();
106         triggerNow |= triggerLocal;
107     }
108     // trigger callback async
109     if (!triggerNow) {
110         return;
111     }
112     int errCode = RuntimeContext::GetInstance()->ScheduleTask([triggerLocal, remoteDBInfos, remote, local]() {
113         for (const auto &[devInfo, dbInfos]: remoteDBInfos) {
114             remote(devInfo, dbInfos);
115         }
116         if (triggerLocal) {
117             local();
118         }
119     });
120     if (errCode != E_OK) {
121         LOGW("[DBStatusAdapter][SetDBStatusChangeCallback] Schedule Task failed! errCode = %d", errCode);
122     }
123 }
124 
NotifyDBInfos(const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)125 void DBStatusAdapter::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
126 {
127     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devInfos, dbInfos]() {
128         if (IsSendLabelExchange()) {
129             LOGW("[DBStatusAdapter][NotifyDBInfos] local dev no support communication optimize");
130             return;
131         }
132         bool isLocal = IsLocalDeviceId(devInfos.identifier);
133         if (!isLocal && !IsSupport(devInfos.identifier)) {
134             LOGW("[DBStatusAdapter][NotifyDBInfos] no support dev %s", STR_MASK(devInfos.identifier));
135             return;
136         }
137         bool isChange = LoadIntoCache(isLocal, devInfos, dbInfos);
138         std::lock_guard<std::mutex> autoLock(callbackMutex_);
139         if (!isLocal && remoteCallback_ != nullptr) {
140             remoteCallback_(devInfos.identifier, dbInfos);
141         } else if (isLocal && localCallback_ != nullptr && isChange) {
142             localCallback_();
143         }
144     });
145     if (errCode != E_OK) {
146         LOGW("[DBStatusAdapter][NotifyDBInfos] Schedule Task failed! errCode = %d", errCode);
147     }
148 }
149 
TargetOffline(const std::string & device)150 void DBStatusAdapter::TargetOffline(const std::string &device)
151 {
152     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
153     if (dbInfoHandle == nullptr) {
154         return;
155     }
156     {
157         std::lock_guard<std::mutex> autoLock(supportMutex_);
158         if (remoteOptimizeInfo_.find(device) != remoteOptimizeInfo_.end()) {
159             remoteOptimizeInfo_.erase(device);
160         }
161     }
162     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(device);
163 }
164 
IsNeedAutoSync(const std::string & userId,const std::string & appId,const std::string & storeId,const std::string & devInfo)165 bool DBStatusAdapter::IsNeedAutoSync(const std::string &userId, const std::string &appId, const std::string &storeId,
166     const std::string &devInfo)
167 {
168     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
169     if (dbInfoHandle == nullptr || IsSendLabelExchange()) {
170         return true;
171     }
172     return dbInfoHandle->IsNeedAutoSync(userId, appId, storeId, { devInfo });
173 }
174 
SetRemoteOptimizeCommunication(const std::string & dev,bool optimize)175 void DBStatusAdapter::SetRemoteOptimizeCommunication(const std::string &dev, bool optimize)
176 {
177     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
178     if (dbInfoHandle == nullptr) {
179         return;
180     }
181     bool triggerLocalCallback = false;
182     {
183         std::lock_guard<std::mutex> autoLock(supportMutex_);
184         if (remoteOptimizeInfo_.find(dev) == remoteOptimizeInfo_.end()) {
185             remoteOptimizeInfo_[dev] = optimize;
186             return;
187         }
188         if (remoteOptimizeInfo_[dev] == optimize) {
189             return;
190         }
191         if (remoteOptimizeInfo_[dev] && !optimize) {
192             triggerLocalCallback = true;
193         }
194         remoteOptimizeInfo_[dev] = optimize;
195     }
196     if (!triggerLocalCallback) {
197         return;
198     }
199     LOGI("[DBStatusAdapter][SetRemoteOptimizeCommunication] remote dev %.3s optimize change!", dev.c_str());
200     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, dev]() {
201         RemoteSupportChangeCallback callback;
202         {
203             std::lock_guard<std::mutex> autoLock(callbackMutex_);
204             callback = supportCallback_;
205         }
206         if (callback) {
207             callback(dev);
208         }
209     });
210     if (errCode != E_OK) {
211         LOGW("[DBStatusAdapter][SetRemoteOptimizeCommunication] Schedule Task failed! errCode = %d", errCode);
212     }
213 }
214 
IsSendLabelExchange()215 bool DBStatusAdapter::IsSendLabelExchange()
216 {
217     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
218     if (dbInfoHandle == nullptr) {
219         return true;
220     }
221     {
222         std::lock_guard<std::mutex> autoLock(supportMutex_);
223         if (cacheLocalSendLabelExchange_) {
224             return localSendLabelExchange_;
225         }
226     }
227     bool isSupport = dbInfoHandle->IsSupport();
228     std::lock_guard<std::mutex> autoLock(supportMutex_);
229     LOGD("[DBStatusAdapter][IsSendLabelExchange] local support status is %d", isSupport);
230     localSendLabelExchange_ = !isSupport;
231     cacheLocalSendLabelExchange_ = true;
232     return localSendLabelExchange_;
233 }
234 
GetDBInfoHandle() const235 std::shared_ptr<DBInfoHandle> DBStatusAdapter::GetDBInfoHandle() const
236 {
237     std::lock_guard<std::mutex> autoLock(handleMutex_);
238     return dbInfoHandle_;
239 }
240 
LoadIntoCache(bool isLocal,const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)241 bool DBStatusAdapter::LoadIntoCache(bool isLocal, const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
242 {
243     if (isLocal) {
244         std::lock_guard<std::mutex> autoLock(localInfoMutex_);
245         return MergeDBInfos(dbInfos, localDBInfos_);
246     }
247     std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
248     if (remoteDBInfos_.find(devInfos.identifier) == remoteDBInfos_.end()) {
249         remoteDBInfos_.insert({devInfos.identifier, {}});
250     }
251     return MergeDBInfos(dbInfos, remoteDBInfos_[devInfos.identifier]);
252 }
253 
ClearAllCache()254 void DBStatusAdapter::ClearAllCache()
255 {
256     {
257         std::lock_guard<std::mutex> autoLock(localInfoMutex_);
258         localDBInfos_.clear();
259     }
260     {
261         std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
262         remoteDBInfos_.clear();
263     }
264     std::lock_guard<std::mutex> autoLock(supportMutex_);
265     remoteOptimizeInfo_.clear();
266     cacheLocalSendLabelExchange_ = false;
267     localSendLabelExchange_ = false;
268     LOGD("[DBStatusAdapter] ClearAllCache ok");
269 }
270 
NotifyRemoteOffline()271 void DBStatusAdapter::NotifyRemoteOffline()
272 {
273     std::map<std::string, std::vector<DBInfo>> remoteOnlineDBInfos;
274     {
275         std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
276         for (const auto &[dev, dbInfos]: remoteDBInfos_) {
277             for (const auto &dbInfo: dbInfos) {
278                 if (dbInfo.isNeedSync) {
279                     DBInfo info = dbInfo;
280                     info.isNeedSync = false;
281                     remoteOnlineDBInfos[dev].push_back(info);
282                 }
283             }
284         }
285     }
286     RemoteDBChangeCallback callback;
287     {
288         std::lock_guard<std::mutex> autoLock(callbackMutex_);
289         callback = remoteCallback_;
290     }
291     if (callback == nullptr || remoteOnlineDBInfos.empty()) {
292         LOGD("[DBStatusAdapter] no need to notify offline when reset handle");
293         return;
294     }
295     for (const auto &[dev, dbInfos]: remoteOnlineDBInfos) {
296         callback(dev, dbInfos);
297     }
298     LOGD("[DBStatusAdapter] Notify offline ok");
299 }
300 
MergeDBInfos(const std::vector<DBInfo> & srcDbInfos,std::vector<DBInfo> & dstDbInfos)301 bool DBStatusAdapter::MergeDBInfos(const std::vector<DBInfo> &srcDbInfos, std::vector<DBInfo> &dstDbInfos)
302 {
303     bool isDbInfoChange = false;
304     for (const auto &srcInfo: srcDbInfos) {
305         if (!ParamCheckUtils::CheckStoreParameter(srcInfo.storeId, srcInfo.appId, srcInfo.userId)) {
306             continue;
307         }
308         auto res = std::find_if(dstDbInfos.begin(), dstDbInfos.end(), [&srcInfo](const DBInfo &dstInfo) {
309             return srcInfo.appId == dstInfo.appId && srcInfo.userId == dstInfo.userId &&
310                 srcInfo.storeId == dstInfo.storeId && srcInfo.syncDualTupleMode == dstInfo.syncDualTupleMode;
311         });
312         if (res == dstDbInfos.end()) {
313             dstDbInfos.push_back(srcInfo);
314             isDbInfoChange = true;
315         } else if (res->isNeedSync != srcInfo.isNeedSync) {
316             res->isNeedSync = srcInfo.isNeedSync;
317             isDbInfoChange = true;
318         }
319     }
320     return isDbInfoChange;
321 }
322 
GetLocalDeviceId(std::string & deviceId)323 int DBStatusAdapter::GetLocalDeviceId(std::string &deviceId)
324 {
325     ICommunicatorAggregator *communicatorAggregator = nullptr;
326     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
327     if (errCode != E_OK) {
328         return errCode;
329     }
330     return communicatorAggregator->GetLocalIdentity(deviceId);
331 }
332 
IsLocalDeviceId(const std::string & deviceId)333 bool DBStatusAdapter::IsLocalDeviceId(const std::string &deviceId)
334 {
335     std::string localId;
336     if (GetLocalDeviceId(localId) != E_OK) {
337         return false;
338     }
339     return deviceId == localId;
340 }
341 }