1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_able_kvdb.h"
17
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24
25 namespace DistributedDB {
// Event type used on notifyChain_: listeners are registered for it in AddRemotePushFinishedNotify()
// and it is raised by NotifyRemotePushFinishedInner().
const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29 : started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 notifyChain_(nullptr),
34 userChangeListener_(nullptr)
35 {}
36
~SyncAbleKvDB()37 SyncAbleKvDB::~SyncAbleKvDB()
38 {
39 if (notifyChain_ != nullptr) {
40 (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
41 KillAndDecObjRef(notifyChain_);
42 notifyChain_ = nullptr;
43 }
44 if (userChangeListener_ != nullptr) {
45 userChangeListener_->Drop(true);
46 userChangeListener_ = nullptr;
47 }
48 }
49
DelConnection(GenericKvDBConnection * connection)50 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
51 {
52 auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
53 if (realConnection != nullptr) {
54 KillAndDecObjRef(realConnection);
55 realConnection = nullptr;
56 }
57 }
58
TriggerSync(int notifyEvent)59 void SyncAbleKvDB::TriggerSync(int notifyEvent)
60 {
61 if (!started_) {
62 StartSyncer();
63 }
64 if (started_) {
65 syncer_.LocalDataChanged(notifyEvent);
66 }
67 }
68
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)69 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
70 {
71 SyncAbleKvDB::TriggerSync(notifyEvent);
72
73 GenericKvDB::CommitNotify(notifyEvent, data);
74 }
75
Close()76 void SyncAbleKvDB::Close()
77 {
78 StopSyncer(true);
79 }
80
81 // Start a sync action.
Sync(const ISyncer::SyncParma & parma,uint64_t connectionId)82 int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId)
83 {
84 if (!started_) {
85 int errCode = StartSyncer();
86 if (!started_) {
87 return errCode;
88 }
89 }
90 return syncer_.Sync(parma, connectionId);
91 }
92
EnableAutoSync(bool enable)93 void SyncAbleKvDB::EnableAutoSync(bool enable)
94 {
95 if (NeedStartSyncer()) {
96 StartSyncer();
97 }
98 return syncer_.EnableAutoSync(enable);
99 }
100
WakeUpSyncer()101 void SyncAbleKvDB::WakeUpSyncer()
102 {
103 if (NeedStartSyncer()) {
104 StartSyncer();
105 }
106 }
107
108 // Stop a sync action in progress.
StopSync(uint64_t connectionId)109 void SyncAbleKvDB::StopSync(uint64_t connectionId)
110 {
111 if (started_) {
112 syncer_.StopSync(connectionId);
113 }
114 }
115
SetSyncModuleActive()116 void SyncAbleKvDB::SetSyncModuleActive()
117 {
118 if (isSyncModuleActiveCheck_) {
119 return;
120 }
121 IKvDBSyncInterface *syncInterface = GetSyncInterface();
122 if (syncInterface == nullptr) {
123 LOGF("KvDB got null sync interface.");
124 return;
125 }
126 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
127 false);
128 if (!isSyncDualTupleMode) {
129 isSyncNeedActive_ = true;
130 isSyncModuleActiveCheck_ = true;
131 return;
132 }
133 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
134 if (!isSyncNeedActive_) {
135 LOGI("syncer no need to active");
136 }
137 isSyncModuleActiveCheck_ = true;
138 }
139
GetSyncModuleActive()140 bool SyncAbleKvDB::GetSyncModuleActive()
141 {
142 return isSyncNeedActive_;
143 }
144
ReSetSyncModuleActive()145 void SyncAbleKvDB::ReSetSyncModuleActive()
146 {
147 isSyncModuleActiveCheck_ = false;
148 isSyncNeedActive_ = true;
149 }
150
151 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)152 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
153 {
154 int errCode = E_OK;
155 {
156 std::unique_lock<std::mutex> lock(syncerOperateLock_);
157 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
158 closed_ = false;
159 }
160 UserChangeHandle();
161 return errCode;
162 }
163
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)164 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
165 {
166 IKvDBSyncInterface *syncInterface = GetSyncInterface();
167 if (syncInterface == nullptr) {
168 LOGF("KvDB got null sync interface.");
169 return -E_INVALID_ARGS;
170 }
171 if (!isCheckSyncActive) {
172 SetSyncModuleActive();
173 isNeedActive = GetSyncModuleActive();
174 }
175 int errCode = syncer_.Initialize(syncInterface, isNeedActive);
176 if (errCode == E_OK) {
177 started_ = true;
178 } else {
179 LOGW("KvDB start syncer failed, err:'%d'.", errCode);
180 }
181 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
182 false);
183 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
184 // active to non_active
185 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
186 std::bind(&SyncAbleKvDB::ChangeUserListener, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
187 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
188 EventType event = isNeedActive ?
189 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
190 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
191 std::bind(&SyncAbleKvDB::UserChangeHandle, this), event);
192 }
193 return errCode;
194 }
195
196 // Stop syncer
StopSyncer(bool isClosedOperation)197 void SyncAbleKvDB::StopSyncer(bool isClosedOperation)
198 {
199 NotificationChain::Listener *userChangeListener = nullptr;
200 {
201 std::unique_lock<std::mutex> lock(syncerOperateLock_);
202 StopSyncerWithNoLock(isClosedOperation);
203 userChangeListener = userChangeListener_;
204 userChangeListener_ = nullptr;
205 }
206 if (userChangeListener != nullptr) {
207 userChangeListener->Drop(true);
208 userChangeListener = nullptr;
209 }
210 }
211
StopSyncerWithNoLock(bool isClosedOperation)212 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
213 {
214 ReSetSyncModuleActive();
215 syncer_.Close(isClosedOperation);
216 if (started_) {
217 started_ = false;
218 }
219 closed_ = isClosedOperation;
220 if (!isClosedOperation && userChangeListener_ != nullptr) {
221 userChangeListener_->Drop(false);
222 userChangeListener_ = nullptr;
223 }
224 }
225
UserChangeHandle()226 void SyncAbleKvDB::UserChangeHandle()
227 {
228 bool isNeedChange;
229 bool isNeedActive = true;
230 IKvDBSyncInterface *syncInterface = GetSyncInterface();
231 if (syncInterface == nullptr) {
232 LOGF("KvDB got null sync interface.");
233 return;
234 }
235 bool isSyncDualTupleMode = syncInterface->GetDbProperties().
236 GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false);
237 if (!isSyncDualTupleMode) {
238 LOGD("[SyncAbleKvDB] no use syncDualTupleMode, abort userChange");
239 return;
240 }
241 std::unique_lock<std::mutex> lock(syncerOperateLock_);
242 if (closed_) {
243 LOGI("kvDB is already closed");
244 return;
245 }
246 isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
247 isNeedChange = (isNeedActive != isSyncNeedActive_);
248 // non_active to active or active to non_active
249 if (isNeedChange) {
250 StopSyncerWithNoLock(); // will drop userChangeListener
251 isSyncModuleActiveCheck_ = true;
252 isSyncNeedActive_ = isNeedActive;
253 StartSyncerWithNoLock(true, isNeedActive);
254 }
255 }
256
ChangeUserListener()257 void SyncAbleKvDB::ChangeUserListener()
258 {
259 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
260 if (userChangeListener_ != nullptr) {
261 userChangeListener_->Drop(false);
262 userChangeListener_ = nullptr;
263 }
264 if (userChangeListener_ == nullptr) {
265 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
266 std::bind(&SyncAbleKvDB::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
267 }
268 }
269
270 // Get The current virtual timestamp
GetTimestamp()271 uint64_t SyncAbleKvDB::GetTimestamp()
272 {
273 if (NeedStartSyncer()) {
274 StartSyncer();
275 }
276 return syncer_.GetTimestamp();
277 }
278
279 // Get the dataItem's append length
GetAppendedLen() const280 uint32_t SyncAbleKvDB::GetAppendedLen() const
281 {
282 return Parcel::GetAppendedLen();
283 }
284
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)285 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
286 {
287 if (NeedStartSyncer()) {
288 int errCode = StartSyncer();
289 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
290 return errCode;
291 }
292 }
293 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
294 }
295
GetQueuedSyncSize(int * queuedSyncSize) const296 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
297 {
298 return syncer_.GetQueuedSyncSize(queuedSyncSize);
299 }
300
SetQueuedSyncLimit(const int * queuedSyncLimit)301 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
302 {
303 return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
304 }
305
GetQueuedSyncLimit(int * queuedSyncLimit) const306 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
307 {
308 return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
309 }
310
DisableManualSync(void)311 int SyncAbleKvDB::DisableManualSync(void)
312 {
313 return syncer_.DisableManualSync();
314 }
315
EnableManualSync(void)316 int SyncAbleKvDB::EnableManualSync(void)
317 {
318 return syncer_.EnableManualSync();
319 }
320
GetLocalIdentity(std::string & outTarget) const321 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget) const
322 {
323 return syncer_.GetLocalIdentity(outTarget);
324 }
325
SetStaleDataWipePolicy(WipePolicy policy)326 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
327 {
328 return syncer_.SetStaleDataWipePolicy(policy);
329 }
330
RegisterEventType(EventType type)331 int SyncAbleKvDB::RegisterEventType(EventType type)
332 {
333 if (notifyChain_ == nullptr) {
334 notifyChain_ = new (std::nothrow) NotificationChain;
335 if (notifyChain_ == nullptr) {
336 return -E_OUT_OF_MEMORY;
337 }
338 }
339
340 int errCode = notifyChain_->RegisterEventType(type);
341 if (errCode == -E_ALREADY_REGISTER) {
342 return E_OK;
343 }
344 if (errCode != E_OK) {
345 LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
346 KillAndDecObjRef(notifyChain_);
347 notifyChain_ = nullptr;
348 }
349 return errCode;
350 }
351
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)352 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier ¬ifier,
353 int &errCode)
354 {
355 std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
356 errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
357 if (errCode != E_OK) {
358 return nullptr;
359 }
360
361 auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
362 [notifier](void *arg) {
363 if (arg == nullptr) {
364 LOGE("PragmaRemotePushNotify is null.");
365 return;
366 }
367 notifier(*static_cast<RemotePushNotifyInfo *>(arg));
368 }, nullptr, errCode);
369 if (errCode != E_OK) {
370 LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
371 }
372 return listener;
373 }
374
NotifyRemotePushFinishedInner(const std::string & targetId) const375 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
376 {
377 {
378 std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
379 if (notifyChain_ == nullptr) {
380 return;
381 }
382 }
383 RemotePushNotifyInfo info;
384 info.deviceId = targetId;
385 notifyChain_->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
386 }
387
SetSyncRetry(bool isRetry)388 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
389 {
390 IKvDBSyncInterface *syncInterface = GetSyncInterface();
391 if (syncInterface == nullptr) {
392 LOGF("KvDB got null sync interface.");
393 return -E_INVALID_DB;
394 }
395 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
396 if (localOnly) {
397 return -E_NOT_SUPPORT;
398 }
399 if (NeedStartSyncer()) {
400 StartSyncer();
401 }
402 return syncer_.SetSyncRetry(isRetry);
403 }
404
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)405 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
406 {
407 if (NeedStartSyncer()) {
408 StartSyncer();
409 }
410 return syncer_.SetEqualIdentifier(identifier, targets);
411 }
412
Dump(int fd)413 void SyncAbleKvDB::Dump(int fd)
414 {
415 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
416 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
417 basicInfo.isAutoSync);
418 if (basicInfo.isSyncActive) {
419 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
420 syncer_.Dump(fd);
421 }
422 }
423
GetSyncDataSize(const std::string & device,size_t & size) const424 int SyncAbleKvDB::GetSyncDataSize(const std::string &device, size_t &size) const
425 {
426 return syncer_.GetSyncDataSize(device, size);
427 }
428
NeedStartSyncer() const429 bool SyncAbleKvDB::NeedStartSyncer() const
430 {
431 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
432 LOGW("communicator not ready!");
433 return false;
434 }
435 // don't start when check callback got not active
436 // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
437 return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
438 }
439
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)440 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
441 {
442 if (!NeedStartSyncer()) {
443 return syncer_.GetHashDeviceId(clientId, hashDevId);
444 }
445 int errCode = StartSyncer();
446 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
447 return errCode;
448 }
449 return syncer_.GetHashDeviceId(clientId, hashDevId);
450 }
451 }
452