• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "generic_syncer.h"

#include <limits>

#include "db_common.h"
#include "db_errno.h"
#include "log_print.h"
#include "ref_object.h"
#include "sqlite_single_ver_natural_store.h"
#include "time_sync.h"
#include "single_ver_data_sync.h"
#ifndef OMIT_MULTI_VER
#include "commit_history_sync.h"
#include "multi_ver_data_sync.h"
#include "value_slice_sync.h"
#endif
#include "device_manager.h"
#include "db_constant.h"
#include "ability_sync.h"
#include "generic_single_ver_kv_entry.h"
#include "single_ver_serialize_manager.h"
35 
36 namespace DistributedDB {
37 namespace {
38     constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45     : syncEngine_(nullptr),
46       syncInterface_(nullptr),
47       timeHelper_(nullptr),
48       metadata_(nullptr),
49       initialized_(false),
50       queuedManualSyncSize_(0),
51       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52       manualSyncEnable_(true),
53       closing_(false),
54       engineFinalize_(false),
55       timeChangeListenerFinalize_(true),
56       timeChangedListener_(nullptr)
57 {
58 }
59 
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62     LOGD("[GenericSyncer] ~GenericSyncer!");
63     if (syncEngine_ != nullptr) {
64         syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65         RefObject::KillAndDecObjRef(syncEngine_);
66         // waiting all thread exist
67         std::unique_lock<std::mutex> cvLock(engineMutex_);
68         bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69             [this]() { return engineFinalize_; });
70         if (!engineFinalize) {
71             LOGW("syncer finalize before engine finalize!");
72         }
73         syncEngine_ = nullptr;
74     }
75     ReleaseInnerResource();
76     std::lock_guard<std::mutex> lock(syncerLock_);
77     syncInterface_ = nullptr;
78 }
79 
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82     if (syncInterface == nullptr) {
83         LOGE("[Syncer] Init failed, the syncInterface is null!");
84         return -E_INVALID_ARGS;
85     }
86 
87     {
88         std::lock_guard<std::mutex> lock(syncerLock_);
89         if (initialized_) {
90             return E_OK;
91         }
92         if (closing_) {
93             LOGE("[Syncer] Syncer is closing, return!");
94             return -E_BUSY;
95         }
96         std::vector<uint8_t> label = syncInterface->GetIdentifier();
97         label_ = DBCommon::StringMasking(DBCommon::VectorToHexString(label));
98 
99         int errCode = InitStorageResource(syncInterface);
100         if (errCode != E_OK) {
101             return errCode;
102         }
103         // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
104         // It will be clear in destructor.
105         int errCodeTimeChangedListener = InitTimeChangedListener();
106         if (errCodeTimeChangedListener != E_OK) {
107             return -E_INTERNAL_ERROR;
108         }
109         errCode = CheckSyncActive(syncInterface, isNeedActive);
110         if (errCode != E_OK) {
111             return errCode;
112         }
113 
114         if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
115             return -E_NOT_INIT;
116         }
117 
118         errCode = SyncModuleInit();
119         if (errCode != E_OK) {
120             LOGE("[Syncer] Sync ModuleInit ERR!");
121             return -E_INTERNAL_ERROR;
122         }
123 
124         errCode = InitSyncEngine(syncInterface);
125         if (errCode != E_OK) {
126             return errCode;
127         }
128         syncEngine_->SetEqualIdentifier();
129         initialized_ = true;
130     }
131 
132     // StartCommunicator may start an auto sync, this function can not in syncerLock_
133     syncEngine_->StartCommunicator();
134     if (RuntimeContext::GetInstance()->CheckDBTimeChange(syncInterface_->GetIdentifier())) {
135         ResetTimeSyncMarkByTimeChange(metadata_, *syncInterface_);
136     }
137     return E_OK;
138 }
139 
Close(bool isClosedOperation)140 int GenericSyncer::Close(bool isClosedOperation)
141 {
142     int errCode = CloseInner(isClosedOperation);
143     if (errCode != -E_BUSY && isClosedOperation) {
144         ReleaseInnerResource();
145     }
146     return errCode;
147 }
148 
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)149 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
150     const std::function<void(const std::map<std::string, int> &)> &onComplete,
151     const std::function<void(void)> &onFinalize, bool wait = false)
152 {
153     SyncParam param;
154     param.devices = devices;
155     param.mode = mode;
156     param.onComplete = onComplete;
157     param.onFinalize = onFinalize;
158     param.wait = wait;
159     return Sync(param);
160 }
161 
Sync(const InternalSyncParma & param)162 int GenericSyncer::Sync(const InternalSyncParma &param)
163 {
164     SyncParam syncParam;
165     syncParam.devices = param.devices;
166     syncParam.mode = param.mode;
167     syncParam.isQuerySync = param.isQuerySync;
168     syncParam.syncQuery = param.syncQuery;
169     return Sync(syncParam);
170 }
171 
Sync(const SyncParam & param)172 int GenericSyncer::Sync(const SyncParam &param)
173 {
174     return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
175 }
176 
Sync(const SyncParam & param,uint64_t connectionId)177 int GenericSyncer::Sync(const SyncParam &param, uint64_t connectionId)
178 {
179     int errCode = SyncPreCheck(param);
180     if (errCode != E_OK) {
181         return errCode;
182     }
183     errCode = AddQueuedManualSyncSize(param.mode, param.wait);
184     if (errCode != E_OK) {
185         return errCode;
186     }
187 
188     uint32_t syncId = GenerateSyncId();
189     errCode = PrepareSync(param, syncId, connectionId);
190     if (errCode != E_OK) {
191         LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
192         return errCode;
193     }
194     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
195     return E_OK;
196 }
197 
CancelSync(uint32_t syncId)198 int GenericSyncer::CancelSync(uint32_t syncId)
199 {
200     LOGI("[Syncer] Start cancelSync %" PRIu32 "", syncId);
201     ISyncEngine *engine = nullptr;
202     {
203         std::lock_guard<std::mutex> autoLock(syncerLock_);
204         engine = syncEngine_;
205         RefObject::IncObjRef(engine);
206     }
207     SyncOperation *operation = nullptr;
208     {
209         std::lock_guard<std::mutex> lock(operationMapLock_);
210         auto iter = syncOperationMap_.find(syncId);
211         if (iter == syncOperationMap_.end()) {
212             LOGE("[Syncer] Non-existent syncId %" PRIu32 "", syncId);
213             RefObject::DecObjRef(engine);
214             return -E_NOT_FOUND;
215         } else if (iter->second->CanCancel() == false) {
216             RefObject::DecObjRef(engine);
217             LOGE("[Syncer] Can't cancel syncId %" PRIu32 " %" PRIu32 "", syncId, iter->second->CanCancel());
218             return -E_NOT_SUPPORT;
219         }
220         operation = iter->second;
221         RefObject::IncObjRef(operation);
222     }
223 
224     const std::vector<std::string> &devices = operation->GetDevices();
225     for (const auto &deviceId : devices) {
226         engine->ClearAllSyncTaskByDevice(deviceId);
227     }
228     LOGI("[Syncer] End cancelSync %" PRIu32 ", devices = %s", syncId, GetSyncDevicesStr(devices).c_str());
229 
230     RefObject::DecObjRef(operation);
231     RefObject::DecObjRef(engine);
232     return E_OK;
233 }
234 
PrepareSync(const SyncParam & param,uint32_t syncId,uint64_t connectionId)235 int GenericSyncer::PrepareSync(const SyncParam &param, uint32_t syncId, uint64_t connectionId)
236 {
237     auto *operation =
238         new (std::nothrow) SyncOperation(syncId, param);
239     if (operation == nullptr) {
240         SubQueuedSyncSize();
241         return -E_OUT_OF_MEMORY;
242     }
243 
244     ISyncEngine *engine = nullptr;
245     {
246         std::lock_guard<std::mutex> autoLock(syncerLock_);
247         PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
248         InitSyncOperation(operation, param);
249         LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %.3s, devices = %s, isRetry = %d",
250             syncId, param.mode, param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str(), param.isRetry);
251         engine = syncEngine_;
252         RefObject::IncObjRef(engine);
253     }
254     AddSyncOperation(engine, operation);
255     RefObject::DecObjRef(engine);
256     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
257     if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
258         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
259         connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
260         syncIdMap_[static_cast<int>(syncId)] = connectionId;
261     }
262     if (operation->CheckIsAllFinished()) {
263         operation->Finished();
264         RefObject::KillAndDecObjRef(operation);
265     } else {
266         operation->WaitIfNeed();
267         RefObject::DecObjRef(operation);
268     }
269     return E_OK;
270 }
271 
RemoveSyncOperation(int syncId)272 int GenericSyncer::RemoveSyncOperation(int syncId)
273 {
274     SyncOperation *operation = nullptr;
275     std::unique_lock<std::mutex> lock(operationMapLock_);
276     auto iter = syncOperationMap_.find(syncId);
277     if (iter != syncOperationMap_.end()) {
278         LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
279         operation = iter->second;
280         syncOperationMap_.erase(syncId);
281         lock.unlock();
282         if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
283             SubQueuedSyncSize();
284         }
285         operation->NotifyIfNeed();
286         RefObject::KillAndDecObjRef(operation);
287         operation = nullptr;
288         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
289         if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
290             return E_OK;
291         }
292         uint64_t connectionId = syncIdMap_[syncId];
293         if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
294             connectionIdMap_[connectionId].remove(syncId);
295         }
296         syncIdMap_.erase(syncId);
297         return E_OK;
298     }
299     LOGE("[Syncer] RemoveSyncOperation, id %d not found", syncId);
300     return -E_INVALID_ARGS;
301 }
302 
StopSync(uint64_t connectionId)303 int GenericSyncer::StopSync(uint64_t connectionId)
304 {
305     std::list<int> syncIdList;
306     {
307         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
308         if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
309             return E_OK;
310         }
311         syncIdList = connectionIdMap_[connectionId];
312         connectionIdMap_.erase(connectionId);
313     }
314     for (auto syncId : syncIdList) {
315         RemoveSyncOperation(syncId);
316         if (syncEngine_ != nullptr) {
317             syncEngine_->AbortMachineIfNeed(syncId);
318         }
319     }
320     if (syncEngine_ != nullptr) {
321         syncEngine_->NotifyConnectionClosed(connectionId);
322     }
323     return E_OK;
324 }
325 
GetTimestamp()326 uint64_t GenericSyncer::GetTimestamp()
327 {
328     std::shared_ptr<TimeHelper> timeHelper = nullptr;
329     ISyncInterface *storage = nullptr;
330     {
331         std::lock_guard<std::mutex> lock(syncerLock_);
332         timeHelper = timeHelper_;
333         if (syncInterface_ != nullptr) {
334             storage = syncInterface_;
335             storage->IncRefCount();
336         }
337     }
338     if (storage == nullptr) {
339         return TimeHelper::GetSysCurrentTime();
340     }
341     if (timeHelper == nullptr) {
342         storage->DecRefCount();
343         return TimeHelper::GetSysCurrentTime();
344     }
345     uint64_t timestamp = timeHelper->GetTime();
346     storage->DecRefCount();
347     return timestamp;
348 }
349 
QueryAutoSync(const InternalSyncParma & param)350 void GenericSyncer::QueryAutoSync(const InternalSyncParma &param)
351 {
352     if (!initialized_) {
353         LOGW("[Syncer] Syncer has not Init");
354         return;
355     }
356     LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
357     ISyncInterface *syncInterface = nullptr;
358     ISyncEngine *engine = nullptr;
359     {
360         std::lock_guard<std::mutex> lock(syncerLock_);
361         if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
362             LOGW("[Syncer] Syncer has not Init");
363             return;
364         }
365         syncInterface = syncInterface_;
366         engine = syncEngine_;
367         syncInterface->IncRefCount();
368         RefObject::IncObjRef(engine);
369     }
370     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
371         int errCode = Sync(param);
372         if (errCode != E_OK) {
373             LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
374         }
375         RefObject::DecObjRef(engine);
376         syncInterface->DecRefCount();
377     });
378     if (retCode != E_OK) {
379         LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
380         RefObject::DecObjRef(engine);
381         syncInterface->DecRefCount();
382     }
383 }
384 
AddSyncOperation(ISyncEngine * engine,SyncOperation * operation)385 void GenericSyncer::AddSyncOperation(ISyncEngine *engine, SyncOperation *operation)
386 {
387     if (operation == nullptr) {
388         return;
389     }
390 
391     LOGD("[Syncer] AddSyncOperation, sync id: %" PRIu32 ".", operation->GetSyncId());
392     engine->AddSyncOperation(operation);
393 
394     if (operation->CheckIsAllFinished()) {
395         LOGD("[Syncer] AddSyncOperation, sync id: %" PRIu32 ", but all finished.", operation->GetSyncId());
396         return;
397     }
398 
399     {
400         std::lock_guard<std::mutex> lock(syncerLock_);
401         if (closing_ || !initialized_) {
402             LOGE("[Syncer] Syncer has been closed, return");
403             operation->SetUnfinishedDevStatus(SyncOperation::OP_FAILED);
404             return;
405         }
406     }
407     std::lock_guard<std::mutex> lock(operationMapLock_);
408     syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
409     // To make sure operation alive before WaitIfNeed out
410     RefObject::IncObjRef(operation);
411 }
412 
SyncOperationKillCallbackInner(int syncId)413 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
414 {
415     if (syncEngine_ != nullptr) {
416         LOGI("[Syncer] Operation on kill id = %d", syncId);
417         syncEngine_->RemoveSyncOperation(syncId);
418     }
419 }
420 
SyncOperationKillCallback(int syncId)421 void GenericSyncer::SyncOperationKillCallback(int syncId)
422 {
423     SyncOperationKillCallbackInner(syncId);
424 }
425 
InitMetaData(ISyncInterface * syncInterface)426 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
427 {
428     if (metadata_ != nullptr) {
429         return E_OK;
430     }
431 
432     metadata_ = std::make_shared<Metadata>();
433     if (metadata_ == nullptr) {
434         LOGE("[Syncer] metadata make shared failed");
435         return -E_OUT_OF_MEMORY;
436     }
437     int errCode = metadata_->Initialize(syncInterface);
438     if (errCode != E_OK) {
439         LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
440         metadata_ = nullptr;
441     }
442     syncInterface_ = syncInterface;
443     return errCode;
444 }
445 
InitTimeHelper(ISyncInterface * syncInterface)446 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
447 {
448     if (timeHelper_ != nullptr) {
449         return E_OK;
450     }
451 
452     timeHelper_ = std::make_shared<TimeHelper>();
453     if (timeHelper_ == nullptr) {
454         return -E_OUT_OF_MEMORY;
455     }
456     int errCode = timeHelper_->Initialize(syncInterface, metadata_);
457     if (errCode != E_OK) {
458         LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
459         timeHelper_ = nullptr;
460     }
461     return errCode;
462 }
463 
InitSyncEngine(ISyncInterface * syncInterface)464 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
465 {
466     if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
467         LOGI("[Syncer] syncEngine is active");
468         return E_OK;
469     }
470     int errCode = BuildSyncEngine();
471     if (errCode != E_OK) {
472         return errCode;
473     }
474     const std::function<void(std::string)> onlineFunc = [this](const std::string &device) {
475         RemoteDataChanged(device);
476     };
477     const std::function<void(std::string)> offlineFunc = [this](const std::string &device) {
478         RemoteDeviceOffline(device);
479     };
480     const std::function<void(const InternalSyncParma &param)> queryAutoSyncFunc =
481         [this](const InternalSyncParma &syncParam) { QueryAutoSync(syncParam); };
482     const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
483     errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
484     if (errCode == E_OK) {
485         syncInterface->IncRefCount();
486         label_ = syncEngine_->GetLabel();
487         return E_OK;
488     } else {
489         LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
490         RefObject::KillAndDecObjRef(syncEngine_);
491         syncEngine_ = nullptr;
492         return errCode;
493     }
494 }
495 
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)496 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
497 {
498     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
499         false);
500     if (!isSyncDualTupleMode || isNeedActive) {
501         return E_OK;
502     }
503     LOGI("[Syncer] syncer no need to active");
504     int errCode = BuildSyncEngine();
505     if (errCode != E_OK) {
506         return errCode;
507     }
508     return -E_NO_NEED_ACTIVE;
509 }
510 
GenerateSyncId()511 uint32_t GenericSyncer::GenerateSyncId()
512 {
513     std::lock_guard<std::mutex> lock(syncIdLock_);
514     currentSyncId_++;
515     // if overflow, reset to 1
516     if (currentSyncId_ <= 0) {
517         currentSyncId_ = MIN_VALID_SYNC_ID;
518     }
519     return currentSyncId_;
520 }
521 
IsValidMode(int mode) const522 bool GenericSyncer::IsValidMode(int mode) const
523 {
524     if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
525         LOGE("[Syncer] Sync mode is not valid!");
526         return false;
527     }
528     return true;
529 }
530 
SyncConditionCheck(const SyncParam & param,const ISyncEngine * engine,ISyncInterface * storage) const531 int GenericSyncer::SyncConditionCheck(const SyncParam &param, const ISyncEngine *engine, ISyncInterface *storage) const
532 {
533     (void)param;
534     (void)engine;
535     (void)storage;
536     return E_OK;
537 }
538 
IsValidDevices(const std::vector<std::string> & devices) const539 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
540 {
541     if (devices.empty()) {
542         LOGE("[Syncer] devices is empty!");
543         return false;
544     }
545     return true;
546 }
547 
ClearSyncOperations(bool isClosedOperation)548 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
549 {
550     LOGD("[Syncer] begin clear sync operations");
551     std::vector<SyncOperation *> syncOperation;
552     {
553         std::lock_guard<std::mutex> lock(operationMapLock_);
554         for (auto &item : syncOperationMap_) {
555             bool isBlockSync = item.second->IsBlockSync();
556             if (isBlockSync || !isClosedOperation) {
557                 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
558                 item.second->SetUnfinishedDevStatus(status);
559                 RefObject::IncObjRef(item.second);
560                 syncOperation.push_back(item.second);
561             }
562         }
563     }
564 
565     if (!isClosedOperation) { // means user changed
566         syncEngine_->NotifyUserChange();
567     }
568 
569     for (auto &operation : syncOperation) {
570         // block sync operation or userChange will trigger remove sync operation
571         // caller won't blocked for block sync
572         // caller won't blocked for userChange operation no mater it is block or non-block sync
573         TriggerSyncFinished(operation);
574         RefObject::DecObjRef(operation);
575     }
576     ClearInnerResource(isClosedOperation);
577     LOGD("[Syncer] clear sync operations done");
578 }
579 
ClearInnerResource(bool isClosedOperation)580 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
581 {
582     {
583         std::lock_guard<std::mutex> lock(operationMapLock_);
584         for (auto &iter : syncOperationMap_) {
585             if (iter.second->IsBlockSync()) {
586                 iter.second->NotifyIfNeed();
587             }
588             RefObject::KillAndDecObjRef(iter.second);
589             iter.second = nullptr;
590         }
591         syncOperationMap_.clear();
592     }
593     {
594         std::lock_guard<std::mutex> lock(syncIdLock_);
595         if (isClosedOperation) {
596             connectionIdMap_.clear();
597         } else { // only need to clear syncid when user change
598             for (auto &item : connectionIdMap_) {
599                 item.second.clear();
600             }
601         }
602         syncIdMap_.clear();
603     }
604 }
605 
TriggerSyncFinished(SyncOperation * operation)606 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
607 {
608     if (operation != nullptr && operation->CheckIsAllFinished()) { // LCOV_EXCL_BR_LINE
609         operation->Finished();
610     }
611 }
612 
OnSyncFinished(int syncId)613 void GenericSyncer::OnSyncFinished(int syncId)
614 {
615     (void)(RemoveSyncOperation(syncId));
616 }
617 
SyncModuleInit()618 int GenericSyncer::SyncModuleInit()
619 {
620     static bool isInit = false;
621     std::lock_guard<std::mutex> lock(moduleInitLock_);
622     if (!isInit) {
623         int errCode = SyncResourceInit();
624         if (errCode != E_OK) {
625             return errCode;
626         }
627         isInit = true;
628         return E_OK;
629     }
630     return E_OK;
631 }
632 
SyncResourceInit()633 int GenericSyncer::SyncResourceInit()
634 {
635     int errCode = TimeSync::RegisterTransformFunc();
636     if (errCode != E_OK) {
637         LOGE("Register timesync message transform func ERR!");
638         return errCode;
639     }
640     errCode = SingleVerSerializeManager::RegisterTransformFunc();
641     if (errCode != E_OK) {
642         LOGE("Register SingleVerDataSync message transform func ERR!");
643         return errCode;
644     }
645 #ifndef OMIT_MULTI_VER
646     errCode = CommitHistorySync::RegisterTransformFunc();
647     if (errCode != E_OK) {
648         LOGE("Register CommitHistorySync message transform func ERR!");
649         return errCode;
650     }
651     errCode = MultiVerDataSync::RegisterTransformFunc();
652     if (errCode != E_OK) {
653         LOGE("Register MultiVerDataSync message transform func ERR!");
654         return errCode;
655     }
656     errCode = ValueSliceSync::RegisterTransformFunc();
657     if (errCode != E_OK) {
658         LOGE("Register ValueSliceSync message transform func ERR!");
659         return errCode;
660     }
661 #endif
662     errCode = DeviceManager::RegisterTransformFunc();
663     if (errCode != E_OK) {
664         LOGE("Register DeviceManager message transform func ERR!");
665         return errCode;
666     }
667     errCode = AbilitySync::RegisterTransformFunc();
668     if (errCode != E_OK) {
669         LOGE("Register AbilitySync message transform func ERR!");
670         return errCode;
671     }
672     return E_OK;
673 }
674 
GetQueuedSyncSize(int * queuedSyncSize) const675 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
676 {
677     if (queuedSyncSize == nullptr) {
678         return -E_INVALID_ARGS;
679     }
680     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
681     *queuedSyncSize = queuedManualSyncSize_;
682     LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
683     return E_OK;
684 }
685 
SetQueuedSyncLimit(const int * queuedSyncLimit)686 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
687 {
688     if (queuedSyncLimit == nullptr) {
689         return -E_INVALID_ARGS;
690     }
691     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
692     queuedManualSyncLimit_ = *queuedSyncLimit;
693     LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
694     return E_OK;
695 }
696 
GetQueuedSyncLimit(int * queuedSyncLimit) const697 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
698 {
699     if (queuedSyncLimit == nullptr) {
700         return -E_INVALID_ARGS;
701     }
702     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
703     *queuedSyncLimit = queuedManualSyncLimit_;
704     LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
705     return E_OK;
706 }
707 
IsManualSync(int inMode) const708 bool GenericSyncer::IsManualSync(int inMode) const
709 {
710     int mode = SyncOperation::TransferSyncMode(inMode);
711     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
712         (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
713         return true;
714     }
715     return false;
716 }
717 
AddQueuedManualSyncSize(int mode,bool wait)718 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
719 {
720     if (IsManualSync(mode) && (!wait)) {
721         std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
722         if (!manualSyncEnable_) {
723             LOGI("[GenericSyncer] manualSyncEnable is Disable");
724             return -E_BUSY;
725         }
726         queuedManualSyncSize_++;
727     }
728     return E_OK;
729 }
730 
IsQueuedManualSyncFull(int mode,bool wait) const731 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
732 {
733     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
734     if (IsManualSync(mode) && (!manualSyncEnable_)) { // LCOV_EXCL_BR_LINE
735         LOGI("[GenericSyncer] manualSyncEnable_:false");
736         return true;
737     }
738     if (IsManualSync(mode) && (!wait)) { // LCOV_EXCL_BR_LINE
739         if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
740             return false;
741         } else {
742             LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
743                 queuedManualSyncLimit_);
744             return true;
745         }
746     } else {
747         return false;
748     }
749 }
750 
SubQueuedSyncSize(void)751 void GenericSyncer::SubQueuedSyncSize(void)
752 {
753     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
754     queuedManualSyncSize_--;
755     if (queuedManualSyncSize_ < 0) {
756         LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
757         queuedManualSyncSize_ = 0;
758     }
759 }
760 
DisableManualSync(void)761 int GenericSyncer::DisableManualSync(void)
762 {
763     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
764     if (queuedManualSyncSize_ > 0) {
765         LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
766         return -E_BUSY;
767     }
768     manualSyncEnable_ = false;
769     LOGD("[GenericSyncer] DisableManualSync ok");
770     return E_OK;
771 }
772 
EnableManualSync(void)773 int GenericSyncer::EnableManualSync(void)
774 {
775     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
776     manualSyncEnable_ = true;
777     LOGD("[GenericSyncer] EnableManualSync ok");
778     return E_OK;
779 }
780 
GetLocalIdentity(std::string & outTarget) const781 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
782 {
783     std::string deviceId;
784     int errCode =  RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
785     if (errCode != E_OK) {
786         LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
787         return errCode;
788     }
789     outTarget = DBCommon::TransferHashString(deviceId);
790     return E_OK;
791 }
792 
GetOnlineDevices(std::vector<std::string> & devices) const793 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
794 {
795     std::string identifier;
796     {
797         std::lock_guard<std::mutex> lock(syncerLock_);
798         // Get devices from AutoLaunch first.
799         if (syncInterface_ == nullptr) {
800             LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
801             return;
802         }
803         bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
804             false);
805         if (isSyncDualTupleMode) {
806             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
807                 "");
808         } else {
809             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
810         }
811     }
812     RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
813     if (!devices.empty()) {
814         return;
815     }
816     std::lock_guard<std::mutex> lock(syncerLock_);
817     if (closing_) {
818         LOGW("[Syncer] Syncer is closing, return!");
819         return;
820     }
821     if (syncEngine_ != nullptr) {
822         syncEngine_->GetOnlineDevices(devices);
823     }
824 }
825 
SetSyncRetry(bool isRetry)826 int GenericSyncer::SetSyncRetry(bool isRetry)
827 {
828     if (syncEngine_ == nullptr) {
829         return -E_NOT_INIT;
830     }
831     syncEngine_->SetSyncRetry(isRetry);
832     return E_OK;
833 }
834 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)835 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
836 {
837     std::lock_guard<std::mutex> lock(syncerLock_);
838     if (syncEngine_ == nullptr) {
839         return -E_NOT_INIT;
840     }
841     int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
842     if (errCode == E_OK) {
843         syncEngine_->SetEqualIdentifierMap(identifier, targets);
844     }
845     return errCode;
846 }
847 
GetSyncDevicesStr(const std::vector<std::string> & devices) const848 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
849 {
850     std::string syncDevices;
851     for (const auto &dev:devices) {
852         syncDevices += DBCommon::StringMasking(dev);
853         syncDevices += ",";
854     }
855     if (syncDevices.empty()) {
856         return "";
857     }
858     return syncDevices.substr(0, syncDevices.size() - 1);
859 }
860 
StatusCheck() const861 int GenericSyncer::StatusCheck() const
862 {
863     if (!initialized_) {
864         LOGE("[Syncer] Syncer is not initialized, return!");
865         return -E_BUSY;
866     }
867     if (closing_) {
868         LOGW("[Syncer] Syncer is closing, return!");
869         return -E_BUSY;
870     }
871     return E_OK;
872 }
873 
SyncPreCheck(const SyncParam & param) const874 int GenericSyncer::SyncPreCheck(const SyncParam &param) const
875 {
876     ISyncEngine *engine = nullptr;
877     ISyncInterface *storage = nullptr;
878     int errCode = E_OK;
879     {
880         std::lock_guard<std::mutex> lock(syncerLock_);
881         errCode = StatusCheck();
882         if (errCode != E_OK) {
883             return errCode;
884         }
885         if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
886             return -E_INVALID_ARGS;
887         }
888         if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
889             LOGE("[Syncer] -E_BUSY");
890             return -E_BUSY;
891         }
892         storage = syncInterface_;
893         engine = syncEngine_;
894         if (storage == nullptr || engine == nullptr) {
895             return -E_BUSY;
896         }
897         storage->IncRefCount();
898         RefObject::IncObjRef(engine);
899     }
900     errCode = SyncConditionCheck(param, engine, storage);
901     storage->DecRefCount();
902     RefObject::DecObjRef(engine);
903     return errCode;
904 }
905 
InitSyncOperation(SyncOperation * operation,const SyncParam & param)906 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParam &param)
907 {
908     if (syncInterface_ == nullptr) {
909         LOGE("[GenericSyncer] [InitSyncOperation] syncInterface_ is nullptr.");
910         return;
911     }
912     operation->SetIdentifier(syncInterface_->GetIdentifier());
913     operation->SetSyncProcessCallFun(param.onSyncProcess);
914     operation->Initialize();
915     operation->OnKill([this, id = operation->GetSyncId()] { SyncOperationKillCallback(id); });
916     std::function<void(int)> onFinished = [this](int syncId) { OnSyncFinished(syncId); };
917     operation->SetOnSyncFinished(onFinished);
918     operation->SetOnSyncFinalize(param.onFinalize);
919     if (param.isQuerySync) {
920         operation->SetQuery(param.syncQuery);
921     }
922 }
923 
BuildSyncEngine()924 int GenericSyncer::BuildSyncEngine()
925 {
926     if (syncEngine_ != nullptr) {
927         return E_OK;
928     }
929     syncEngine_ = CreateSyncEngine();
930     if (syncEngine_ == nullptr) {
931         return -E_OUT_OF_MEMORY;
932     }
933     syncEngine_->OnLastRef([this]() {
934         LOGD("[Syncer] SyncEngine finalized");
935         {
936             std::lock_guard<std::mutex> cvLock(engineMutex_);
937             engineFinalize_ = true;
938         }
939         engineFinalizeCv_.notify_all();
940     });
941     return E_OK;
942 }
943 
Dump(int fd)944 void GenericSyncer::Dump(int fd)
945 {
946     if (syncEngine_ == nullptr) {
947         return;
948     }
949     RefObject::IncObjRef(syncEngine_);
950     syncEngine_->Dump(fd);
951     RefObject::DecObjRef(syncEngine_);
952 }
953 
DumpSyncerBasicInfo()954 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
955 {
956     SyncerBasicInfo baseInfo;
957     if (syncEngine_ == nullptr) {
958         return baseInfo;
959     }
960     RefObject::IncObjRef(syncEngine_);
961     baseInfo.isSyncActive = syncEngine_->IsEngineActive();
962     RefObject::DecObjRef(syncEngine_);
963     return baseInfo;
964 }
965 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)966 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
967     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
968 {
969     ISyncEngine *syncEngine = nullptr;
970     {
971         std::lock_guard<std::mutex> lock(syncerLock_);
972         int errCode = StatusCheck();
973         if (errCode != E_OK) {
974             return errCode;
975         }
976         syncEngine = syncEngine_;
977         RefObject::IncObjRef(syncEngine);
978     }
979     if (syncEngine == nullptr) {
980         return -E_NOT_INIT;
981     }
982     int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
983     RefObject::DecObjRef(syncEngine);
984     return errCode;
985 }
986 
InitTimeChangedListener()987 int GenericSyncer::InitTimeChangedListener()
988 {
989     int errCode = E_OK;
990     if (timeChangedListener_ != nullptr) {
991         return errCode;
992     }
993     timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
994         [this](void *changedOffset) {
995             RecordTimeChangeOffset(changedOffset);
996         },
997         [this]() {
998             {
999                 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
1000                 timeChangeListenerFinalize_ = true;
1001             }
1002             timeChangeCv_.notify_all();
1003         }, errCode);
1004     if (timeChangedListener_ == nullptr) {
1005         LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
1006         return errCode;
1007     }
1008     {
1009         std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
1010         timeChangeListenerFinalize_ = false;
1011     }
1012     return E_OK;
1013 }
1014 
ReleaseInnerResource()1015 void GenericSyncer::ReleaseInnerResource()
1016 {
1017     NotificationChain::Listener *timeChangedListener = nullptr;
1018     {
1019         std::lock_guard<std::mutex> lock(syncerLock_);
1020         if (timeChangedListener_ != nullptr) {
1021             timeChangedListener = timeChangedListener_;
1022             timeChangedListener_ = nullptr;
1023         }
1024         timeHelper_ = nullptr;
1025         metadata_ = nullptr;
1026     }
1027     if (timeChangedListener != nullptr) {
1028         timeChangedListener->Drop(true);
1029         RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
1030     }
1031     std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
1032     LOGD("[GenericSyncer] Begin wait time change listener finalize");
1033     timeChangeCv_.wait(uniqueLock, [this]() {
1034         return timeChangeListenerFinalize_;
1035     });
1036     LOGD("[GenericSyncer] End wait time change listener finalize");
1037 }
1038 
RecordTimeChangeOffset(void * changedOffset)1039 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
1040 {
1041     std::shared_ptr<Metadata> metadata = nullptr;
1042     ISyncInterface *storage = nullptr;
1043     {
1044         std::lock_guard<std::mutex> lock(syncerLock_);
1045         if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
1046             return;
1047         }
1048         storage = syncInterface_;
1049         metadata = metadata_;
1050         storage->IncRefCount();
1051     }
1052     TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
1053         static_cast<TimeOffset>(TimeHelper::TO_100_NS);
1054     TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
1055     TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
1056     Timestamp maxItemTime = 0;
1057     storage->GetMaxTimestamp(maxItemTime);
1058     if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
1059         orgOffset = TimeHelper::BUFFER_VALID_TIME -
1060             currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
1061     }
1062     if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
1063         orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
1064             static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
1065     }
1066     metadata->SaveLocalTimeOffset(orgOffset);
1067     ResetTimeSyncMarkByTimeChange(metadata, *storage);
1068     storage->DecRefCount();
1069 }
1070 
CloseInner(bool isClosedOperation)1071 int GenericSyncer::CloseInner(bool isClosedOperation)
1072 {
1073     {
1074         std::lock_guard<std::mutex> lock(syncerLock_);
1075         if (!initialized_) {
1076             LOGW("[Syncer] CloseInner[%.3s] don't close", label_.c_str());
1077             return -E_NOT_INIT;
1078         }
1079         initialized_ = false;
1080         if (closing_) {
1081             LOGE("[Syncer] Syncer is closing, return!");
1082             return -E_BUSY;
1083         }
1084         closing_ = true;
1085     }
1086     ClearSyncOperations(isClosedOperation);
1087     if (syncEngine_ != nullptr) {
1088         syncEngine_->Close();
1089         LOGD("[Syncer] Close SyncEngine!");
1090         std::lock_guard<std::mutex> lock(syncerLock_);
1091         closing_ = false;
1092     }
1093     return E_OK;
1094 }
1095 
GetSyncDataSize(const std::string & device,size_t & size) const1096 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1097 {
1098     uint64_t localWaterMark = 0;
1099     std::shared_ptr<Metadata> metadata = nullptr;
1100     {
1101         std::lock_guard<std::mutex> lock(syncerLock_);
1102         if (metadata_ == nullptr || syncInterface_ == nullptr) {
1103             return -E_INTERNAL_ERROR;
1104         }
1105         if (closing_) {
1106             LOGE("[Syncer] Syncer is closing, return!");
1107             return -E_BUSY;
1108         }
1109         int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1110         if (errCode != E_OK) {
1111             LOGE("[Syncer] syncer is restarting, return!");
1112             return errCode;
1113         }
1114         syncInterface_->IncRefCount();
1115         metadata = metadata_;
1116     }
1117     metadata->GetLocalWaterMark(device, "", localWaterMark);
1118     uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1119     DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1120     std::vector<SendDataItem> outData;
1121     ContinueToken token = nullptr;
1122     int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1123         outData, token, syncDataSizeInfo);
1124     if (token != nullptr) {
1125         static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1126         token = nullptr;
1127     }
1128     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1129         LOGE("calculate sync data size failed %d", errCode);
1130         syncInterface_->DecRefCount();
1131         return errCode;
1132     }
1133     uint32_t totalLen = 0;
1134     if (errCode == -E_UNFINISHED) {
1135         totalLen = expectedMtuSize;
1136     } else {
1137         totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1138     }
1139     for (auto &entry : outData) {
1140         delete entry;
1141         entry = nullptr;
1142     }
1143     syncInterface_->DecRefCount();
1144     // if larger than 1M, return 1M
1145     size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1146     return E_OK;
1147 }
1148 
IsNeedActive(ISyncInterface * syncInterface)1149 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1150 {
1151     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1152     if (localOnly) {
1153         LOGD("[Syncer] Local only db, don't need active syncer");
1154         return false;
1155     }
1156     return true;
1157 }
1158 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1159 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1160 {
1161     (void)clientId;
1162     (void)hashDevId;
1163     return -E_NOT_SUPPORT;
1164 }
1165 
InitStorageResource(ISyncInterface * syncInterface)1166 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1167 {
1168     // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1169     // It will be clear in destructor.
1170     int errCode = InitMetaData(syncInterface);
1171     if (errCode != E_OK) {
1172         return errCode;
1173     }
1174 
1175     // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1176     // It will be clear in destructor.
1177     errCode = InitTimeHelper(syncInterface);
1178     if (errCode != E_OK) {
1179         return errCode;
1180     }
1181 
1182     if (!IsNeedActive(syncInterface)) {
1183         return -E_NO_NEED_ACTIVE;
1184     }
1185     return errCode;
1186 }
1187 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)1188 int GenericSyncer::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
1189 {
1190     std::shared_ptr<Metadata> metadata = nullptr;
1191     {
1192         std::lock_guard<std::mutex> autoLock(syncerLock_);
1193         metadata = metadata_;
1194     }
1195     if (metadata == nullptr) {
1196         LOGE("[Syncer] Metadata is not init");
1197         return -E_NOT_INIT;
1198     }
1199     std::string dev;
1200     bool devNeedHash = true;
1201     int errCode = metadata->GetHashDeviceId(device, dev);
1202     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1203         LOGE("[Syncer] Get watermark info failed %d", errCode);
1204         return errCode;
1205     } else if (errCode == E_OK) {
1206         devNeedHash = false;
1207     } else {
1208         dev = device;
1209     }
1210     return metadata->GetWaterMarkInfoFromDB(dev, DBConstant::DEFAULT_USER, devNeedHash, info);
1211 }
1212 
UpgradeSchemaVerInMeta()1213 int GenericSyncer::UpgradeSchemaVerInMeta()
1214 {
1215     std::shared_ptr<Metadata> metadata = nullptr;
1216     {
1217         std::lock_guard<std::mutex> autoLock(syncerLock_);
1218         metadata = metadata_;
1219     }
1220     if (metadata == nullptr) {
1221         LOGE("[Syncer] metadata is not init");
1222         return -E_NOT_INIT;
1223     }
1224     int errCode = metadata->ClearAllAbilitySyncFinishMark();
1225     if (errCode != E_OK) {
1226         LOGE("[Syncer] clear ability mark failed:%d", errCode);
1227         return errCode;
1228     }
1229     auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
1230     if (err != E_OK) {
1231         LOGE("[Syncer] get local schema version failed:%d", err);
1232         return err;
1233     }
1234     errCode = metadata->SetLocalSchemaVersion(localSchemaVer + 1);
1235     if (errCode != E_OK) {
1236         LOGE("[Syncer] increase local schema version failed:%d", errCode);
1237     }
1238     return errCode;
1239 }
1240 
ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> & metadata,ISyncInterface & storage)1241 void GenericSyncer::ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage)
1242 {
1243     if (syncEngine_ != nullptr) {
1244         syncEngine_->TimeChange();
1245     }
1246     int errCode = metadata->ClearAllTimeSyncFinishMark();
1247     if (errCode != E_OK) {
1248         LOGW("[GenericSyncer] %.3s clear time sync finish mark failed %d", label_.c_str(), errCode);
1249     } else {
1250         LOGD("[GenericSyncer] ClearAllTimeSyncFinishMark finish");
1251         RuntimeContext::GetInstance()->ResetDBTimeChangeStatus(storage.GetIdentifier());
1252     }
1253 }
1254 
ResetSyncStatus()1255 void GenericSyncer::ResetSyncStatus()
1256 {
1257     std::shared_ptr<Metadata> metadata = nullptr;
1258     {
1259         std::lock_guard<std::mutex> lock(syncerLock_);
1260         if (metadata_ == nullptr) {
1261             return;
1262         }
1263         metadata = metadata_;
1264     }
1265     metadata->ClearAllAbilitySyncFinishMark();
1266 }
1267 
GetLocalTimeOffset()1268 int64_t GenericSyncer::GetLocalTimeOffset()
1269 {
1270     std::shared_ptr<Metadata> metadata = nullptr;
1271     {
1272         std::lock_guard<std::mutex> lock(syncerLock_);
1273         if (metadata_ == nullptr) {
1274             return 0;
1275         }
1276         metadata = metadata_;
1277     }
1278     return metadata->GetLocalTimeOffset();
1279 }
1280 
GetTaskCount()1281 int32_t GenericSyncer::GetTaskCount()
1282 {
1283     int32_t count = 0;
1284     {
1285         std::lock_guard<std::mutex> autoLock(operationMapLock_);
1286         count += static_cast<int32_t>(syncOperationMap_.size());
1287     }
1288     ISyncEngine *syncEngine = nullptr;
1289     {
1290         std::lock_guard<std::mutex> lock(syncerLock_);
1291         if (syncEngine_ == nullptr) {
1292             return count;
1293         }
1294         syncEngine = syncEngine_;
1295         RefObject::IncObjRef(syncEngine);
1296     }
1297     count += syncEngine->GetResponseTaskCount();
1298     RefObject::DecObjRef(syncEngine);
1299     return count;
1300 }
1301 
ExchangeClosePending(bool expected)1302 bool GenericSyncer::ExchangeClosePending(bool expected)
1303 {
1304     ISyncEngine *syncEngine = nullptr;
1305     {
1306         std::lock_guard<std::mutex> lock(syncerLock_);
1307         if (syncEngine_ == nullptr) {
1308             return false;
1309         }
1310         syncEngine = syncEngine_;
1311         RefObject::IncObjRef(syncEngine);
1312     }
1313     bool res = syncEngine->ExchangeClosePending(expected);
1314     RefObject::DecObjRef(syncEngine);
1315     return res;
1316 }
1317 } // namespace DistributedDB
1318