• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_able_kvdb.h"
17 
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24 
25 namespace DistributedDB {
26 const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27 
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29     : started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       notifyChain_(nullptr),
34       userChangeListener_(nullptr)
35 {}
36 
~SyncAbleKvDB()37 SyncAbleKvDB::~SyncAbleKvDB()
38 {
39     if (notifyChain_ != nullptr) {
40         (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
41         KillAndDecObjRef(notifyChain_);
42         notifyChain_ = nullptr;
43     }
44     if (userChangeListener_ != nullptr) {
45         userChangeListener_->Drop(true);
46         userChangeListener_ = nullptr;
47     }
48 }
49 
DelConnection(GenericKvDBConnection * connection)50 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
51 {
52     auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
53     if (realConnection != nullptr) {
54         KillAndDecObjRef(realConnection);
55         realConnection = nullptr;
56     }
57 }
58 
TriggerSync(int notifyEvent)59 void SyncAbleKvDB::TriggerSync(int notifyEvent)
60 {
61     if (!started_) {
62         StartSyncer();
63     }
64     if (started_) {
65         syncer_.LocalDataChanged(notifyEvent);
66     }
67 }
68 
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)69 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
70 {
71     SyncAbleKvDB::TriggerSync(notifyEvent);
72 
73     GenericKvDB::CommitNotify(notifyEvent, data);
74 }
75 
Close()76 void SyncAbleKvDB::Close()
77 {
78     StopSyncer(true);
79 }
80 
81 // Start a sync action.
Sync(const ISyncer::SyncParma & parma,uint64_t connectionId)82 int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId)
83 {
84     if (!started_) {
85         int errCode = StartSyncer();
86         if (!started_) {
87             return errCode;
88         }
89     }
90     return syncer_.Sync(parma, connectionId);
91 }
92 
EnableAutoSync(bool enable)93 void SyncAbleKvDB::EnableAutoSync(bool enable)
94 {
95     if (!started_) {
96         StartSyncer();
97     }
98     return syncer_.EnableAutoSync(enable);
99 }
100 
WakeUpSyncer()101 void SyncAbleKvDB::WakeUpSyncer()
102 {
103     if (!started_) {
104         StartSyncer();
105     }
106 }
107 
108 // Stop a sync action in progress.
StopSync(uint64_t connectionId)109 void SyncAbleKvDB::StopSync(uint64_t connectionId)
110 {
111     if (started_) {
112         syncer_.StopSync(connectionId);
113     }
114 }
115 
SetSyncModuleActive()116 void SyncAbleKvDB::SetSyncModuleActive()
117 {
118     if (isSyncModuleActiveCheck_) {
119         return;
120     }
121     IKvDBSyncInterface *syncInterface = GetSyncInterface();
122     if (syncInterface == nullptr) {
123         LOGF("KvDB got null sync interface.");
124         return;
125     }
126     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
127         false);
128     if (!isSyncDualTupleMode) {
129         isSyncNeedActive_ = true;
130         isSyncModuleActiveCheck_ = true;
131         return;
132     }
133     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
134     if (!isSyncNeedActive_) {
135         LOGI("syncer no need to active");
136     }
137     isSyncModuleActiveCheck_ = true;
138 }
139 
GetSyncModuleActive()140 bool SyncAbleKvDB::GetSyncModuleActive()
141 {
142     return isSyncNeedActive_;
143 }
144 
ReSetSyncModuleActive()145 void SyncAbleKvDB::ReSetSyncModuleActive()
146 {
147     isSyncModuleActiveCheck_ = false;
148     isSyncNeedActive_ = true;
149 }
150 
151 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)152 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
153 {
154     int errCode = E_OK;
155     {
156         std::unique_lock<std::mutex> lock(syncerOperateLock_);
157         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
158         closed_ = false;
159     }
160     UserChangeHandle();
161     return errCode;
162 }
163 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)164 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
165 {
166     IKvDBSyncInterface *syncInterface = GetSyncInterface();
167     if (syncInterface == nullptr) {
168         LOGF("KvDB got null sync interface.");
169         return -E_INVALID_ARGS;
170     }
171     if (!isCheckSyncActive) {
172         SetSyncModuleActive();
173         isNeedActive = GetSyncModuleActive();
174     }
175     int errCode = syncer_.Initialize(syncInterface, isNeedActive);
176     if (errCode == E_OK) {
177         started_ = true;
178     } else {
179         LOGW("KvDB start syncer failed, err:'%d'.", errCode);
180     }
181     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
182         false);
183     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
184         // active to non_active
185         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
186             std::bind(&SyncAbleKvDB::ChangeUserListerner, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
187     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
188         EventType event = isNeedActive ?
189             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
190         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
191             std::bind(&SyncAbleKvDB::UserChangeHandle, this), event);
192     }
193     return errCode;
194 }
195 
196 // Stop syncer
StopSyncer(bool isClosedOperation)197 void SyncAbleKvDB::StopSyncer(bool isClosedOperation)
198 {
199     NotificationChain::Listener *userChangeListener = nullptr;
200     {
201         std::unique_lock<std::mutex> lock(syncerOperateLock_);
202         StopSyncerWithNoLock(isClosedOperation);
203         userChangeListener = userChangeListener_;
204         userChangeListener_ = nullptr;
205     }
206     if (userChangeListener != nullptr) {
207         userChangeListener->Drop(true);
208         userChangeListener = nullptr;
209     }
210 }
211 
StopSyncerWithNoLock(bool isClosedOperation)212 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
213 {
214     ReSetSyncModuleActive();
215     syncer_.Close(isClosedOperation);
216     if (started_) {
217         started_ = false;
218     }
219     closed_ = isClosedOperation;
220     if (!isClosedOperation && userChangeListener_ != nullptr) {
221         userChangeListener_->Drop(false);
222         userChangeListener_ = nullptr;
223     }
224 }
225 
UserChangeHandle()226 void SyncAbleKvDB::UserChangeHandle()
227 {
228     bool isNeedChange;
229     bool isNeedActive = true;
230     IKvDBSyncInterface *syncInterface = GetSyncInterface();
231     if (syncInterface == nullptr) {
232         LOGF("KvDB got null sync interface.");
233         return;
234     }
235     std::unique_lock<std::mutex> lock(syncerOperateLock_);
236     if (closed_) {
237         LOGI("kvDB is already closed");
238         return;
239     }
240     isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
241     isNeedChange = (isNeedActive != isSyncNeedActive_) ? true : false;
242     // non_active to active or active to non_active
243     if (isNeedChange) {
244         StopSyncerWithNoLock(); // will drop userChangeListener
245         isSyncModuleActiveCheck_ = true;
246         isSyncNeedActive_ = isNeedActive;
247         StartSyncerWithNoLock(true, isNeedActive);
248     }
249 }
250 
ChangeUserListerner()251 void SyncAbleKvDB::ChangeUserListerner()
252 {
253     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
254     if (userChangeListener_ != nullptr) {
255         userChangeListener_->Drop(false);
256         userChangeListener_ = nullptr;
257     }
258     if (userChangeListener_ == nullptr) {
259         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
260             std::bind(&SyncAbleKvDB::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
261     }
262 }
263 
264 // Get The current virtual timestamp
GetTimestamp()265 uint64_t SyncAbleKvDB::GetTimestamp()
266 {
267     if (!started_ && !isSyncModuleActiveCheck_) {
268         StartSyncer();
269     }
270     return syncer_.GetTimestamp();
271 }
272 
273 // Get the dataItem's append length
GetAppendedLen() const274 uint32_t SyncAbleKvDB::GetAppendedLen() const
275 {
276     return Parcel::GetAppendedLen();
277 }
278 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)279 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
280 {
281     if (!started_) {
282         StartSyncer();
283     }
284     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
285 }
286 
GetQueuedSyncSize(int * queuedSyncSize) const287 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
288 {
289     return syncer_.GetQueuedSyncSize(queuedSyncSize);
290 }
291 
SetQueuedSyncLimit(const int * queuedSyncLimit)292 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
293 {
294     return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
295 }
296 
GetQueuedSyncLimit(int * queuedSyncLimit) const297 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
298 {
299     return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
300 }
301 
DisableManualSync(void)302 int SyncAbleKvDB::DisableManualSync(void)
303 {
304     return syncer_.DisableManualSync();
305 }
306 
EnableManualSync(void)307 int SyncAbleKvDB::EnableManualSync(void)
308 {
309     return syncer_.EnableManualSync();
310 }
311 
GetLocalIdentity(std::string & outTarget)312 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget)
313 {
314     return syncer_.GetLocalIdentity(outTarget);
315 }
316 
SetStaleDataWipePolicy(WipePolicy policy)317 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
318 {
319     return syncer_.SetStaleDataWipePolicy(policy);
320 }
321 
RegisterEventType(EventType type)322 int SyncAbleKvDB::RegisterEventType(EventType type)
323 {
324     if (notifyChain_ == nullptr) {
325         notifyChain_ = new (std::nothrow) NotificationChain;
326         if (notifyChain_ == nullptr) {
327             return -E_OUT_OF_MEMORY;
328         }
329     }
330 
331     int errCode = notifyChain_->RegisterEventType(type);
332     if (errCode == -E_ALREADY_REGISTER) {
333         return E_OK;
334     }
335     if (errCode != E_OK) {
336         LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
337         KillAndDecObjRef(notifyChain_);
338         notifyChain_ = nullptr;
339     }
340     return errCode;
341 }
342 
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)343 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier &notifier,
344     int &errCode)
345 {
346     std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
347     errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
348     if (errCode != E_OK) {
349         return nullptr;
350     }
351 
352     auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
353         [notifier](void *arg) {
354             if (arg == nullptr) {
355                 LOGE("PragmaRemotePushNotify is null.");
356                 return;
357             }
358             notifier(*static_cast<RemotePushNotifyInfo *>(arg));
359         }, nullptr, errCode);
360     if (errCode != E_OK) {
361         LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
362     }
363     return listener;
364 }
365 
NotifyRemotePushFinishedInner(const std::string & targetId) const366 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
367 {
368     {
369         std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
370         if (notifyChain_ == nullptr) {
371             return;
372         }
373     }
374     RemotePushNotifyInfo info;
375     info.deviceId = targetId;
376     notifyChain_->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
377 }
378 
SetSyncRetry(bool isRetry)379 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
380 {
381     return syncer_.SetSyncRetry(isRetry);
382 }
383 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)384 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
385 {
386     return syncer_.SetEqualIdentifier(identifier, targets);
387 }
388 
Dump(int fd)389 void SyncAbleKvDB::Dump(int fd)
390 {
391     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
392     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
393         basicInfo.isAutoSync);
394     if (basicInfo.isSyncActive) {
395         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
396         syncer_.Dump(fd);
397     }
398 }
399 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)400 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
401 {
402     if (!started_) {
403         StartSyncer();
404     }
405     return syncer_.GetHashDeviceId(clientId, hashDevId);
406 }
407 }
408