• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_able_kvdb.h"
17 
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24 
25 namespace DistributedDB {
26 const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27 
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29     : started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       notifyChain_(nullptr),
34       userChangeListener_(nullptr),
35       cloudSyncer_(nullptr)
36 {}
37 
~SyncAbleKvDB()38 SyncAbleKvDB::~SyncAbleKvDB()
39 {
40     {
41         std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
42         if (notifyChain_ != nullptr) {
43             (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
44             KillAndDecObjRef(notifyChain_);
45             notifyChain_ = nullptr;
46         }
47     }
48     NotificationChain::Listener *userChangeListener = nullptr;
49     {
50         std::unique_lock<std::mutex> lock(syncerOperateLock_);
51         userChangeListener = userChangeListener_;
52         userChangeListener_ = nullptr;
53     }
54     if (userChangeListener != nullptr) {
55         userChangeListener->Drop(true);
56         userChangeListener = nullptr;
57     }
58     std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
59     KillAndDecObjRef(cloudSyncer_);
60     cloudSyncer_ = nullptr;
61 }
62 
DelConnection(GenericKvDBConnection * connection)63 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
64 {
65     auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
66     if (realConnection != nullptr) {
67         KillAndDecObjRef(realConnection);
68         realConnection = nullptr;
69     }
70 }
71 
TriggerSync(int notifyEvent)72 void SyncAbleKvDB::TriggerSync(int notifyEvent)
73 {
74     if (!started_) {
75         StartSyncer();
76     }
77     if (started_) {
78         syncer_.LocalDataChanged(notifyEvent);
79     }
80 }
81 
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)82 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
83 {
84     SyncAbleKvDB::TriggerSync(notifyEvent);
85 
86     GenericKvDB::CommitNotify(notifyEvent, data);
87 }
88 
Close()89 void SyncAbleKvDB::Close()
90 {
91     StopSyncer(true);
92 }
93 
94 // Start a sync action.
Sync(const ISyncer::SyncParam & parma,uint64_t connectionId)95 int SyncAbleKvDB::Sync(const ISyncer::SyncParam &parma, uint64_t connectionId)
96 {
97     if (!started_) {
98         int errCode = StartSyncer();
99         if (!started_) {
100             return errCode;
101         }
102     }
103     return syncer_.Sync(parma, connectionId);
104 }
105 
106 // Cancel a sync action.
CancelSync(uint32_t syncId)107 int SyncAbleKvDB::CancelSync(uint32_t syncId)
108 {
109     if (!started_) {
110         return -E_NOT_INIT;
111     }
112     return syncer_.CancelSync(syncId);
113 }
114 
EnableAutoSync(bool enable)115 void SyncAbleKvDB::EnableAutoSync(bool enable)
116 {
117     if (NeedStartSyncer()) {
118         StartSyncer();
119     }
120     return syncer_.EnableAutoSync(enable);
121 }
122 
WakeUpSyncer()123 void SyncAbleKvDB::WakeUpSyncer()
124 {
125     if (NeedStartSyncer()) {
126         StartSyncer();
127     }
128 }
129 
130 // Stop a sync action in progress.
StopSync(uint64_t connectionId)131 void SyncAbleKvDB::StopSync(uint64_t connectionId)
132 {
133     if (started_) {
134         syncer_.StopSync(connectionId);
135     }
136 }
137 
SetSyncModuleActive()138 void SyncAbleKvDB::SetSyncModuleActive()
139 {
140     if (isSyncModuleActiveCheck_) {
141         return;
142     }
143     IKvDBSyncInterface *syncInterface = GetSyncInterface();
144     if (syncInterface == nullptr) {
145         LOGD("KvDB got null sync interface.");
146         return;
147     }
148     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
149         false);
150     if (!isSyncDualTupleMode) {
151         isSyncNeedActive_ = true;
152         isSyncModuleActiveCheck_ = true;
153         return;
154     }
155     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
156     if (!isSyncNeedActive_) {
157         LOGI("syncer no need to active");
158     }
159     isSyncModuleActiveCheck_ = true;
160 }
161 
GetSyncModuleActive()162 bool SyncAbleKvDB::GetSyncModuleActive()
163 {
164     return isSyncNeedActive_;
165 }
166 
ReSetSyncModuleActive()167 void SyncAbleKvDB::ReSetSyncModuleActive()
168 {
169     isSyncModuleActiveCheck_ = false;
170     isSyncNeedActive_ = true;
171 }
172 
173 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)174 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
175 {
176     StartCloudSyncer();
177     int errCode = E_OK;
178     {
179         std::unique_lock<std::mutex> lock(syncerOperateLock_);
180         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
181         closed_ = false;
182     }
183     UserChangeHandle();
184     return errCode;
185 }
186 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)187 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
188 {
189     IKvDBSyncInterface *syncInterface = GetSyncInterface();
190     if (syncInterface == nullptr) {
191         LOGD("KvDB got null sync interface.");
192         return -E_INVALID_ARGS;
193     }
194     if (!isCheckSyncActive) {
195         SetSyncModuleActive();
196         isNeedActive = GetSyncModuleActive();
197     }
198     int errCode = syncer_.Initialize(syncInterface, isNeedActive);
199     if (errCode == E_OK) {
200         started_ = true;
201     }
202     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
203         false);
204     std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
205     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
206         // active to non_active
207         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
208             [this](void *) { ChangeUserListener(); }, UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
209         LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s]", label.c_str());
210     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
211         EventType event = isNeedActive ?
212             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
213         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
214             [this](void *) { UserChangeHandle(); }, event);
215         LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s] event=%d", label.c_str(), event);
216     }
217     return errCode;
218 }
219 
220 // Stop syncer
StopSyncer(bool isClosedOperation,bool isStopTaskOnly)221 void SyncAbleKvDB::StopSyncer(bool isClosedOperation, bool isStopTaskOnly)
222 {
223 #ifdef USE_DISTRIBUTEDDB_CLOUD
224     {
225         std::unique_lock<std::mutex> lock(cloudSyncerLock_);
226         if (cloudSyncer_ != nullptr) {
227             if (isStopTaskOnly) {
228                 cloudSyncer_->StopAllTasks();
229             } else if (isClosedOperation) {
230                 cloudSyncer_->Close();
231                 RefObject::KillAndDecObjRef(cloudSyncer_);
232                 cloudSyncer_ = nullptr;
233             }
234         }
235     }
236 #endif
237     NotificationChain::Listener *userChangeListener = nullptr;
238     {
239         std::unique_lock<std::mutex> lock(syncerOperateLock_);
240         StopSyncerWithNoLock(isClosedOperation);
241         userChangeListener = userChangeListener_;
242         userChangeListener_ = nullptr;
243     }
244     if (userChangeListener != nullptr) {
245         userChangeListener->Drop(true);
246         userChangeListener = nullptr;
247     }
248 }
249 
StopSyncerWithNoLock(bool isClosedOperation)250 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
251 {
252 #ifdef USE_DISTRIBUTEDDB_CLOUD
253     if (!isClosedOperation && userChangeListener_ != nullptr) {
254         std::unique_lock<std::mutex> lock(cloudSyncerLock_);
255         if (cloudSyncer_ != nullptr) {
256             cloudSyncer_->StopAllTasks();
257         }
258     }
259 #endif
260     ReSetSyncModuleActive();
261     syncer_.Close(isClosedOperation);
262     if (started_) {
263         started_ = false;
264     }
265     closed_ = isClosedOperation;
266     if (!isClosedOperation && userChangeListener_ != nullptr) {
267         userChangeListener_->Drop(false);
268         userChangeListener_ = nullptr;
269     }
270 }
271 
UserChangeHandle()272 void SyncAbleKvDB::UserChangeHandle()
273 {
274     bool isNeedChange;
275     bool isNeedActive = true;
276     IKvDBSyncInterface *syncInterface = GetSyncInterface();
277     if (syncInterface == nullptr) {
278         LOGD("KvDB got null sync interface.");
279         return;
280     }
281     bool isSyncDualTupleMode = syncInterface->GetDbProperties().
282         GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false);
283     if (!isSyncDualTupleMode) {
284         LOGD("[SyncAbleKvDB] no use syncDualTupleMode, abort userChange");
285         return;
286     }
287     std::unique_lock<std::mutex> lock(syncerOperateLock_);
288     if (closed_) {
289         LOGI("kvDB is already closed");
290         return;
291     }
292     isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
293     isNeedChange = (isNeedActive != isSyncNeedActive_);
294     // non_active to active or active to non_active
295     if (isNeedChange) {
296         StopSyncerWithNoLock(); // will drop userChangeListener
297         isSyncModuleActiveCheck_ = true;
298         isSyncNeedActive_ = isNeedActive;
299         StartSyncerWithNoLock(true, isNeedActive);
300     }
301 }
302 
ChangeUserListener()303 void SyncAbleKvDB::ChangeUserListener()
304 {
305     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
306     if (userChangeListener_ != nullptr) {
307         userChangeListener_->Drop(false);
308         userChangeListener_ = nullptr;
309     }
310     if (userChangeListener_ == nullptr) {
311         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
312             [this](void *) { UserChangeHandle(); }, UserChangeMonitor::USER_NON_ACTIVE_EVENT);
313         IKvDBSyncInterface *syncInterface = GetSyncInterface();
314         std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
315         LOGI("[KVDB] [ChangeUserListener] [%.3s] After RegisterUserChangedListener", label.c_str());
316     }
317 }
318 
GetTimestampFromDB()319 uint64_t SyncAbleKvDB::GetTimestampFromDB()
320 {
321     return 0; // default is 0
322 }
323 
324 // Get The current virtual timestamp
GetTimestamp(bool needStartSync)325 uint64_t SyncAbleKvDB::GetTimestamp(bool needStartSync)
326 {
327     if (NeedStartSyncer()) {
328         if (needStartSync) {
329             StartSyncer();
330         } else {
331             // if syncer not start, get offset time from database
332             return GetTimestampFromDB();
333         }
334     }
335     return syncer_.GetTimestamp();
336 }
337 
338 // Get the dataItem's append length
GetAppendedLen() const339 uint32_t SyncAbleKvDB::GetAppendedLen() const
340 {
341     return Parcel::GetAppendedLen();
342 }
343 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)344 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
345 {
346     if (NeedStartSyncer()) {
347         int errCode = StartSyncer();
348         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
349             return errCode;
350         }
351     }
352     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
353 }
354 
GetQueuedSyncSize(int * queuedSyncSize) const355 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
356 {
357     return syncer_.GetQueuedSyncSize(queuedSyncSize);
358 }
359 
SetQueuedSyncLimit(const int * queuedSyncLimit)360 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
361 {
362     return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
363 }
364 
GetQueuedSyncLimit(int * queuedSyncLimit) const365 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
366 {
367     return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
368 }
369 
DisableManualSync(void)370 int SyncAbleKvDB::DisableManualSync(void)
371 {
372     return syncer_.DisableManualSync();
373 }
374 
EnableManualSync(void)375 int SyncAbleKvDB::EnableManualSync(void)
376 {
377     return syncer_.EnableManualSync();
378 }
379 
GetLocalIdentity(std::string & outTarget) const380 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget) const
381 {
382     return syncer_.GetLocalIdentity(outTarget);
383 }
384 
SetStaleDataWipePolicy(WipePolicy policy)385 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
386 {
387     return syncer_.SetStaleDataWipePolicy(policy);
388 }
389 
RegisterEventType(EventType type)390 int SyncAbleKvDB::RegisterEventType(EventType type)
391 {
392     if (notifyChain_ == nullptr) {
393         notifyChain_ = new (std::nothrow) NotificationChain;
394         if (notifyChain_ == nullptr) {
395             return -E_OUT_OF_MEMORY;
396         }
397     }
398 
399     int errCode = notifyChain_->RegisterEventType(type);
400     if (errCode == -E_ALREADY_REGISTER) {
401         return E_OK;
402     }
403     if (errCode != E_OK) {
404         LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
405         KillAndDecObjRef(notifyChain_);
406         notifyChain_ = nullptr;
407     }
408     return errCode;
409 }
410 
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)411 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier &notifier,
412     int &errCode)
413 {
414     std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
415     errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
416     if (errCode != E_OK) {
417         return nullptr;
418     }
419 
420     auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
421         [notifier](void *arg) {
422             if (arg == nullptr) {
423                 LOGE("PragmaRemotePushNotify is null.");
424                 return;
425             }
426             notifier(*static_cast<RemotePushNotifyInfo *>(arg));
427         }, nullptr, errCode);
428     if (errCode != E_OK) {
429         LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
430     }
431     return listener;
432 }
433 
NotifyRemotePushFinishedInner(const std::string & targetId) const434 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
435 {
436     NotificationChain *notify = nullptr;
437     {
438         std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
439         if (notifyChain_ == nullptr) {
440             return;
441         }
442         notify = notifyChain_;
443         RefObject::IncObjRef(notify);
444     }
445     RemotePushNotifyInfo info;
446     info.deviceId = targetId;
447     notify->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
448     RefObject::DecObjRef(notify);
449 }
450 
SetSyncRetry(bool isRetry)451 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
452 {
453     IKvDBSyncInterface *syncInterface = GetSyncInterface();
454     if (syncInterface == nullptr) {
455         LOGD("KvDB got null sync interface.");
456         return -E_INVALID_DB;
457     }
458     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
459     if (localOnly) {
460         return -E_NOT_SUPPORT;
461     }
462     if (NeedStartSyncer()) {
463         StartSyncer();
464     }
465     return syncer_.SetSyncRetry(isRetry);
466 }
467 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)468 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
469 {
470     if (NeedStartSyncer()) {
471         StartSyncer();
472     }
473     return syncer_.SetEqualIdentifier(identifier, targets);
474 }
475 
Dump(int fd)476 void SyncAbleKvDB::Dump(int fd)
477 {
478     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
479     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
480         basicInfo.isAutoSync);
481     if (basicInfo.isSyncActive) {
482         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
483         syncer_.Dump(fd);
484     }
485 }
486 
GetSyncDataSize(const std::string & device,size_t & size) const487 int SyncAbleKvDB::GetSyncDataSize(const std::string &device, size_t &size) const
488 {
489     return syncer_.GetSyncDataSize(device, size);
490 }
491 
NeedStartSyncer() const492 bool SyncAbleKvDB::NeedStartSyncer() const
493 {
494     if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
495         return false;
496     }
497     // don't start when check callback got not active
498     // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
499     return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
500 }
501 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)502 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
503 {
504     if (!NeedStartSyncer()) {
505         return syncer_.GetHashDeviceId(clientId, hashDevId);
506     }
507     int errCode = StartSyncer();
508     if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
509         return errCode;
510     }
511     return syncer_.GetHashDeviceId(clientId, hashDevId);
512 }
513 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)514 int SyncAbleKvDB::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
515 {
516     if (NeedStartSyncer()) {
517         StartSyncer();
518     }
519     return syncer_.GetWatermarkInfo(device, info);
520 }
521 
UpgradeSchemaVerInMeta()522 int SyncAbleKvDB::UpgradeSchemaVerInMeta()
523 {
524     return syncer_.UpgradeSchemaVerInMeta();
525 }
526 
ResetSyncStatus()527 void SyncAbleKvDB::ResetSyncStatus()
528 {
529     syncer_.ResetSyncStatus();
530 }
531 
532 #ifdef USE_DISTRIBUTEDDB_CLOUD
GetICloudSyncInterface() const533 ICloudSyncStorageInterface *SyncAbleKvDB::GetICloudSyncInterface() const
534 {
535     return nullptr;
536 }
537 #endif
538 
StartCloudSyncer()539 void SyncAbleKvDB::StartCloudSyncer()
540 {
541 #ifdef USE_DISTRIBUTEDDB_CLOUD
542     auto cloudStorage = GetICloudSyncInterface();
543     if (cloudStorage == nullptr) {
544         return;
545     }
546     int conflictType = MyProp().GetIntProp(KvDBProperties::CONFLICT_RESOLVE_POLICY,
547         static_cast<int>(SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN));
548     {
549         std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
550         if (cloudSyncer_ != nullptr) {
551             return;
552         }
553         cloudSyncer_ = new (std::nothrow) CloudSyncer(
554             StorageProxy::GetCloudDb(cloudStorage), true, static_cast<SingleVerConflictResolvePolicy>(conflictType));
555         if (cloudSyncer_ == nullptr) {
556             LOGW("[SyncAbleKvDB][StartCloudSyncer] start cloud syncer and cloud syncer was not initialized");
557         }
558     }
559 #endif
560 }
561 
GetLocalTimeOffset()562 TimeOffset SyncAbleKvDB::GetLocalTimeOffset()
563 {
564     if (NeedStartSyncer()) {
565         StartSyncer();
566     }
567     return syncer_.GetLocalTimeOffset();
568 }
569 
GetDataBaseSchemas()570 std::map<std::string, DataBaseSchema> SyncAbleKvDB::GetDataBaseSchemas()
571 {
572     return {};
573 }
574 
GetTaskCount()575 int32_t SyncAbleKvDB::GetTaskCount()
576 {
577     int32_t taskCount = 0;
578 #ifdef USE_DISTRIBUTEDDB_CLOUD
579     auto cloudSyncer = GetAndIncCloudSyncer();
580     if (cloudSyncer != nullptr) {
581         taskCount += cloudSyncer->GetCloudSyncTaskCount();
582         RefObject::DecObjRef(cloudSyncer);
583     }
584     if (NeedStartSyncer()) {
585         return taskCount;
586     }
587 #endif
588     taskCount += syncer_.GetTaskCount();
589     return taskCount;
590 }
591 
GetAndIncCloudSyncer()592 CloudSyncer *SyncAbleKvDB::GetAndIncCloudSyncer()
593 {
594     std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
595     if (cloudSyncer_ == nullptr) {
596         return nullptr;
597     }
598     RefObject::IncObjRef(cloudSyncer_);
599     return cloudSyncer_;
600 }
601 
ExchangeClosePending(bool expected)602 bool SyncAbleKvDB::ExchangeClosePending(bool expected)
603 {
604     return syncer_.ExchangeClosePending(expected);
605 }
606 
PreClose()607 int SyncAbleKvDB::PreClose()
608 {
609     if (GenericKvDB::PreClose() == E_OK) {
610         int32_t taskCount = GetTaskCount();
611         if (taskCount > 0) {
612             LOGI("[PreClose] task count:%d", taskCount);
613             return -E_BUSY;
614         }
615         ExchangeClosePending(true);
616         taskCount = GetTaskCount();
617         if (taskCount > 0) {
618             LOGI("[PreClose] task count:%d.", taskCount);
619             ExchangeClosePending(false);
620             return -E_BUSY;
621         }
622     }
623     return E_OK;
624 }
625 
626 #ifdef USE_DISTRIBUTEDDB_CLOUD
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)627 void SyncAbleKvDB::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
628     CloudSyncer::CloudTaskInfo &info)
629 {
630     QuerySyncObject query(Query::Select());
631     query.SetTableName(CloudDbConstant::CLOUD_KV_TABLE_NAME);
632     info.queryList.push_back(query);
633     info.table.push_back(CloudDbConstant::CLOUD_KV_TABLE_NAME);
634     info.callback = onProcess;
635     info.devices = option.devices;
636     info.mode = option.mode;
637     std::set<std::string> userSet(option.users.begin(), option.users.end());
638     info.users = std::vector<std::string>(userSet.begin(), userSet.end());
639     info.lockAction = option.lockAction;
640     info.storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
641     info.merge = option.merge;
642     info.prepareTraceId = option.prepareTraceId;
643 }
644 
CheckSyncOption(const CloudSyncOption & option,const CloudSyncer & syncer)645 int SyncAbleKvDB::CheckSyncOption(const CloudSyncOption &option, const CloudSyncer &syncer)
646 {
647     if (option.users.empty()) {
648         LOGE("[SyncAbleKvDB][Sync] no user in sync option");
649         return -E_INVALID_ARGS;
650     }
651     const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs = syncer.GetCloudDB();
652     if (cloudDBs.empty()) {
653         LOGE("[SyncAbleKvDB][Sync] not set cloud db");
654         return -E_CLOUD_ERROR;
655     }
656     auto schemas = GetDataBaseSchemas();
657     if (schemas.empty()) {
658         LOGE("[SyncAbleKvDB][Sync] not set cloud schema");
659         return -E_SCHEMA_MISMATCH;
660     }
661     for (const auto &user : option.users) {
662         if (cloudDBs.find(user) == cloudDBs.end()) {
663             LOGE("[SyncAbleKvDB][Sync] cloud db with invalid user: %s", user.c_str());
664             return -E_INVALID_ARGS;
665         }
666         if (schemas.find(user) == schemas.end()) {
667             LOGE("[SyncAbleKvDB][Sync] cloud schema with invalid user: %s", user.c_str());
668             return -E_SCHEMA_MISMATCH;
669         }
670     }
671     if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
672         LOGE("[SyncAbleKvDB][Sync] invalid wait time of sync option: %lld", option.waitTime);
673         return -E_INVALID_ARGS;
674     }
675     if (!CheckSchemaSupportForCloudSync()) {
676         return -E_NOT_SUPPORT;
677     }
678     return E_OK;
679 }
680 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)681 int SyncAbleKvDB::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
682 {
683     auto syncer = GetAndIncCloudSyncer();
684     if (syncer == nullptr) {
685         LOGE("[SyncAbleKvDB][Sync] cloud syncer was not initialized");
686         return -E_INVALID_DB;
687     }
688     int errCode = CheckSyncOption(option, *syncer);
689     if (errCode != E_OK) {
690         RefObject::DecObjRef(syncer);
691         return errCode;
692     }
693     CloudSyncer::CloudTaskInfo info;
694     FillSyncInfo(option, onProcess, info);
695     errCode = syncer->Sync(info);
696     RefObject::DecObjRef(syncer);
697     return errCode;
698 }
699 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)700 int SyncAbleKvDB::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
701 {
702     auto syncer = GetAndIncCloudSyncer();
703     if (syncer == nullptr) {
704         LOGE("[SyncAbleKvDB][SetCloudDB] cloud syncer was not initialized");
705         return -E_INVALID_DB;
706     }
707     int errCode = syncer->SetCloudDB(cloudDBs);
708     RefObject::DecObjRef(syncer);
709     return errCode;
710 }
711 
CleanAllWaterMark()712 int SyncAbleKvDB::CleanAllWaterMark()
713 {
714     auto syncer = GetAndIncCloudSyncer();
715     if (syncer == nullptr) {
716         LOGE("[SyncAbleKvDB][CleanAllWaterMark] cloud syncer was not initialized");
717         return -E_INVALID_DB;
718     }
719     syncer->CleanAllWaterMark();
720     RefObject::DecObjRef(syncer);
721     return E_OK;
722 }
723 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)724 void SyncAbleKvDB::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
725 {
726     auto cloudSyncer = GetAndIncCloudSyncer();
727     if (cloudSyncer == nullptr) {
728         LOGE("[SyncAbleKvDB][SetGenCloudVersionCallback] cloud syncer was not initialized");
729         return;
730     }
731     cloudSyncer->SetGenCloudVersionCallback(callback);
732     RefObject::DecObjRef(cloudSyncer);
733 }
734 
CheckSchemaSupportForCloudSync() const735 bool SyncAbleKvDB::CheckSchemaSupportForCloudSync() const
736 {
737     return true; // default is valid
738 }
739 #endif
740 }
741