1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_able_kvdb.h"
17
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24
25 namespace DistributedDB {
26 const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29 : started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 notifyChain_(nullptr),
34 userChangeListener_(nullptr),
35 cloudSyncer_(nullptr)
36 {}
37
~SyncAbleKvDB()38 SyncAbleKvDB::~SyncAbleKvDB()
39 {
40 {
41 std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
42 if (notifyChain_ != nullptr) {
43 (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
44 KillAndDecObjRef(notifyChain_);
45 notifyChain_ = nullptr;
46 }
47 }
48 NotificationChain::Listener *userChangeListener = nullptr;
49 {
50 std::unique_lock<std::mutex> lock(syncerOperateLock_);
51 userChangeListener = userChangeListener_;
52 userChangeListener_ = nullptr;
53 }
54 if (userChangeListener != nullptr) {
55 userChangeListener->Drop(true);
56 userChangeListener = nullptr;
57 }
58 std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
59 KillAndDecObjRef(cloudSyncer_);
60 cloudSyncer_ = nullptr;
61 }
62
DelConnection(GenericKvDBConnection * connection)63 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
64 {
65 auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
66 if (realConnection != nullptr) {
67 KillAndDecObjRef(realConnection);
68 realConnection = nullptr;
69 }
70 }
71
TriggerSync(int notifyEvent)72 void SyncAbleKvDB::TriggerSync(int notifyEvent)
73 {
74 if (!started_) {
75 StartSyncer();
76 }
77 if (started_) {
78 syncer_.LocalDataChanged(notifyEvent);
79 }
80 }
81
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)82 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
83 {
84 SyncAbleKvDB::TriggerSync(notifyEvent);
85
86 GenericKvDB::CommitNotify(notifyEvent, data);
87 }
88
Close()89 void SyncAbleKvDB::Close()
90 {
91 StopSyncer(true);
92 }
93
94 // Start a sync action.
Sync(const ISyncer::SyncParam & parma,uint64_t connectionId)95 int SyncAbleKvDB::Sync(const ISyncer::SyncParam &parma, uint64_t connectionId)
96 {
97 if (!started_) {
98 int errCode = StartSyncer();
99 if (!started_) {
100 return errCode;
101 }
102 }
103 return syncer_.Sync(parma, connectionId);
104 }
105
106 // Cancel a sync action.
CancelSync(uint32_t syncId)107 int SyncAbleKvDB::CancelSync(uint32_t syncId)
108 {
109 if (!started_) {
110 return -E_NOT_INIT;
111 }
112 return syncer_.CancelSync(syncId);
113 }
114
EnableAutoSync(bool enable)115 void SyncAbleKvDB::EnableAutoSync(bool enable)
116 {
117 if (NeedStartSyncer()) {
118 StartSyncer();
119 }
120 return syncer_.EnableAutoSync(enable);
121 }
122
WakeUpSyncer()123 void SyncAbleKvDB::WakeUpSyncer()
124 {
125 if (NeedStartSyncer()) {
126 StartSyncer();
127 }
128 }
129
130 // Stop a sync action in progress.
StopSync(uint64_t connectionId)131 void SyncAbleKvDB::StopSync(uint64_t connectionId)
132 {
133 if (started_) {
134 syncer_.StopSync(connectionId);
135 }
136 }
137
SetSyncModuleActive()138 void SyncAbleKvDB::SetSyncModuleActive()
139 {
140 if (isSyncModuleActiveCheck_) {
141 return;
142 }
143 IKvDBSyncInterface *syncInterface = GetSyncInterface();
144 if (syncInterface == nullptr) {
145 LOGD("KvDB got null sync interface.");
146 return;
147 }
148 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
149 false);
150 if (!isSyncDualTupleMode) {
151 isSyncNeedActive_ = true;
152 isSyncModuleActiveCheck_ = true;
153 return;
154 }
155 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
156 if (!isSyncNeedActive_) {
157 LOGI("syncer no need to active");
158 }
159 isSyncModuleActiveCheck_ = true;
160 }
161
GetSyncModuleActive()162 bool SyncAbleKvDB::GetSyncModuleActive()
163 {
164 return isSyncNeedActive_;
165 }
166
ReSetSyncModuleActive()167 void SyncAbleKvDB::ReSetSyncModuleActive()
168 {
169 isSyncModuleActiveCheck_ = false;
170 isSyncNeedActive_ = true;
171 }
172
173 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)174 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
175 {
176 StartCloudSyncer();
177 int errCode = E_OK;
178 {
179 std::unique_lock<std::mutex> lock(syncerOperateLock_);
180 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
181 closed_ = false;
182 }
183 UserChangeHandle();
184 return errCode;
185 }
186
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)187 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
188 {
189 IKvDBSyncInterface *syncInterface = GetSyncInterface();
190 if (syncInterface == nullptr) {
191 LOGD("KvDB got null sync interface.");
192 return -E_INVALID_ARGS;
193 }
194 if (!isCheckSyncActive) {
195 SetSyncModuleActive();
196 isNeedActive = GetSyncModuleActive();
197 }
198 int errCode = syncer_.Initialize(syncInterface, isNeedActive);
199 if (errCode == E_OK) {
200 started_ = true;
201 }
202 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
203 false);
204 std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
205 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
206 // active to non_active
207 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
208 [this](void *) { ChangeUserListener(); }, UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
209 LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s]", label.c_str());
210 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
211 EventType event = isNeedActive ?
212 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
213 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
214 [this](void *) { UserChangeHandle(); }, event);
215 LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s] event=%d", label.c_str(), event);
216 }
217 return errCode;
218 }
219
220 // Stop syncer
StopSyncer(bool isClosedOperation,bool isStopTaskOnly)221 void SyncAbleKvDB::StopSyncer(bool isClosedOperation, bool isStopTaskOnly)
222 {
223 #ifdef USE_DISTRIBUTEDDB_CLOUD
224 {
225 std::unique_lock<std::mutex> lock(cloudSyncerLock_);
226 if (cloudSyncer_ != nullptr) {
227 if (isStopTaskOnly) {
228 cloudSyncer_->StopAllTasks();
229 } else if (isClosedOperation) {
230 cloudSyncer_->Close();
231 RefObject::KillAndDecObjRef(cloudSyncer_);
232 cloudSyncer_ = nullptr;
233 }
234 }
235 }
236 #endif
237 NotificationChain::Listener *userChangeListener = nullptr;
238 {
239 std::unique_lock<std::mutex> lock(syncerOperateLock_);
240 StopSyncerWithNoLock(isClosedOperation);
241 userChangeListener = userChangeListener_;
242 userChangeListener_ = nullptr;
243 }
244 if (userChangeListener != nullptr) {
245 userChangeListener->Drop(true);
246 userChangeListener = nullptr;
247 }
248 }
249
StopSyncerWithNoLock(bool isClosedOperation)250 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
251 {
252 #ifdef USE_DISTRIBUTEDDB_CLOUD
253 if (!isClosedOperation && userChangeListener_ != nullptr) {
254 std::unique_lock<std::mutex> lock(cloudSyncerLock_);
255 if (cloudSyncer_ != nullptr) {
256 cloudSyncer_->StopAllTasks();
257 }
258 }
259 #endif
260 ReSetSyncModuleActive();
261 syncer_.Close(isClosedOperation);
262 if (started_) {
263 started_ = false;
264 }
265 closed_ = isClosedOperation;
266 if (!isClosedOperation && userChangeListener_ != nullptr) {
267 userChangeListener_->Drop(false);
268 userChangeListener_ = nullptr;
269 }
270 }
271
UserChangeHandle()272 void SyncAbleKvDB::UserChangeHandle()
273 {
274 bool isNeedChange;
275 bool isNeedActive = true;
276 IKvDBSyncInterface *syncInterface = GetSyncInterface();
277 if (syncInterface == nullptr) {
278 LOGD("KvDB got null sync interface.");
279 return;
280 }
281 bool isSyncDualTupleMode = syncInterface->GetDbProperties().
282 GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false);
283 if (!isSyncDualTupleMode) {
284 LOGD("[SyncAbleKvDB] no use syncDualTupleMode, abort userChange");
285 return;
286 }
287 std::unique_lock<std::mutex> lock(syncerOperateLock_);
288 if (closed_) {
289 LOGI("kvDB is already closed");
290 return;
291 }
292 isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
293 isNeedChange = (isNeedActive != isSyncNeedActive_);
294 // non_active to active or active to non_active
295 if (isNeedChange) {
296 StopSyncerWithNoLock(); // will drop userChangeListener
297 isSyncModuleActiveCheck_ = true;
298 isSyncNeedActive_ = isNeedActive;
299 StartSyncerWithNoLock(true, isNeedActive);
300 }
301 }
302
ChangeUserListener()303 void SyncAbleKvDB::ChangeUserListener()
304 {
305 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
306 if (userChangeListener_ != nullptr) {
307 userChangeListener_->Drop(false);
308 userChangeListener_ = nullptr;
309 }
310 if (userChangeListener_ == nullptr) {
311 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
312 [this](void *) { UserChangeHandle(); }, UserChangeMonitor::USER_NON_ACTIVE_EVENT);
313 IKvDBSyncInterface *syncInterface = GetSyncInterface();
314 std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
315 LOGI("[KVDB] [ChangeUserListener] [%.3s] After RegisterUserChangedListener", label.c_str());
316 }
317 }
318
GetTimestampFromDB()319 uint64_t SyncAbleKvDB::GetTimestampFromDB()
320 {
321 return 0; // default is 0
322 }
323
324 // Get The current virtual timestamp
GetTimestamp(bool needStartSync)325 uint64_t SyncAbleKvDB::GetTimestamp(bool needStartSync)
326 {
327 if (NeedStartSyncer()) {
328 if (needStartSync) {
329 StartSyncer();
330 } else {
331 // if syncer not start, get offset time from database
332 return GetTimestampFromDB();
333 }
334 }
335 return syncer_.GetTimestamp();
336 }
337
338 // Get the dataItem's append length
GetAppendedLen() const339 uint32_t SyncAbleKvDB::GetAppendedLen() const
340 {
341 return Parcel::GetAppendedLen();
342 }
343
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)344 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
345 {
346 if (NeedStartSyncer()) {
347 int errCode = StartSyncer();
348 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
349 return errCode;
350 }
351 }
352 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
353 }
354
GetQueuedSyncSize(int * queuedSyncSize) const355 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
356 {
357 return syncer_.GetQueuedSyncSize(queuedSyncSize);
358 }
359
SetQueuedSyncLimit(const int * queuedSyncLimit)360 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
361 {
362 return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
363 }
364
GetQueuedSyncLimit(int * queuedSyncLimit) const365 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
366 {
367 return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
368 }
369
DisableManualSync(void)370 int SyncAbleKvDB::DisableManualSync(void)
371 {
372 return syncer_.DisableManualSync();
373 }
374
EnableManualSync(void)375 int SyncAbleKvDB::EnableManualSync(void)
376 {
377 return syncer_.EnableManualSync();
378 }
379
GetLocalIdentity(std::string & outTarget) const380 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget) const
381 {
382 return syncer_.GetLocalIdentity(outTarget);
383 }
384
SetStaleDataWipePolicy(WipePolicy policy)385 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
386 {
387 return syncer_.SetStaleDataWipePolicy(policy);
388 }
389
RegisterEventType(EventType type)390 int SyncAbleKvDB::RegisterEventType(EventType type)
391 {
392 if (notifyChain_ == nullptr) {
393 notifyChain_ = new (std::nothrow) NotificationChain;
394 if (notifyChain_ == nullptr) {
395 return -E_OUT_OF_MEMORY;
396 }
397 }
398
399 int errCode = notifyChain_->RegisterEventType(type);
400 if (errCode == -E_ALREADY_REGISTER) {
401 return E_OK;
402 }
403 if (errCode != E_OK) {
404 LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
405 KillAndDecObjRef(notifyChain_);
406 notifyChain_ = nullptr;
407 }
408 return errCode;
409 }
410
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)411 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier ¬ifier,
412 int &errCode)
413 {
414 std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
415 errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
416 if (errCode != E_OK) {
417 return nullptr;
418 }
419
420 auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
421 [notifier](void *arg) {
422 if (arg == nullptr) {
423 LOGE("PragmaRemotePushNotify is null.");
424 return;
425 }
426 notifier(*static_cast<RemotePushNotifyInfo *>(arg));
427 }, nullptr, errCode);
428 if (errCode != E_OK) {
429 LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
430 }
431 return listener;
432 }
433
NotifyRemotePushFinishedInner(const std::string & targetId) const434 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
435 {
436 NotificationChain *notify = nullptr;
437 {
438 std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
439 if (notifyChain_ == nullptr) {
440 return;
441 }
442 notify = notifyChain_;
443 RefObject::IncObjRef(notify);
444 }
445 RemotePushNotifyInfo info;
446 info.deviceId = targetId;
447 notify->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
448 RefObject::DecObjRef(notify);
449 }
450
SetSyncRetry(bool isRetry)451 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
452 {
453 IKvDBSyncInterface *syncInterface = GetSyncInterface();
454 if (syncInterface == nullptr) {
455 LOGD("KvDB got null sync interface.");
456 return -E_INVALID_DB;
457 }
458 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
459 if (localOnly) {
460 return -E_NOT_SUPPORT;
461 }
462 if (NeedStartSyncer()) {
463 StartSyncer();
464 }
465 return syncer_.SetSyncRetry(isRetry);
466 }
467
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)468 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
469 {
470 if (NeedStartSyncer()) {
471 StartSyncer();
472 }
473 return syncer_.SetEqualIdentifier(identifier, targets);
474 }
475
Dump(int fd)476 void SyncAbleKvDB::Dump(int fd)
477 {
478 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
479 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
480 basicInfo.isAutoSync);
481 if (basicInfo.isSyncActive) {
482 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
483 syncer_.Dump(fd);
484 }
485 }
486
GetSyncDataSize(const std::string & device,size_t & size) const487 int SyncAbleKvDB::GetSyncDataSize(const std::string &device, size_t &size) const
488 {
489 return syncer_.GetSyncDataSize(device, size);
490 }
491
NeedStartSyncer() const492 bool SyncAbleKvDB::NeedStartSyncer() const
493 {
494 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
495 return false;
496 }
497 // don't start when check callback got not active
498 // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
499 return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
500 }
501
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)502 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
503 {
504 if (!NeedStartSyncer()) {
505 return syncer_.GetHashDeviceId(clientId, hashDevId);
506 }
507 int errCode = StartSyncer();
508 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
509 return errCode;
510 }
511 return syncer_.GetHashDeviceId(clientId, hashDevId);
512 }
513
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)514 int SyncAbleKvDB::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
515 {
516 if (NeedStartSyncer()) {
517 StartSyncer();
518 }
519 return syncer_.GetWatermarkInfo(device, info);
520 }
521
UpgradeSchemaVerInMeta()522 int SyncAbleKvDB::UpgradeSchemaVerInMeta()
523 {
524 return syncer_.UpgradeSchemaVerInMeta();
525 }
526
ResetSyncStatus()527 void SyncAbleKvDB::ResetSyncStatus()
528 {
529 syncer_.ResetSyncStatus();
530 }
531
532 #ifdef USE_DISTRIBUTEDDB_CLOUD
GetICloudSyncInterface() const533 ICloudSyncStorageInterface *SyncAbleKvDB::GetICloudSyncInterface() const
534 {
535 return nullptr;
536 }
537 #endif
538
StartCloudSyncer()539 void SyncAbleKvDB::StartCloudSyncer()
540 {
541 #ifdef USE_DISTRIBUTEDDB_CLOUD
542 auto cloudStorage = GetICloudSyncInterface();
543 if (cloudStorage == nullptr) {
544 return;
545 }
546 int conflictType = MyProp().GetIntProp(KvDBProperties::CONFLICT_RESOLVE_POLICY,
547 static_cast<int>(SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN));
548 {
549 std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
550 if (cloudSyncer_ != nullptr) {
551 return;
552 }
553 cloudSyncer_ = new (std::nothrow) CloudSyncer(
554 StorageProxy::GetCloudDb(cloudStorage), true, static_cast<SingleVerConflictResolvePolicy>(conflictType));
555 if (cloudSyncer_ == nullptr) {
556 LOGW("[SyncAbleKvDB][StartCloudSyncer] start cloud syncer and cloud syncer was not initialized");
557 }
558 }
559 #endif
560 }
561
GetLocalTimeOffset()562 TimeOffset SyncAbleKvDB::GetLocalTimeOffset()
563 {
564 if (NeedStartSyncer()) {
565 StartSyncer();
566 }
567 return syncer_.GetLocalTimeOffset();
568 }
569
GetDataBaseSchemas()570 std::map<std::string, DataBaseSchema> SyncAbleKvDB::GetDataBaseSchemas()
571 {
572 return {};
573 }
574
GetTaskCount()575 int32_t SyncAbleKvDB::GetTaskCount()
576 {
577 int32_t taskCount = 0;
578 #ifdef USE_DISTRIBUTEDDB_CLOUD
579 auto cloudSyncer = GetAndIncCloudSyncer();
580 if (cloudSyncer != nullptr) {
581 taskCount += cloudSyncer->GetCloudSyncTaskCount();
582 RefObject::DecObjRef(cloudSyncer);
583 }
584 if (NeedStartSyncer()) {
585 return taskCount;
586 }
587 #endif
588 taskCount += syncer_.GetTaskCount();
589 return taskCount;
590 }
591
GetAndIncCloudSyncer()592 CloudSyncer *SyncAbleKvDB::GetAndIncCloudSyncer()
593 {
594 std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
595 if (cloudSyncer_ == nullptr) {
596 return nullptr;
597 }
598 RefObject::IncObjRef(cloudSyncer_);
599 return cloudSyncer_;
600 }
601
ExchangeClosePending(bool expected)602 bool SyncAbleKvDB::ExchangeClosePending(bool expected)
603 {
604 return syncer_.ExchangeClosePending(expected);
605 }
606
PreClose()607 int SyncAbleKvDB::PreClose()
608 {
609 if (GenericKvDB::PreClose() == E_OK) {
610 int32_t taskCount = GetTaskCount();
611 if (taskCount > 0) {
612 LOGI("[PreClose] task count:%d", taskCount);
613 return -E_BUSY;
614 }
615 ExchangeClosePending(true);
616 taskCount = GetTaskCount();
617 if (taskCount > 0) {
618 LOGI("[PreClose] task count:%d.", taskCount);
619 ExchangeClosePending(false);
620 return -E_BUSY;
621 }
622 }
623 return E_OK;
624 }
625
626 #ifdef USE_DISTRIBUTEDDB_CLOUD
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)627 void SyncAbleKvDB::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
628 CloudSyncer::CloudTaskInfo &info)
629 {
630 QuerySyncObject query(Query::Select());
631 query.SetTableName(CloudDbConstant::CLOUD_KV_TABLE_NAME);
632 info.queryList.push_back(query);
633 info.table.push_back(CloudDbConstant::CLOUD_KV_TABLE_NAME);
634 info.callback = onProcess;
635 info.devices = option.devices;
636 info.mode = option.mode;
637 std::set<std::string> userSet(option.users.begin(), option.users.end());
638 info.users = std::vector<std::string>(userSet.begin(), userSet.end());
639 info.lockAction = option.lockAction;
640 info.storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
641 info.merge = option.merge;
642 info.prepareTraceId = option.prepareTraceId;
643 }
644
CheckSyncOption(const CloudSyncOption & option,const CloudSyncer & syncer)645 int SyncAbleKvDB::CheckSyncOption(const CloudSyncOption &option, const CloudSyncer &syncer)
646 {
647 if (option.users.empty()) {
648 LOGE("[SyncAbleKvDB][Sync] no user in sync option");
649 return -E_INVALID_ARGS;
650 }
651 const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs = syncer.GetCloudDB();
652 if (cloudDBs.empty()) {
653 LOGE("[SyncAbleKvDB][Sync] not set cloud db");
654 return -E_CLOUD_ERROR;
655 }
656 auto schemas = GetDataBaseSchemas();
657 if (schemas.empty()) {
658 LOGE("[SyncAbleKvDB][Sync] not set cloud schema");
659 return -E_SCHEMA_MISMATCH;
660 }
661 for (const auto &user : option.users) {
662 if (cloudDBs.find(user) == cloudDBs.end()) {
663 LOGE("[SyncAbleKvDB][Sync] cloud db with invalid user: %s", user.c_str());
664 return -E_INVALID_ARGS;
665 }
666 if (schemas.find(user) == schemas.end()) {
667 LOGE("[SyncAbleKvDB][Sync] cloud schema with invalid user: %s", user.c_str());
668 return -E_SCHEMA_MISMATCH;
669 }
670 }
671 if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
672 LOGE("[SyncAbleKvDB][Sync] invalid wait time of sync option: %lld", option.waitTime);
673 return -E_INVALID_ARGS;
674 }
675 if (!CheckSchemaSupportForCloudSync()) {
676 return -E_NOT_SUPPORT;
677 }
678 return E_OK;
679 }
680
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)681 int SyncAbleKvDB::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
682 {
683 auto syncer = GetAndIncCloudSyncer();
684 if (syncer == nullptr) {
685 LOGE("[SyncAbleKvDB][Sync] cloud syncer was not initialized");
686 return -E_INVALID_DB;
687 }
688 int errCode = CheckSyncOption(option, *syncer);
689 if (errCode != E_OK) {
690 RefObject::DecObjRef(syncer);
691 return errCode;
692 }
693 CloudSyncer::CloudTaskInfo info;
694 FillSyncInfo(option, onProcess, info);
695 errCode = syncer->Sync(info);
696 RefObject::DecObjRef(syncer);
697 return errCode;
698 }
699
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)700 int SyncAbleKvDB::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
701 {
702 auto syncer = GetAndIncCloudSyncer();
703 if (syncer == nullptr) {
704 LOGE("[SyncAbleKvDB][SetCloudDB] cloud syncer was not initialized");
705 return -E_INVALID_DB;
706 }
707 int errCode = syncer->SetCloudDB(cloudDBs);
708 RefObject::DecObjRef(syncer);
709 return errCode;
710 }
711
CleanAllWaterMark()712 int SyncAbleKvDB::CleanAllWaterMark()
713 {
714 auto syncer = GetAndIncCloudSyncer();
715 if (syncer == nullptr) {
716 LOGE("[SyncAbleKvDB][CleanAllWaterMark] cloud syncer was not initialized");
717 return -E_INVALID_DB;
718 }
719 syncer->CleanAllWaterMark();
720 RefObject::DecObjRef(syncer);
721 return E_OK;
722 }
723
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)724 void SyncAbleKvDB::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
725 {
726 auto cloudSyncer = GetAndIncCloudSyncer();
727 if (cloudSyncer == nullptr) {
728 LOGE("[SyncAbleKvDB][SetGenCloudVersionCallback] cloud syncer was not initialized");
729 return;
730 }
731 cloudSyncer->SetGenCloudVersionCallback(callback);
732 RefObject::DecObjRef(cloudSyncer);
733 }
734
CheckSchemaSupportForCloudSync() const735 bool SyncAbleKvDB::CheckSchemaSupportForCloudSync() const
736 {
737 return true; // default is valid
738 }
739 #endif
740 }
741