• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "generic_syncer.h"
17 
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "ref_object.h"
22 #include "sqlite_single_ver_natural_store.h"
23 #include "time_sync.h"
24 #include "single_ver_data_sync.h"
25 #ifndef OMIT_MULTI_VER
26 #include "commit_history_sync.h"
27 #include "multi_ver_data_sync.h"
28 #include "value_slice_sync.h"
29 #endif
30 #include "device_manager.h"
31 #include "db_constant.h"
32 #include "ability_sync.h"
33 #include "generic_single_ver_kv_entry.h"
34 #include "single_ver_serialize_manager.h"
35 
36 namespace DistributedDB {
37 namespace {
38     constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45     : syncEngine_(nullptr),
46       syncInterface_(nullptr),
47       timeHelper_(nullptr),
48       metadata_(nullptr),
49       initialized_(false),
50       queuedManualSyncSize_(0),
51       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52       manualSyncEnable_(true),
53       closing_(false),
54       engineFinalize_(false),
55       timeChangeListenerFinalize_(true),
56       timeChangedListener_(nullptr)
57 {
58 }
59 
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62     LOGD("[GenericSyncer] ~GenericSyncer!");
63     if (syncEngine_ != nullptr) {
64         syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65         RefObject::KillAndDecObjRef(syncEngine_);
66         // waiting all thread exist
67         std::unique_lock<std::mutex> cvLock(engineMutex_);
68         bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69             [this]() { return engineFinalize_; });
70         if (!engineFinalize) {
71             LOGW("syncer finalize before engine finalize!");
72         }
73         syncEngine_ = nullptr;
74     }
75     ReleaseInnerResource();
76     std::lock_guard<std::mutex> lock(syncerLock_);
77     syncInterface_ = nullptr;
78 }
79 
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82     if (syncInterface == nullptr) {
83         LOGE("[Syncer] Init failed, the syncInterface is null!");
84         return -E_INVALID_ARGS;
85     }
86 
87     {
88         std::lock_guard<std::mutex> lock(syncerLock_);
89         if (initialized_) {
90             return E_OK;
91         }
92         if (closing_) {
93             LOGE("[Syncer] Syncer is closing, return!");
94             return -E_BUSY;
95         }
96         std::vector<uint8_t> label = syncInterface->GetIdentifier();
97         label.resize(3); // only show 3 Bytes enough
98         label_ = DBCommon::VectorToHexString(label);
99 
100         int errCode = InitStorageResource(syncInterface);
101         if (errCode != E_OK) {
102             return errCode;
103         }
104         // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
105         // It will be clear in destructor.
106         int errCodeTimeChangedListener = InitTimeChangedListener();
107         if (errCodeTimeChangedListener != E_OK) {
108             return -E_INTERNAL_ERROR;
109         }
110         errCode = CheckSyncActive(syncInterface, isNeedActive);
111         if (errCode != E_OK) {
112             return errCode;
113         }
114 
115         if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
116             LOGW("[Syncer] Communicator component not ready!");
117             return -E_NOT_INIT;
118         }
119 
120         errCode = SyncModuleInit();
121         if (errCode != E_OK) {
122             LOGE("[Syncer] Sync ModuleInit ERR!");
123             return -E_INTERNAL_ERROR;
124         }
125 
126         errCode = InitSyncEngine(syncInterface);
127         if (errCode != E_OK) {
128             return errCode;
129         }
130         syncEngine_->SetEqualIdentifier();
131         initialized_ = true;
132     }
133 
134     // StartCommunicator may start an auto sync, this function can not in syncerLock_
135     syncEngine_->StartCommunicator();
136     return E_OK;
137 }
138 
Close(bool isClosedOperation)139 int GenericSyncer::Close(bool isClosedOperation)
140 {
141     int errCode = CloseInner(isClosedOperation);
142     if (errCode != -E_BUSY && isClosedOperation) {
143         ReleaseInnerResource();
144     }
145     return errCode;
146 }
147 
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)148 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
149     const std::function<void(const std::map<std::string, int> &)> &onComplete,
150     const std::function<void(void)> &onFinalize, bool wait = false)
151 {
152     SyncParma param;
153     param.devices = devices;
154     param.mode = mode;
155     param.onComplete = onComplete;
156     param.onFinalize = onFinalize;
157     param.wait = wait;
158     return Sync(param);
159 }
160 
Sync(const InternalSyncParma & param)161 int GenericSyncer::Sync(const InternalSyncParma &param)
162 {
163     SyncParma syncParam;
164     syncParam.devices = param.devices;
165     syncParam.mode = param.mode;
166     syncParam.isQuerySync = param.isQuerySync;
167     syncParam.syncQuery = param.syncQuery;
168     return Sync(syncParam);
169 }
170 
Sync(const SyncParma & param)171 int GenericSyncer::Sync(const SyncParma &param)
172 {
173     return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
174 }
175 
Sync(const SyncParma & param,uint64_t connectionId)176 int GenericSyncer::Sync(const SyncParma &param, uint64_t connectionId)
177 {
178     int errCode = SyncPreCheck(param);
179     if (errCode != E_OK) {
180         return errCode;
181     }
182     errCode = AddQueuedManualSyncSize(param.mode, param.wait);
183     if (errCode != E_OK) {
184         return errCode;
185     }
186 
187     uint32_t syncId = GenerateSyncId();
188     errCode = PrepareSync(param, syncId, connectionId);
189     if (errCode != E_OK) {
190         LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
191         return errCode;
192     }
193     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
194     return E_OK;
195 }
196 
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)197 int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
198 {
199     auto *operation =
200         new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
201     if (operation == nullptr) {
202         SubQueuedSyncSize();
203         return -E_OUT_OF_MEMORY;
204     }
205     {
206         std::lock_guard<std::mutex> autoLock(syncerLock_);
207         PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
208         InitSyncOperation(operation, param);
209         LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
210             param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
211         AddSyncOperation(operation);
212         PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
213     }
214     if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
215         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
216         connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
217         syncIdMap_[static_cast<int>(syncId)] = connectionId;
218     }
219     if (operation->CheckIsAllFinished()) {
220         operation->Finished();
221         RefObject::KillAndDecObjRef(operation);
222     } else {
223         operation->WaitIfNeed();
224         RefObject::DecObjRef(operation);
225     }
226     return E_OK;
227 }
228 
RemoveSyncOperation(int syncId)229 int GenericSyncer::RemoveSyncOperation(int syncId)
230 {
231     SyncOperation *operation = nullptr;
232     std::unique_lock<std::mutex> lock(operationMapLock_);
233     auto iter = syncOperationMap_.find(syncId);
234     if (iter != syncOperationMap_.end()) {
235         LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
236         operation = iter->second;
237         syncOperationMap_.erase(syncId);
238         lock.unlock();
239         if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
240             SubQueuedSyncSize();
241         }
242         operation->NotifyIfNeed();
243         RefObject::KillAndDecObjRef(operation);
244         operation = nullptr;
245         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
246         if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
247             return E_OK;
248         }
249         uint64_t connectionId = syncIdMap_[syncId];
250         if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
251             connectionIdMap_[connectionId].remove(syncId);
252         }
253         syncIdMap_.erase(syncId);
254         return E_OK;
255     }
256     return -E_INVALID_ARGS;
257 }
258 
StopSync(uint64_t connectionId)259 int GenericSyncer::StopSync(uint64_t connectionId)
260 {
261     std::list<int> syncIdList;
262     {
263         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
264         if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
265             return E_OK;
266         }
267         syncIdList = connectionIdMap_[connectionId];
268         connectionIdMap_.erase(connectionId);
269     }
270     for (auto syncId : syncIdList) {
271         RemoveSyncOperation(syncId);
272         if (syncEngine_ != nullptr) {
273             syncEngine_->AbortMachineIfNeed(syncId);
274         }
275     }
276     if (syncEngine_ != nullptr) {
277         syncEngine_->NotifyConnectionClosed(connectionId);
278     }
279     return E_OK;
280 }
281 
GetTimestamp()282 uint64_t GenericSyncer::GetTimestamp()
283 {
284     std::shared_ptr<TimeHelper> timeHelper = nullptr;
285     ISyncInterface *storage = nullptr;
286     {
287         std::lock_guard<std::mutex> lock(syncerLock_);
288         timeHelper = timeHelper_;
289         if (syncInterface_ != nullptr) {
290             storage = syncInterface_;
291             storage->IncRefCount();
292         }
293     }
294     if (storage == nullptr) {
295         return TimeHelper::GetSysCurrentTime();
296     }
297     if (timeHelper == nullptr) {
298         storage->DecRefCount();
299         return TimeHelper::GetSysCurrentTime();
300     }
301     uint64_t timestamp = timeHelper->GetTime();
302     storage->DecRefCount();
303     return timestamp;
304 }
305 
QueryAutoSync(const InternalSyncParma & param)306 void GenericSyncer::QueryAutoSync(const InternalSyncParma &param)
307 {
308     if (!initialized_) {
309         LOGW("[Syncer] Syncer has not Init");
310         return;
311     }
312     LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
313     ISyncInterface *syncInterface = nullptr;
314     ISyncEngine *engine = nullptr;
315     {
316         std::lock_guard<std::mutex> lock(syncerLock_);
317         if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
318             LOGW("[Syncer] Syncer has not Init");
319             return;
320         }
321         syncInterface = syncInterface_;
322         engine = syncEngine_;
323         syncInterface->IncRefCount();
324         RefObject::IncObjRef(engine);
325     }
326     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
327         int errCode = Sync(param);
328         if (errCode != E_OK) {
329             LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
330         }
331         RefObject::DecObjRef(engine);
332         syncInterface->DecRefCount();
333     });
334     if (retCode != E_OK) {
335         LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
336         RefObject::DecObjRef(engine);
337         syncInterface->DecRefCount();
338     }
339 }
340 
AddSyncOperation(SyncOperation * operation)341 void GenericSyncer::AddSyncOperation(SyncOperation *operation)
342 {
343     if (operation == nullptr) {
344         return;
345     }
346 
347     LOGD("[Syncer] AddSyncOperation.");
348     syncEngine_->AddSyncOperation(operation);
349 
350     if (operation->CheckIsAllFinished()) {
351         return;
352     }
353 
354     std::lock_guard<std::mutex> lock(operationMapLock_);
355     syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
356     // To make sure operation alive before WaitIfNeed out
357     RefObject::IncObjRef(operation);
358 }
359 
SyncOperationKillCallbackInner(int syncId)360 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
361 {
362     if (syncEngine_ != nullptr) {
363         LOGI("[Syncer] Operation on kill id = %d", syncId);
364         syncEngine_->RemoveSyncOperation(syncId);
365     }
366 }
367 
SyncOperationKillCallback(int syncId)368 void GenericSyncer::SyncOperationKillCallback(int syncId)
369 {
370     SyncOperationKillCallbackInner(syncId);
371 }
372 
InitMetaData(ISyncInterface * syncInterface)373 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
374 {
375     if (metadata_ != nullptr) {
376         return E_OK;
377     }
378 
379     metadata_ = std::make_shared<Metadata>();
380     if (metadata_ == nullptr) {
381         LOGE("[Syncer] metadata make shared failed");
382         return -E_OUT_OF_MEMORY;
383     }
384     int errCode = metadata_->Initialize(syncInterface);
385     if (errCode != E_OK) {
386         LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
387         metadata_ = nullptr;
388     }
389     syncInterface_ = syncInterface;
390     return errCode;
391 }
392 
InitTimeHelper(ISyncInterface * syncInterface)393 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
394 {
395     if (timeHelper_ != nullptr) {
396         return E_OK;
397     }
398 
399     timeHelper_ = std::make_shared<TimeHelper>();
400     if (timeHelper_ == nullptr) {
401         return -E_OUT_OF_MEMORY;
402     }
403     int errCode = timeHelper_->Initialize(syncInterface, metadata_);
404     if (errCode != E_OK) {
405         LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
406         timeHelper_ = nullptr;
407     }
408     return errCode;
409 }
410 
InitSyncEngine(ISyncInterface * syncInterface)411 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
412 {
413     if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
414         LOGI("[Syncer] syncEngine is active");
415         return E_OK;
416     }
417     int errCode = BuildSyncEngine();
418     if (errCode != E_OK) {
419         return errCode;
420     }
421     const std::function<void(std::string)> onlineFunc = std::bind(&GenericSyncer::RemoteDataChanged,
422         this, std::placeholders::_1);
423     const std::function<void(std::string)> offlineFunc = std::bind(&GenericSyncer::RemoteDeviceOffline,
424         this, std::placeholders::_1);
425     const std::function<void(const InternalSyncParma &param)> queryAutoSyncFunc =
426         std::bind(&GenericSyncer::QueryAutoSync, this, std::placeholders::_1);
427     const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
428     errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
429     if (errCode == E_OK) {
430         syncInterface->IncRefCount();
431         label_ = syncEngine_->GetLabel();
432         return E_OK;
433     } else {
434         LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
435         RefObject::KillAndDecObjRef(syncEngine_);
436         syncEngine_ = nullptr;
437         return errCode;
438     }
439 }
440 
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)441 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
442 {
443     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
444         false);
445     if (!isSyncDualTupleMode || isNeedActive) {
446         return E_OK;
447     }
448     LOGI("[Syncer] syncer no need to active");
449     int errCode = BuildSyncEngine();
450     if (errCode != E_OK) {
451         return errCode;
452     }
453     return -E_NO_NEED_ACTIVE;
454 }
455 
GenerateSyncId()456 uint32_t GenericSyncer::GenerateSyncId()
457 {
458     std::lock_guard<std::mutex> lock(syncIdLock_);
459     currentSyncId_++;
460     // if overflow, reset to 1
461     if (currentSyncId_ <= 0) {
462         currentSyncId_ = MIN_VALID_SYNC_ID;
463     }
464     return currentSyncId_;
465 }
466 
IsValidMode(int mode) const467 bool GenericSyncer::IsValidMode(int mode) const
468 {
469     if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
470         LOGE("[Syncer] Sync mode is not valid!");
471         return false;
472     }
473     return true;
474 }
475 
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const476 int GenericSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const
477 {
478     (void)param;
479     (void)engine;
480     (void)storage;
481     return E_OK;
482 }
483 
IsValidDevices(const std::vector<std::string> & devices) const484 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
485 {
486     if (devices.empty()) {
487         LOGE("[Syncer] devices is empty!");
488         return false;
489     }
490     return true;
491 }
492 
ClearSyncOperations(bool isClosedOperation)493 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
494 {
495     std::vector<SyncOperation *> syncOperation;
496     {
497         std::lock_guard<std::mutex> lock(operationMapLock_);
498         for (auto &item : syncOperationMap_) {
499             bool isBlockSync = item.second->IsBlockSync();
500             if (isBlockSync || !isClosedOperation) {
501                 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
502                 item.second->SetUnfinishedDevStatus(status);
503                 RefObject::IncObjRef(item.second);
504                 syncOperation.push_back(item.second);
505             }
506         }
507     }
508 
509     if (!isClosedOperation) { // means user changed
510         syncEngine_->NotifyUserChange();
511     }
512 
513     for (auto &operation : syncOperation) {
514         // block sync operation or userChange will trigger remove sync operation
515         // caller won't blocked for block sync
516         // caller won't blocked for userChange operation no mater it is block or non-block sync
517         TriggerSyncFinished(operation);
518         RefObject::DecObjRef(operation);
519     }
520     ClearInnerResource(isClosedOperation);
521 }
522 
ClearInnerResource(bool isClosedOperation)523 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
524 {
525     {
526         std::lock_guard<std::mutex> lock(operationMapLock_);
527         for (auto &iter : syncOperationMap_) {
528             RefObject::KillAndDecObjRef(iter.second);
529             iter.second = nullptr;
530         }
531         syncOperationMap_.clear();
532     }
533     {
534         std::lock_guard<std::mutex> lock(syncIdLock_);
535         if (isClosedOperation) {
536             connectionIdMap_.clear();
537         } else { // only need to clear syncid when user change
538             for (auto &item : connectionIdMap_) {
539                 item.second.clear();
540             }
541         }
542         syncIdMap_.clear();
543     }
544 }
545 
TriggerSyncFinished(SyncOperation * operation)546 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
547 {
548     if (operation != nullptr && operation->CheckIsAllFinished()) {
549         operation->Finished();
550     }
551 }
552 
OnSyncFinished(int syncId)553 void GenericSyncer::OnSyncFinished(int syncId)
554 {
555     (void)(RemoveSyncOperation(syncId));
556 }
557 
SyncModuleInit()558 int GenericSyncer::SyncModuleInit()
559 {
560     static bool isInit = false;
561     std::lock_guard<std::mutex> lock(moduleInitLock_);
562     if (!isInit) {
563         int errCode = SyncResourceInit();
564         if (errCode != E_OK) {
565             return errCode;
566         }
567         isInit = true;
568         return E_OK;
569     }
570     return E_OK;
571 }
572 
SyncResourceInit()573 int GenericSyncer::SyncResourceInit()
574 {
575     int errCode = TimeSync::RegisterTransformFunc();
576     if (errCode != E_OK) {
577         LOGE("Register timesync message transform func ERR!");
578         return errCode;
579     }
580     errCode = SingleVerSerializeManager::RegisterTransformFunc();
581     if (errCode != E_OK) {
582         LOGE("Register SingleVerDataSync message transform func ERR!");
583         return errCode;
584     }
585 #ifndef OMIT_MULTI_VER
586     errCode = CommitHistorySync::RegisterTransformFunc();
587     if (errCode != E_OK) {
588         LOGE("Register CommitHistorySync message transform func ERR!");
589         return errCode;
590     }
591     errCode = MultiVerDataSync::RegisterTransformFunc();
592     if (errCode != E_OK) {
593         LOGE("Register MultiVerDataSync message transform func ERR!");
594         return errCode;
595     }
596     errCode = ValueSliceSync::RegisterTransformFunc();
597     if (errCode != E_OK) {
598         LOGE("Register ValueSliceSync message transform func ERR!");
599         return errCode;
600     }
601 #endif
602     errCode = DeviceManager::RegisterTransformFunc();
603     if (errCode != E_OK) {
604         LOGE("Register DeviceManager message transform func ERR!");
605         return errCode;
606     }
607     errCode = AbilitySync::RegisterTransformFunc();
608     if (errCode != E_OK) {
609         LOGE("Register AbilitySync message transform func ERR!");
610         return errCode;
611     }
612     return E_OK;
613 }
614 
GetQueuedSyncSize(int * queuedSyncSize) const615 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
616 {
617     if (queuedSyncSize == nullptr) {
618         return -E_INVALID_ARGS;
619     }
620     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
621     *queuedSyncSize = queuedManualSyncSize_;
622     LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
623     return E_OK;
624 }
625 
SetQueuedSyncLimit(const int * queuedSyncLimit)626 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
627 {
628     if (queuedSyncLimit == nullptr) {
629         return -E_INVALID_ARGS;
630     }
631     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
632     queuedManualSyncLimit_ = *queuedSyncLimit;
633     LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
634     return E_OK;
635 }
636 
GetQueuedSyncLimit(int * queuedSyncLimit) const637 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
638 {
639     if (queuedSyncLimit == nullptr) {
640         return -E_INVALID_ARGS;
641     }
642     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
643     *queuedSyncLimit = queuedManualSyncLimit_;
644     LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
645     return E_OK;
646 }
647 
IsManualSync(int inMode) const648 bool GenericSyncer::IsManualSync(int inMode) const
649 {
650     int mode = SyncOperation::TransferSyncMode(inMode);
651     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
652         (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
653         return true;
654     }
655     return false;
656 }
657 
AddQueuedManualSyncSize(int mode,bool wait)658 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
659 {
660     if (IsManualSync(mode) && (!wait)) {
661         std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
662         if (!manualSyncEnable_) {
663             LOGI("[GenericSyncer] manualSyncEnable is Disable");
664             return -E_BUSY;
665         }
666         queuedManualSyncSize_++;
667     }
668     return E_OK;
669 }
670 
IsQueuedManualSyncFull(int mode,bool wait) const671 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
672 {
673     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
674     if (IsManualSync(mode) && (!manualSyncEnable_)) {
675         LOGI("[GenericSyncer] manualSyncEnable_:false");
676         return true;
677     }
678     if (IsManualSync(mode) && (!wait)) {
679         if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
680             return false;
681         } else {
682             LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
683                 queuedManualSyncLimit_);
684             return true;
685         }
686     } else {
687         return false;
688     }
689 }
690 
SubQueuedSyncSize(void)691 void GenericSyncer::SubQueuedSyncSize(void)
692 {
693     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
694     queuedManualSyncSize_--;
695     if (queuedManualSyncSize_ < 0) {
696         LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
697         queuedManualSyncSize_ = 0;
698     }
699 }
700 
DisableManualSync(void)701 int GenericSyncer::DisableManualSync(void)
702 {
703     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
704     if (queuedManualSyncSize_ > 0) {
705         LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
706         return -E_BUSY;
707     }
708     manualSyncEnable_ = false;
709     LOGD("[GenericSyncer] DisableManualSync ok");
710     return E_OK;
711 }
712 
EnableManualSync(void)713 int GenericSyncer::EnableManualSync(void)
714 {
715     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
716     manualSyncEnable_ = true;
717     LOGD("[GenericSyncer] EnableManualSync ok");
718     return E_OK;
719 }
720 
GetLocalIdentity(std::string & outTarget) const721 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
722 {
723     std::string deviceId;
724     int errCode =  RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
725     if (errCode != E_OK) {
726         LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
727         return errCode;
728     }
729     outTarget = DBCommon::TransferHashString(deviceId);
730     return E_OK;
731 }
732 
GetOnlineDevices(std::vector<std::string> & devices) const733 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
734 {
735     std::string identifier;
736     {
737         std::lock_guard<std::mutex> lock(syncerLock_);
738         // Get devices from AutoLaunch first.
739         if (syncInterface_ == nullptr) {
740             LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
741             return;
742         }
743         bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
744             false);
745         if (isSyncDualTupleMode) {
746             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
747                 "");
748         } else {
749             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
750         }
751     }
752     RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
753     if (!devices.empty()) {
754         return;
755     }
756     std::lock_guard<std::mutex> lock(syncerLock_);
757     if (closing_) {
758         LOGW("[Syncer] Syncer is closing, return!");
759         return;
760     }
761     if (syncEngine_ != nullptr) {
762         syncEngine_->GetOnlineDevices(devices);
763     }
764 }
765 
SetSyncRetry(bool isRetry)766 int GenericSyncer::SetSyncRetry(bool isRetry)
767 {
768     if (syncEngine_ == nullptr) {
769         return -E_NOT_INIT;
770     }
771     syncEngine_->SetSyncRetry(isRetry);
772     return E_OK;
773 }
774 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)775 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
776 {
777     std::lock_guard<std::mutex> lock(syncerLock_);
778     if (syncEngine_ == nullptr) {
779         return -E_NOT_INIT;
780     }
781     int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
782     if (errCode == E_OK) {
783         syncEngine_->SetEqualIdentifierMap(identifier, targets);
784     }
785     return errCode;
786 }
787 
GetSyncDevicesStr(const std::vector<std::string> & devices) const788 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
789 {
790     std::string syncDevices;
791     for (const auto &dev:devices) {
792         syncDevices += DBCommon::StringMasking(dev);
793         syncDevices += ",";
794     }
795     return syncDevices.substr(0, syncDevices.size() - 1);
796 }
797 
StatusCheck() const798 int GenericSyncer::StatusCheck() const
799 {
800     if (!initialized_) {
801         LOGE("[Syncer] Syncer is not initialized, return!");
802         return -E_BUSY;
803     }
804     if (closing_) {
805         LOGW("[Syncer] Syncer is closing, return!");
806         return -E_BUSY;
807     }
808     return E_OK;
809 }
810 
SyncPreCheck(const SyncParma & param) const811 int GenericSyncer::SyncPreCheck(const SyncParma &param) const
812 {
813     ISyncEngine *engine = nullptr;
814     ISyncInterface *storage = nullptr;
815     int errCode = E_OK;
816     {
817         std::lock_guard<std::mutex> lock(syncerLock_);
818         errCode = StatusCheck();
819         if (errCode != E_OK) {
820             return errCode;
821         }
822         if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) {
823             return -E_INVALID_ARGS;
824         }
825         if (IsQueuedManualSyncFull(param.mode, param.wait)) {
826             LOGE("[Syncer] -E_BUSY");
827             return -E_BUSY;
828         }
829         storage = syncInterface_;
830         engine = syncEngine_;
831         if (storage == nullptr || engine == nullptr) {
832             return -E_BUSY;
833         }
834         storage->IncRefCount();
835         RefObject::IncObjRef(engine);
836     }
837     errCode = SyncConditionCheck(param, engine, storage);
838     storage->DecRefCount();
839     RefObject::DecObjRef(engine);
840     return errCode;
841 }
842 
InitSyncOperation(SyncOperation * operation,const SyncParma & param)843 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma &param)
844 {
845     operation->SetIdentifier(syncInterface_->GetIdentifier());
846     operation->Initialize();
847     operation->OnKill(std::bind(&GenericSyncer::SyncOperationKillCallback, this, operation->GetSyncId()));
848     std::function<void(int)> onFinished = std::bind(&GenericSyncer::OnSyncFinished, this, std::placeholders::_1);
849     operation->SetOnSyncFinished(onFinished);
850     operation->SetOnSyncFinalize(param.onFinalize);
851     if (param.isQuerySync) {
852         operation->SetQuery(param.syncQuery);
853     }
854 }
855 
BuildSyncEngine()856 int GenericSyncer::BuildSyncEngine()
857 {
858     if (syncEngine_ != nullptr) {
859         return E_OK;
860     }
861     syncEngine_ = CreateSyncEngine();
862     if (syncEngine_ == nullptr) {
863         return -E_OUT_OF_MEMORY;
864     }
865     syncEngine_->OnLastRef([this]() {
866         LOGD("[Syncer] SyncEngine finalized");
867         {
868             std::lock_guard<std::mutex> cvLock(engineMutex_);
869             engineFinalize_ = true;
870         }
871         engineFinalizeCv_.notify_all();
872     });
873     return E_OK;
874 }
875 
Dump(int fd)876 void GenericSyncer::Dump(int fd)
877 {
878     if (syncEngine_ == nullptr) {
879         return;
880     }
881     RefObject::IncObjRef(syncEngine_);
882     syncEngine_->Dump(fd);
883     RefObject::DecObjRef(syncEngine_);
884 }
885 
DumpSyncerBasicInfo()886 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
887 {
888     SyncerBasicInfo baseInfo;
889     if (syncEngine_ == nullptr) {
890         return baseInfo;
891     }
892     RefObject::IncObjRef(syncEngine_);
893     baseInfo.isSyncActive = syncEngine_->IsEngineActive();
894     RefObject::DecObjRef(syncEngine_);
895     return baseInfo;
896 }
897 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)898 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
899     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
900 {
901     ISyncEngine *syncEngine = nullptr;
902     {
903         std::lock_guard<std::mutex> lock(syncerLock_);
904         int errCode = StatusCheck();
905         if (errCode != E_OK) {
906             return errCode;
907         }
908         syncEngine = syncEngine_;
909         RefObject::IncObjRef(syncEngine);
910     }
911     if (syncEngine == nullptr) {
912         return -E_NOT_INIT;
913     }
914     int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
915     RefObject::DecObjRef(syncEngine);
916     return errCode;
917 }
918 
InitTimeChangedListener()919 int GenericSyncer::InitTimeChangedListener()
920 {
921     int errCode = E_OK;
922     if (timeChangedListener_ != nullptr) {
923         return errCode;
924     }
925     timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
926         [this](void *changedOffset) {
927             RecordTimeChangeOffset(changedOffset);
928         },
929         [this]() {
930             {
931                 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
932                 timeChangeListenerFinalize_ = true;
933             }
934             timeChangeCv_.notify_all();
935         }, errCode);
936     if (timeChangedListener_ == nullptr) {
937         LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
938         return errCode;
939     }
940     {
941         std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
942         timeChangeListenerFinalize_ = false;
943     }
944     return E_OK;
945 }
946 
ReleaseInnerResource()947 void GenericSyncer::ReleaseInnerResource()
948 {
949     NotificationChain::Listener *timeChangedListener = nullptr;
950     {
951         std::lock_guard<std::mutex> lock(syncerLock_);
952         if (timeChangedListener_ != nullptr) {
953             timeChangedListener = timeChangedListener_;
954             timeChangedListener_ = nullptr;
955         }
956         timeHelper_ = nullptr;
957         metadata_ = nullptr;
958     }
959     if (timeChangedListener != nullptr) {
960         timeChangedListener->Drop(true);
961         RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
962     }
963     std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
964     LOGD("[GenericSyncer] Begin wait time change listener finalize");
965     timeChangeCv_.wait(uniqueLock, [this]() {
966         return timeChangeListenerFinalize_;
967     });
968     LOGD("[GenericSyncer] End wait time change listener finalize");
969 }
970 
RecordTimeChangeOffset(void * changedOffset)971 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
972 {
973     std::shared_ptr<Metadata> metadata = nullptr;
974     ISyncInterface *storage = nullptr;
975     {
976         std::lock_guard<std::mutex> lock(syncerLock_);
977         if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
978             return;
979         }
980         storage = syncInterface_;
981         metadata = metadata_;
982         storage->IncRefCount();
983     }
984     TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
985         static_cast<TimeOffset>(TimeHelper::TO_100_NS);
986     TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
987     TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
988     Timestamp maxItemTime = 0;
989     storage->GetMaxTimestamp(maxItemTime);
990     if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
991         orgOffset = TimeHelper::BUFFER_VALID_TIME -
992             currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
993     }
994     if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
995         orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
996             static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
997     }
998     metadata->SaveLocalTimeOffset(orgOffset);
999     storage->DecRefCount();
1000 }
1001 
CloseInner(bool isClosedOperation)1002 int GenericSyncer::CloseInner(bool isClosedOperation)
1003 {
1004     {
1005         std::lock_guard<std::mutex> lock(syncerLock_);
1006         if (!initialized_) {
1007             LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
1008             return -E_NOT_INIT;
1009         }
1010         initialized_ = false;
1011         if (closing_) {
1012             LOGE("[Syncer] Syncer is closing, return!");
1013             return -E_BUSY;
1014         }
1015         closing_ = true;
1016     }
1017     ClearSyncOperations(isClosedOperation);
1018     if (syncEngine_ != nullptr) {
1019         syncEngine_->Close();
1020         LOGD("[Syncer] Close SyncEngine!");
1021         std::lock_guard<std::mutex> lock(syncerLock_);
1022         closing_ = false;
1023     }
1024     return E_OK;
1025 }
1026 
GetSyncDataSize(const std::string & device,size_t & size) const1027 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1028 {
1029     uint64_t localWaterMark = 0;
1030     std::shared_ptr<Metadata> metadata = nullptr;
1031     {
1032         std::lock_guard<std::mutex> lock(syncerLock_);
1033         if (metadata_ == nullptr || syncInterface_ == nullptr) {
1034             return -E_INTERNAL_ERROR;
1035         }
1036         if (closing_) {
1037             LOGE("[Syncer] Syncer is closing, return!");
1038             return -E_BUSY;
1039         }
1040         int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1041         if (errCode != E_OK) {
1042             LOGE("[Syncer] syncer is restarting, return!");
1043             return errCode;
1044         }
1045         syncInterface_->IncRefCount();
1046         metadata = metadata_;
1047     }
1048     metadata->GetLocalWaterMark(device, localWaterMark);
1049     uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1050     DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1051     std::vector<SendDataItem> outData;
1052     ContinueToken token = nullptr;
1053     int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1054         outData, token, syncDataSizeInfo);
1055     if (token != nullptr) {
1056         static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1057         token = nullptr;
1058     }
1059     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1060         LOGE("calculate sync data size failed %d", errCode);
1061         syncInterface_->DecRefCount();
1062         return errCode;
1063     }
1064     uint32_t totalLen = 0;
1065     if (errCode == -E_UNFINISHED) {
1066         totalLen = expectedMtuSize;
1067     } else {
1068         totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1069     }
1070     for (auto &entry : outData) {
1071         delete entry;
1072         entry = nullptr;
1073     }
1074     syncInterface_->DecRefCount();
1075     // if larger than 1M, return 1M
1076     size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1077     return E_OK;
1078 }
1079 
IsNeedActive(ISyncInterface * syncInterface)1080 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1081 {
1082     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1083     if (localOnly) {
1084         LOGD("[Syncer] Local only db, don't need active syncer");
1085         return false;
1086     }
1087     return true;
1088 }
1089 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1090 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1091 {
1092     (void)clientId;
1093     (void)hashDevId;
1094     return -E_NOT_SUPPORT;
1095 }
1096 
InitStorageResource(ISyncInterface * syncInterface)1097 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1098 {
1099     // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1100     // It will be clear in destructor.
1101     int errCode = InitMetaData(syncInterface);
1102     if (errCode != E_OK) {
1103         return errCode;
1104     }
1105 
1106     // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1107     // It will be clear in destructor.
1108     errCode = InitTimeHelper(syncInterface);
1109     if (errCode != E_OK) {
1110         return errCode;
1111     }
1112 
1113     if (!IsNeedActive(syncInterface)) {
1114         return -E_NO_NEED_ACTIVE;
1115     }
1116     return errCode;
1117 }
1118 } // namespace DistributedDB
1119