• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_able_kvdb.h"
17 
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24 
25 namespace DistributedDB {
26 const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27 
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29     : started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       notifyChain_(nullptr),
34       userChangeListener_(nullptr)
35 {}
36 
~SyncAbleKvDB()37 SyncAbleKvDB::~SyncAbleKvDB()
38 {
39     if (notifyChain_ != nullptr) {
40         (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
41         KillAndDecObjRef(notifyChain_);
42         notifyChain_ = nullptr;
43     }
44     if (userChangeListener_ != nullptr) {
45         userChangeListener_->Drop(true);
46         userChangeListener_ = nullptr;
47     }
48 }
49 
DelConnection(GenericKvDBConnection * connection)50 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
51 {
52     auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
53     if (realConnection != nullptr) {
54         KillAndDecObjRef(realConnection);
55         realConnection = nullptr;
56     }
57 }
58 
TriggerSync(int notifyEvent)59 void SyncAbleKvDB::TriggerSync(int notifyEvent)
60 {
61     if (!started_) {
62         StartSyncer();
63     }
64     if (started_) {
65         syncer_.LocalDataChanged(notifyEvent);
66     }
67 }
68 
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)69 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
70 {
71     SyncAbleKvDB::TriggerSync(notifyEvent);
72 
73     GenericKvDB::CommitNotify(notifyEvent, data);
74 }
75 
Close()76 void SyncAbleKvDB::Close()
77 {
78     StopSyncer(true);
79 }
80 
81 // Start a sync action.
Sync(const ISyncer::SyncParma & parma,uint64_t connectionId)82 int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId)
83 {
84     if (!started_) {
85         int errCode = StartSyncer();
86         if (!started_) {
87             return errCode;
88         }
89     }
90     return syncer_.Sync(parma, connectionId);
91 }
92 
EnableAutoSync(bool enable)93 void SyncAbleKvDB::EnableAutoSync(bool enable)
94 {
95     if (NeedStartSyncer()) {
96         StartSyncer();
97     }
98     return syncer_.EnableAutoSync(enable);
99 }
100 
WakeUpSyncer()101 void SyncAbleKvDB::WakeUpSyncer()
102 {
103     if (NeedStartSyncer()) {
104         StartSyncer();
105     }
106 }
107 
108 // Stop a sync action in progress.
StopSync(uint64_t connectionId)109 void SyncAbleKvDB::StopSync(uint64_t connectionId)
110 {
111     if (started_) {
112         syncer_.StopSync(connectionId);
113     }
114 }
115 
SetSyncModuleActive()116 void SyncAbleKvDB::SetSyncModuleActive()
117 {
118     if (isSyncModuleActiveCheck_) {
119         return;
120     }
121     IKvDBSyncInterface *syncInterface = GetSyncInterface();
122     if (syncInterface == nullptr) {
123         LOGF("KvDB got null sync interface.");
124         return;
125     }
126     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
127         false);
128     if (!isSyncDualTupleMode) {
129         isSyncNeedActive_ = true;
130         isSyncModuleActiveCheck_ = true;
131         return;
132     }
133     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
134     if (!isSyncNeedActive_) {
135         LOGI("syncer no need to active");
136     }
137     isSyncModuleActiveCheck_ = true;
138 }
139 
GetSyncModuleActive()140 bool SyncAbleKvDB::GetSyncModuleActive()
141 {
142     return isSyncNeedActive_;
143 }
144 
ReSetSyncModuleActive()145 void SyncAbleKvDB::ReSetSyncModuleActive()
146 {
147     isSyncModuleActiveCheck_ = false;
148     isSyncNeedActive_ = true;
149 }
150 
151 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)152 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
153 {
154     int errCode = E_OK;
155     {
156         std::unique_lock<std::mutex> lock(syncerOperateLock_);
157         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
158         closed_ = false;
159     }
160     UserChangeHandle();
161     return errCode;
162 }
163 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)164 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
165 {
166     IKvDBSyncInterface *syncInterface = GetSyncInterface();
167     if (syncInterface == nullptr) {
168         LOGF("KvDB got null sync interface.");
169         return -E_INVALID_ARGS;
170     }
171     if (!isCheckSyncActive) {
172         SetSyncModuleActive();
173         isNeedActive = GetSyncModuleActive();
174     }
175     int errCode = syncer_.Initialize(syncInterface, isNeedActive);
176     if (errCode == E_OK) {
177         started_ = true;
178     } else {
179         LOGW("KvDB start syncer failed, err:'%d'.", errCode);
180     }
181     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
182         false);
183     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
184         // active to non_active
185         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
186             std::bind(&SyncAbleKvDB::ChangeUserListener, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
187     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
188         EventType event = isNeedActive ?
189             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
190         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
191             std::bind(&SyncAbleKvDB::UserChangeHandle, this), event);
192     }
193     return errCode;
194 }
195 
196 // Stop syncer
StopSyncer(bool isClosedOperation)197 void SyncAbleKvDB::StopSyncer(bool isClosedOperation)
198 {
199     NotificationChain::Listener *userChangeListener = nullptr;
200     {
201         std::unique_lock<std::mutex> lock(syncerOperateLock_);
202         StopSyncerWithNoLock(isClosedOperation);
203         userChangeListener = userChangeListener_;
204         userChangeListener_ = nullptr;
205     }
206     if (userChangeListener != nullptr) {
207         userChangeListener->Drop(true);
208         userChangeListener = nullptr;
209     }
210 }
211 
StopSyncerWithNoLock(bool isClosedOperation)212 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
213 {
214     ReSetSyncModuleActive();
215     syncer_.Close(isClosedOperation);
216     if (started_) {
217         started_ = false;
218     }
219     closed_ = isClosedOperation;
220     if (!isClosedOperation && userChangeListener_ != nullptr) {
221         userChangeListener_->Drop(false);
222         userChangeListener_ = nullptr;
223     }
224 }
225 
UserChangeHandle()226 void SyncAbleKvDB::UserChangeHandle()
227 {
228     bool isNeedChange;
229     bool isNeedActive = true;
230     IKvDBSyncInterface *syncInterface = GetSyncInterface();
231     if (syncInterface == nullptr) {
232         LOGF("KvDB got null sync interface.");
233         return;
234     }
235     bool isSyncDualTupleMode = syncInterface->GetDbProperties().
236         GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false);
237     if (!isSyncDualTupleMode) {
238         LOGD("[SyncAbleKvDB] no use syncDualTupleMode, abort userChange");
239         return;
240     }
241     std::unique_lock<std::mutex> lock(syncerOperateLock_);
242     if (closed_) {
243         LOGI("kvDB is already closed");
244         return;
245     }
246     isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
247     isNeedChange = (isNeedActive != isSyncNeedActive_);
248     // non_active to active or active to non_active
249     if (isNeedChange) {
250         StopSyncerWithNoLock(); // will drop userChangeListener
251         isSyncModuleActiveCheck_ = true;
252         isSyncNeedActive_ = isNeedActive;
253         StartSyncerWithNoLock(true, isNeedActive);
254     }
255 }
256 
ChangeUserListener()257 void SyncAbleKvDB::ChangeUserListener()
258 {
259     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
260     if (userChangeListener_ != nullptr) {
261         userChangeListener_->Drop(false);
262         userChangeListener_ = nullptr;
263     }
264     if (userChangeListener_ == nullptr) {
265         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
266             std::bind(&SyncAbleKvDB::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
267     }
268 }
269 
270 // Get The current virtual timestamp
GetTimestamp()271 uint64_t SyncAbleKvDB::GetTimestamp()
272 {
273     if (NeedStartSyncer()) {
274         StartSyncer();
275     }
276     return syncer_.GetTimestamp();
277 }
278 
279 // Get the dataItem's append length
GetAppendedLen() const280 uint32_t SyncAbleKvDB::GetAppendedLen() const
281 {
282     return Parcel::GetAppendedLen();
283 }
284 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)285 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
286 {
287     if (NeedStartSyncer()) {
288         int errCode = StartSyncer();
289         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
290             return errCode;
291         }
292     }
293     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
294 }
295 
GetQueuedSyncSize(int * queuedSyncSize) const296 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
297 {
298     return syncer_.GetQueuedSyncSize(queuedSyncSize);
299 }
300 
SetQueuedSyncLimit(const int * queuedSyncLimit)301 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
302 {
303     return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
304 }
305 
GetQueuedSyncLimit(int * queuedSyncLimit) const306 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
307 {
308     return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
309 }
310 
DisableManualSync(void)311 int SyncAbleKvDB::DisableManualSync(void)
312 {
313     return syncer_.DisableManualSync();
314 }
315 
EnableManualSync(void)316 int SyncAbleKvDB::EnableManualSync(void)
317 {
318     return syncer_.EnableManualSync();
319 }
320 
GetLocalIdentity(std::string & outTarget) const321 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget) const
322 {
323     return syncer_.GetLocalIdentity(outTarget);
324 }
325 
SetStaleDataWipePolicy(WipePolicy policy)326 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
327 {
328     return syncer_.SetStaleDataWipePolicy(policy);
329 }
330 
RegisterEventType(EventType type)331 int SyncAbleKvDB::RegisterEventType(EventType type)
332 {
333     if (notifyChain_ == nullptr) {
334         notifyChain_ = new (std::nothrow) NotificationChain;
335         if (notifyChain_ == nullptr) {
336             return -E_OUT_OF_MEMORY;
337         }
338     }
339 
340     int errCode = notifyChain_->RegisterEventType(type);
341     if (errCode == -E_ALREADY_REGISTER) {
342         return E_OK;
343     }
344     if (errCode != E_OK) {
345         LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
346         KillAndDecObjRef(notifyChain_);
347         notifyChain_ = nullptr;
348     }
349     return errCode;
350 }
351 
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)352 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier &notifier,
353     int &errCode)
354 {
355     std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
356     errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
357     if (errCode != E_OK) {
358         return nullptr;
359     }
360 
361     auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
362         [notifier](void *arg) {
363             if (arg == nullptr) {
364                 LOGE("PragmaRemotePushNotify is null.");
365                 return;
366             }
367             notifier(*static_cast<RemotePushNotifyInfo *>(arg));
368         }, nullptr, errCode);
369     if (errCode != E_OK) {
370         LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
371     }
372     return listener;
373 }
374 
NotifyRemotePushFinishedInner(const std::string & targetId) const375 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
376 {
377     {
378         std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
379         if (notifyChain_ == nullptr) {
380             return;
381         }
382     }
383     RemotePushNotifyInfo info;
384     info.deviceId = targetId;
385     notifyChain_->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
386 }
387 
SetSyncRetry(bool isRetry)388 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
389 {
390     IKvDBSyncInterface *syncInterface = GetSyncInterface();
391     if (syncInterface == nullptr) {
392         LOGF("KvDB got null sync interface.");
393         return -E_INVALID_DB;
394     }
395     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
396     if (localOnly) {
397         return -E_NOT_SUPPORT;
398     }
399     if (NeedStartSyncer()) {
400         StartSyncer();
401     }
402     return syncer_.SetSyncRetry(isRetry);
403 }
404 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)405 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
406 {
407     if (NeedStartSyncer()) {
408         StartSyncer();
409     }
410     return syncer_.SetEqualIdentifier(identifier, targets);
411 }
412 
Dump(int fd)413 void SyncAbleKvDB::Dump(int fd)
414 {
415     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
416     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
417         basicInfo.isAutoSync);
418     if (basicInfo.isSyncActive) {
419         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
420         syncer_.Dump(fd);
421     }
422 }
423 
GetSyncDataSize(const std::string & device,size_t & size) const424 int SyncAbleKvDB::GetSyncDataSize(const std::string &device, size_t &size) const
425 {
426     return syncer_.GetSyncDataSize(device, size);
427 }
428 
NeedStartSyncer() const429 bool SyncAbleKvDB::NeedStartSyncer() const
430 {
431     if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
432         LOGW("communicator not ready!");
433         return false;
434     }
435     // don't start when check callback got not active
436     // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
437     return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
438 }
439 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)440 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
441 {
442     if (!NeedStartSyncer()) {
443         return syncer_.GetHashDeviceId(clientId, hashDevId);
444     }
445     int errCode = StartSyncer();
446     if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
447         return errCode;
448     }
449     return syncer_.GetHashDeviceId(clientId, hashDevId);
450 }
451 }
452