• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "generic_syncer.h"
17 
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "ref_object.h"
22 #include "sqlite_single_ver_natural_store.h"
23 #include "time_sync.h"
24 #include "single_ver_data_sync.h"
25 #ifndef OMIT_MULTI_VER
26 #include "commit_history_sync.h"
27 #include "multi_ver_data_sync.h"
28 #include "value_slice_sync.h"
29 #endif
30 #include "device_manager.h"
31 #include "db_constant.h"
32 #include "ability_sync.h"
33 #include "generic_single_ver_kv_entry.h"
34 #include "single_ver_serialize_manager.h"
35 
36 namespace DistributedDB {
37 namespace {
38     constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45     : syncEngine_(nullptr),
46       syncInterface_(nullptr),
47       timeHelper_(nullptr),
48       metadata_(nullptr),
49       initialized_(false),
50       queuedManualSyncSize_(0),
51       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52       manualSyncEnable_(true),
53       closing_(false),
54       engineFinalize_(false),
55       timeChangeListenerFinalize_(true),
56       timeChangedListener_(nullptr)
57 {
58 }
59 
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62     LOGD("[GenericSyncer] ~GenericSyncer!");
63     if (syncEngine_ != nullptr) {
64         syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65         RefObject::KillAndDecObjRef(syncEngine_);
66         // waiting all thread exist
67         std::unique_lock<std::mutex> cvLock(engineMutex_);
68         bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69             [this]() { return engineFinalize_; });
70         if (!engineFinalize) {
71             LOGW("syncer finalize before engine finalize!");
72         }
73         syncEngine_ = nullptr;
74     }
75     ReleaseInnerResource();
76     std::lock_guard<std::mutex> lock(syncerLock_);
77     syncInterface_ = nullptr;
78 }
79 
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82     if (syncInterface == nullptr) {
83         LOGE("[Syncer] Init failed, the syncInterface is null!");
84         return -E_INVALID_ARGS;
85     }
86 
87     {
88         std::lock_guard<std::mutex> lock(syncerLock_);
89         if (initialized_) {
90             return E_OK;
91         }
92         if (closing_) {
93             LOGE("[Syncer] Syncer is closing, return!");
94             return -E_BUSY;
95         }
96         std::vector<uint8_t> label = syncInterface->GetIdentifier();
97         label_ = DBCommon::StringMasking(DBCommon::VectorToHexString(label));
98 
99         int errCode = InitStorageResource(syncInterface);
100         if (errCode != E_OK) {
101             return errCode;
102         }
103         // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
104         // It will be clear in destructor.
105         int errCodeTimeChangedListener = InitTimeChangedListener();
106         if (errCodeTimeChangedListener != E_OK) {
107             return -E_INTERNAL_ERROR;
108         }
109         errCode = CheckSyncActive(syncInterface, isNeedActive);
110         if (errCode != E_OK) {
111             return errCode;
112         }
113 
114         if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
115             return -E_NOT_INIT;
116         }
117 
118         errCode = SyncModuleInit();
119         if (errCode != E_OK) {
120             LOGE("[Syncer] Sync ModuleInit ERR!");
121             return -E_INTERNAL_ERROR;
122         }
123 
124         errCode = InitSyncEngine(syncInterface);
125         if (errCode != E_OK) {
126             return errCode;
127         }
128         syncEngine_->SetEqualIdentifier();
129         initialized_ = true;
130     }
131 
132     // StartCommunicator may start an auto sync, this function can not in syncerLock_
133     syncEngine_->StartCommunicator();
134     if (RuntimeContext::GetInstance()->CheckDBTimeChange(syncInterface_->GetIdentifier())) {
135         ResetTimeSyncMarkByTimeChange(metadata_, *syncInterface_);
136     }
137     return E_OK;
138 }
139 
Close(bool isClosedOperation)140 int GenericSyncer::Close(bool isClosedOperation)
141 {
142     int errCode = CloseInner(isClosedOperation);
143     if (errCode != -E_BUSY && isClosedOperation) {
144         ReleaseInnerResource();
145     }
146     return errCode;
147 }
148 
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)149 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
150     const std::function<void(const std::map<std::string, int> &)> &onComplete,
151     const std::function<void(void)> &onFinalize, bool wait = false)
152 {
153     SyncParma param;
154     param.devices = devices;
155     param.mode = mode;
156     param.onComplete = onComplete;
157     param.onFinalize = onFinalize;
158     param.wait = wait;
159     return Sync(param);
160 }
161 
Sync(const InternalSyncParma & param)162 int GenericSyncer::Sync(const InternalSyncParma &param)
163 {
164     SyncParma syncParam;
165     syncParam.devices = param.devices;
166     syncParam.mode = param.mode;
167     syncParam.isQuerySync = param.isQuerySync;
168     syncParam.syncQuery = param.syncQuery;
169     return Sync(syncParam);
170 }
171 
Sync(const SyncParma & param)172 int GenericSyncer::Sync(const SyncParma &param)
173 {
174     return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
175 }
176 
Sync(const SyncParma & param,uint64_t connectionId)177 int GenericSyncer::Sync(const SyncParma &param, uint64_t connectionId)
178 {
179     int errCode = SyncPreCheck(param);
180     if (errCode != E_OK) {
181         return errCode;
182     }
183     errCode = AddQueuedManualSyncSize(param.mode, param.wait);
184     if (errCode != E_OK) {
185         return errCode;
186     }
187 
188     uint32_t syncId = GenerateSyncId();
189     errCode = PrepareSync(param, syncId, connectionId);
190     if (errCode != E_OK) {
191         LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
192         return errCode;
193     }
194     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
195     return E_OK;
196 }
197 
CancelSync(uint32_t syncId)198 int GenericSyncer::CancelSync(uint32_t syncId)
199 {
200     LOGI("[Syncer] Start cancelSync %" PRIu32 "", syncId);
201     ISyncEngine *engine = nullptr;
202     {
203         std::lock_guard<std::mutex> autoLock(syncerLock_);
204         engine = syncEngine_;
205         RefObject::IncObjRef(engine);
206     }
207     SyncOperation *operation = nullptr;
208     {
209         std::lock_guard<std::mutex> lock(operationMapLock_);
210         auto iter = syncOperationMap_.find(syncId);
211         if (iter == syncOperationMap_.end()) {
212             LOGE("[Syncer] Non-existent syncId %" PRIu32 "", syncId);
213             RefObject::DecObjRef(engine);
214             return -E_NOT_SUPPORT;
215         } else if (iter->second->CanCancel() == false) {
216             RefObject::DecObjRef(engine);
217             LOGE("[Syncer] Can't cancel syncId %" PRIu32 " %" PRIu32 "", syncId, iter->second->CanCancel());
218             return -E_NOT_SUPPORT;
219         }
220         operation = iter->second;
221         RefObject::IncObjRef(operation);
222     }
223 
224     const std::vector<std::string> &devices = operation->GetDevices();
225     for (auto &deviceId : devices) {
226         engine->ClearAllSyncTaskByDevice(deviceId);
227     }
228     LOGI("[Syncer] End cancelSync %" PRIu32 ", devices = %s", syncId, GetSyncDevicesStr(devices).c_str());
229 
230     RefObject::DecObjRef(operation);
231     RefObject::DecObjRef(engine);
232     return E_OK;
233 }
234 
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)235 int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
236 {
237     auto *operation =
238         new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
239     if (operation == nullptr) {
240         SubQueuedSyncSize();
241         return -E_OUT_OF_MEMORY;
242     }
243     ISyncEngine *engine = nullptr;
244     {
245         std::lock_guard<std::mutex> autoLock(syncerLock_);
246         PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
247         InitSyncOperation(operation, param);
248         LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
249             param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
250         engine = syncEngine_;
251         RefObject::IncObjRef(engine);
252     }
253     AddSyncOperation(engine, operation);
254     RefObject::DecObjRef(engine);
255     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
256     if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
257         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
258         connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
259         syncIdMap_[static_cast<int>(syncId)] = connectionId;
260     }
261     if (operation->CheckIsAllFinished()) {
262         operation->Finished();
263         RefObject::KillAndDecObjRef(operation);
264     } else {
265         operation->WaitIfNeed();
266         RefObject::DecObjRef(operation);
267     }
268     return E_OK;
269 }
270 
RemoveSyncOperation(int syncId)271 int GenericSyncer::RemoveSyncOperation(int syncId)
272 {
273     SyncOperation *operation = nullptr;
274     std::unique_lock<std::mutex> lock(operationMapLock_);
275     auto iter = syncOperationMap_.find(syncId);
276     if (iter != syncOperationMap_.end()) {
277         LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
278         operation = iter->second;
279         syncOperationMap_.erase(syncId);
280         lock.unlock();
281         if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
282             SubQueuedSyncSize();
283         }
284         operation->NotifyIfNeed();
285         RefObject::KillAndDecObjRef(operation);
286         operation = nullptr;
287         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
288         if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
289             return E_OK;
290         }
291         uint64_t connectionId = syncIdMap_[syncId];
292         if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
293             connectionIdMap_[connectionId].remove(syncId);
294         }
295         syncIdMap_.erase(syncId);
296         return E_OK;
297     }
298     return -E_INVALID_ARGS;
299 }
300 
StopSync(uint64_t connectionId)301 int GenericSyncer::StopSync(uint64_t connectionId)
302 {
303     std::list<int> syncIdList;
304     {
305         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
306         if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
307             return E_OK;
308         }
309         syncIdList = connectionIdMap_[connectionId];
310         connectionIdMap_.erase(connectionId);
311     }
312     for (auto syncId : syncIdList) {
313         RemoveSyncOperation(syncId);
314         if (syncEngine_ != nullptr) {
315             syncEngine_->AbortMachineIfNeed(syncId);
316         }
317     }
318     if (syncEngine_ != nullptr) {
319         syncEngine_->NotifyConnectionClosed(connectionId);
320     }
321     return E_OK;
322 }
323 
GetTimestamp()324 uint64_t GenericSyncer::GetTimestamp()
325 {
326     std::shared_ptr<TimeHelper> timeHelper = nullptr;
327     ISyncInterface *storage = nullptr;
328     {
329         std::lock_guard<std::mutex> lock(syncerLock_);
330         timeHelper = timeHelper_;
331         if (syncInterface_ != nullptr) {
332             storage = syncInterface_;
333             storage->IncRefCount();
334         }
335     }
336     if (storage == nullptr) {
337         return TimeHelper::GetSysCurrentTime();
338     }
339     if (timeHelper == nullptr) {
340         storage->DecRefCount();
341         return TimeHelper::GetSysCurrentTime();
342     }
343     uint64_t timestamp = timeHelper->GetTime();
344     storage->DecRefCount();
345     return timestamp;
346 }
347 
QueryAutoSync(const InternalSyncParma & param)348 void GenericSyncer::QueryAutoSync(const InternalSyncParma &param)
349 {
350     if (!initialized_) {
351         LOGW("[Syncer] Syncer has not Init");
352         return;
353     }
354     LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
355     ISyncInterface *syncInterface = nullptr;
356     ISyncEngine *engine = nullptr;
357     {
358         std::lock_guard<std::mutex> lock(syncerLock_);
359         if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
360             LOGW("[Syncer] Syncer has not Init");
361             return;
362         }
363         syncInterface = syncInterface_;
364         engine = syncEngine_;
365         syncInterface->IncRefCount();
366         RefObject::IncObjRef(engine);
367     }
368     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
369         int errCode = Sync(param);
370         if (errCode != E_OK) {
371             LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
372         }
373         RefObject::DecObjRef(engine);
374         syncInterface->DecRefCount();
375     });
376     if (retCode != E_OK) {
377         LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
378         RefObject::DecObjRef(engine);
379         syncInterface->DecRefCount();
380     }
381 }
382 
AddSyncOperation(ISyncEngine * engine,SyncOperation * operation)383 void GenericSyncer::AddSyncOperation(ISyncEngine *engine, SyncOperation *operation)
384 {
385     if (operation == nullptr) {
386         return;
387     }
388 
389     LOGD("[Syncer] AddSyncOperation.");
390     engine->AddSyncOperation(operation);
391 
392     if (operation->CheckIsAllFinished()) {
393         return;
394     }
395 
396     std::lock_guard<std::mutex> lock(operationMapLock_);
397     syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
398     // To make sure operation alive before WaitIfNeed out
399     RefObject::IncObjRef(operation);
400 }
401 
SyncOperationKillCallbackInner(int syncId)402 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
403 {
404     if (syncEngine_ != nullptr) {
405         LOGI("[Syncer] Operation on kill id = %d", syncId);
406         syncEngine_->RemoveSyncOperation(syncId);
407     }
408 }
409 
SyncOperationKillCallback(int syncId)410 void GenericSyncer::SyncOperationKillCallback(int syncId)
411 {
412     SyncOperationKillCallbackInner(syncId);
413 }
414 
InitMetaData(ISyncInterface * syncInterface)415 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
416 {
417     if (metadata_ != nullptr) {
418         return E_OK;
419     }
420 
421     metadata_ = std::make_shared<Metadata>();
422     if (metadata_ == nullptr) {
423         LOGE("[Syncer] metadata make shared failed");
424         return -E_OUT_OF_MEMORY;
425     }
426     int errCode = metadata_->Initialize(syncInterface);
427     if (errCode != E_OK) {
428         LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
429         metadata_ = nullptr;
430     }
431     syncInterface_ = syncInterface;
432     return errCode;
433 }
434 
InitTimeHelper(ISyncInterface * syncInterface)435 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
436 {
437     if (timeHelper_ != nullptr) {
438         return E_OK;
439     }
440 
441     timeHelper_ = std::make_shared<TimeHelper>();
442     if (timeHelper_ == nullptr) {
443         return -E_OUT_OF_MEMORY;
444     }
445     int errCode = timeHelper_->Initialize(syncInterface, metadata_);
446     if (errCode != E_OK) {
447         LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
448         timeHelper_ = nullptr;
449     }
450     return errCode;
451 }
452 
InitSyncEngine(ISyncInterface * syncInterface)453 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
454 {
455     if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
456         LOGI("[Syncer] syncEngine is active");
457         return E_OK;
458     }
459     int errCode = BuildSyncEngine();
460     if (errCode != E_OK) {
461         return errCode;
462     }
463     const std::function<void(std::string)> onlineFunc = [this](const std::string &device) {
464         RemoteDataChanged(device);
465     };
466     const std::function<void(std::string)> offlineFunc = [this](const std::string &device) {
467         RemoteDeviceOffline(device);
468     };
469     const std::function<void(const InternalSyncParma &param)> queryAutoSyncFunc =
470         [this](const InternalSyncParma &syncParam) { QueryAutoSync(syncParam); };
471     const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
472     errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
473     if (errCode == E_OK) {
474         syncInterface->IncRefCount();
475         label_ = syncEngine_->GetLabel();
476         return E_OK;
477     } else {
478         LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
479         RefObject::KillAndDecObjRef(syncEngine_);
480         syncEngine_ = nullptr;
481         return errCode;
482     }
483 }
484 
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)485 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
486 {
487     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
488         false);
489     if (!isSyncDualTupleMode || isNeedActive) {
490         return E_OK;
491     }
492     LOGI("[Syncer] syncer no need to active");
493     int errCode = BuildSyncEngine();
494     if (errCode != E_OK) {
495         return errCode;
496     }
497     return -E_NO_NEED_ACTIVE;
498 }
499 
GenerateSyncId()500 uint32_t GenericSyncer::GenerateSyncId()
501 {
502     std::lock_guard<std::mutex> lock(syncIdLock_);
503     currentSyncId_++;
504     // if overflow, reset to 1
505     if (currentSyncId_ <= 0) {
506         currentSyncId_ = MIN_VALID_SYNC_ID;
507     }
508     return currentSyncId_;
509 }
510 
IsValidMode(int mode) const511 bool GenericSyncer::IsValidMode(int mode) const
512 {
513     if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
514         LOGE("[Syncer] Sync mode is not valid!");
515         return false;
516     }
517     return true;
518 }
519 
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const520 int GenericSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const
521 {
522     (void)param;
523     (void)engine;
524     (void)storage;
525     return E_OK;
526 }
527 
IsValidDevices(const std::vector<std::string> & devices) const528 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
529 {
530     if (devices.empty()) {
531         LOGE("[Syncer] devices is empty!");
532         return false;
533     }
534     return true;
535 }
536 
ClearSyncOperations(bool isClosedOperation)537 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
538 {
539     std::vector<SyncOperation *> syncOperation;
540     {
541         std::lock_guard<std::mutex> lock(operationMapLock_);
542         for (auto &item : syncOperationMap_) {
543             bool isBlockSync = item.second->IsBlockSync();
544             if (isBlockSync || !isClosedOperation) {
545                 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
546                 item.second->SetUnfinishedDevStatus(status);
547                 RefObject::IncObjRef(item.second);
548                 syncOperation.push_back(item.second);
549             }
550         }
551     }
552 
553     if (!isClosedOperation) { // means user changed
554         syncEngine_->NotifyUserChange();
555     }
556 
557     for (auto &operation : syncOperation) {
558         // block sync operation or userChange will trigger remove sync operation
559         // caller won't blocked for block sync
560         // caller won't blocked for userChange operation no mater it is block or non-block sync
561         TriggerSyncFinished(operation);
562         RefObject::DecObjRef(operation);
563     }
564     ClearInnerResource(isClosedOperation);
565 }
566 
ClearInnerResource(bool isClosedOperation)567 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
568 {
569     {
570         std::lock_guard<std::mutex> lock(operationMapLock_);
571         for (auto &iter : syncOperationMap_) {
572             RefObject::KillAndDecObjRef(iter.second);
573             iter.second = nullptr;
574         }
575         syncOperationMap_.clear();
576     }
577     {
578         std::lock_guard<std::mutex> lock(syncIdLock_);
579         if (isClosedOperation) {
580             connectionIdMap_.clear();
581         } else { // only need to clear syncid when user change
582             for (auto &item : connectionIdMap_) {
583                 item.second.clear();
584             }
585         }
586         syncIdMap_.clear();
587     }
588 }
589 
TriggerSyncFinished(SyncOperation * operation)590 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
591 {
592     if (operation != nullptr && operation->CheckIsAllFinished()) { // LCOV_EXCL_BR_LINE
593         operation->Finished();
594     }
595 }
596 
OnSyncFinished(int syncId)597 void GenericSyncer::OnSyncFinished(int syncId)
598 {
599     (void)(RemoveSyncOperation(syncId));
600 }
601 
SyncModuleInit()602 int GenericSyncer::SyncModuleInit()
603 {
604     static bool isInit = false;
605     std::lock_guard<std::mutex> lock(moduleInitLock_);
606     if (!isInit) {
607         int errCode = SyncResourceInit();
608         if (errCode != E_OK) {
609             return errCode;
610         }
611         isInit = true;
612         return E_OK;
613     }
614     return E_OK;
615 }
616 
SyncResourceInit()617 int GenericSyncer::SyncResourceInit()
618 {
619     int errCode = TimeSync::RegisterTransformFunc();
620     if (errCode != E_OK) {
621         LOGE("Register timesync message transform func ERR!");
622         return errCode;
623     }
624     errCode = SingleVerSerializeManager::RegisterTransformFunc();
625     if (errCode != E_OK) {
626         LOGE("Register SingleVerDataSync message transform func ERR!");
627         return errCode;
628     }
629 #ifndef OMIT_MULTI_VER
630     errCode = CommitHistorySync::RegisterTransformFunc();
631     if (errCode != E_OK) {
632         LOGE("Register CommitHistorySync message transform func ERR!");
633         return errCode;
634     }
635     errCode = MultiVerDataSync::RegisterTransformFunc();
636     if (errCode != E_OK) {
637         LOGE("Register MultiVerDataSync message transform func ERR!");
638         return errCode;
639     }
640     errCode = ValueSliceSync::RegisterTransformFunc();
641     if (errCode != E_OK) {
642         LOGE("Register ValueSliceSync message transform func ERR!");
643         return errCode;
644     }
645 #endif
646     errCode = DeviceManager::RegisterTransformFunc();
647     if (errCode != E_OK) {
648         LOGE("Register DeviceManager message transform func ERR!");
649         return errCode;
650     }
651     errCode = AbilitySync::RegisterTransformFunc();
652     if (errCode != E_OK) {
653         LOGE("Register AbilitySync message transform func ERR!");
654         return errCode;
655     }
656     return E_OK;
657 }
658 
GetQueuedSyncSize(int * queuedSyncSize) const659 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
660 {
661     if (queuedSyncSize == nullptr) {
662         return -E_INVALID_ARGS;
663     }
664     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
665     *queuedSyncSize = queuedManualSyncSize_;
666     LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
667     return E_OK;
668 }
669 
SetQueuedSyncLimit(const int * queuedSyncLimit)670 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
671 {
672     if (queuedSyncLimit == nullptr) {
673         return -E_INVALID_ARGS;
674     }
675     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
676     queuedManualSyncLimit_ = *queuedSyncLimit;
677     LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
678     return E_OK;
679 }
680 
GetQueuedSyncLimit(int * queuedSyncLimit) const681 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
682 {
683     if (queuedSyncLimit == nullptr) {
684         return -E_INVALID_ARGS;
685     }
686     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
687     *queuedSyncLimit = queuedManualSyncLimit_;
688     LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
689     return E_OK;
690 }
691 
IsManualSync(int inMode) const692 bool GenericSyncer::IsManualSync(int inMode) const
693 {
694     int mode = SyncOperation::TransferSyncMode(inMode);
695     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
696         (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
697         return true;
698     }
699     return false;
700 }
701 
AddQueuedManualSyncSize(int mode,bool wait)702 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
703 {
704     if (IsManualSync(mode) && (!wait)) {
705         std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
706         if (!manualSyncEnable_) {
707             LOGI("[GenericSyncer] manualSyncEnable is Disable");
708             return -E_BUSY;
709         }
710         queuedManualSyncSize_++;
711     }
712     return E_OK;
713 }
714 
IsQueuedManualSyncFull(int mode,bool wait) const715 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
716 {
717     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
718     if (IsManualSync(mode) && (!manualSyncEnable_)) { // LCOV_EXCL_BR_LINE
719         LOGI("[GenericSyncer] manualSyncEnable_:false");
720         return true;
721     }
722     if (IsManualSync(mode) && (!wait)) { // LCOV_EXCL_BR_LINE
723         if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
724             return false;
725         } else {
726             LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
727                 queuedManualSyncLimit_);
728             return true;
729         }
730     } else {
731         return false;
732     }
733 }
734 
SubQueuedSyncSize(void)735 void GenericSyncer::SubQueuedSyncSize(void)
736 {
737     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
738     queuedManualSyncSize_--;
739     if (queuedManualSyncSize_ < 0) {
740         LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
741         queuedManualSyncSize_ = 0;
742     }
743 }
744 
DisableManualSync(void)745 int GenericSyncer::DisableManualSync(void)
746 {
747     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
748     if (queuedManualSyncSize_ > 0) {
749         LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
750         return -E_BUSY;
751     }
752     manualSyncEnable_ = false;
753     LOGD("[GenericSyncer] DisableManualSync ok");
754     return E_OK;
755 }
756 
EnableManualSync(void)757 int GenericSyncer::EnableManualSync(void)
758 {
759     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
760     manualSyncEnable_ = true;
761     LOGD("[GenericSyncer] EnableManualSync ok");
762     return E_OK;
763 }
764 
GetLocalIdentity(std::string & outTarget) const765 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
766 {
767     std::string deviceId;
768     int errCode =  RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
769     if (errCode != E_OK) {
770         LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
771         return errCode;
772     }
773     outTarget = DBCommon::TransferHashString(deviceId);
774     return E_OK;
775 }
776 
GetOnlineDevices(std::vector<std::string> & devices) const777 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
778 {
779     std::string identifier;
780     {
781         std::lock_guard<std::mutex> lock(syncerLock_);
782         // Get devices from AutoLaunch first.
783         if (syncInterface_ == nullptr) {
784             LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
785             return;
786         }
787         bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
788             false);
789         if (isSyncDualTupleMode) {
790             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
791                 "");
792         } else {
793             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
794         }
795     }
796     RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
797     if (!devices.empty()) {
798         return;
799     }
800     std::lock_guard<std::mutex> lock(syncerLock_);
801     if (closing_) {
802         LOGW("[Syncer] Syncer is closing, return!");
803         return;
804     }
805     if (syncEngine_ != nullptr) {
806         syncEngine_->GetOnlineDevices(devices);
807     }
808 }
809 
SetSyncRetry(bool isRetry)810 int GenericSyncer::SetSyncRetry(bool isRetry)
811 {
812     if (syncEngine_ == nullptr) {
813         return -E_NOT_INIT;
814     }
815     syncEngine_->SetSyncRetry(isRetry);
816     return E_OK;
817 }
818 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)819 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
820 {
821     std::lock_guard<std::mutex> lock(syncerLock_);
822     if (syncEngine_ == nullptr) {
823         return -E_NOT_INIT;
824     }
825     int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
826     if (errCode == E_OK) {
827         syncEngine_->SetEqualIdentifierMap(identifier, targets);
828     }
829     return errCode;
830 }
831 
GetSyncDevicesStr(const std::vector<std::string> & devices) const832 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
833 {
834     std::string syncDevices;
835     for (const auto &dev:devices) {
836         syncDevices += DBCommon::StringMasking(dev);
837         syncDevices += ",";
838     }
839     if (syncDevices.empty()) {
840         return "";
841     }
842     return syncDevices.substr(0, syncDevices.size() - 1);
843 }
844 
StatusCheck() const845 int GenericSyncer::StatusCheck() const
846 {
847     if (!initialized_) {
848         LOGE("[Syncer] Syncer is not initialized, return!");
849         return -E_BUSY;
850     }
851     if (closing_) {
852         LOGW("[Syncer] Syncer is closing, return!");
853         return -E_BUSY;
854     }
855     return E_OK;
856 }
857 
SyncPreCheck(const SyncParma & param) const858 int GenericSyncer::SyncPreCheck(const SyncParma &param) const
859 {
860     ISyncEngine *engine = nullptr;
861     ISyncInterface *storage = nullptr;
862     int errCode = E_OK;
863     {
864         std::lock_guard<std::mutex> lock(syncerLock_);
865         errCode = StatusCheck();
866         if (errCode != E_OK) {
867             return errCode;
868         }
869         if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
870             return -E_INVALID_ARGS;
871         }
872         if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
873             LOGE("[Syncer] -E_BUSY");
874             return -E_BUSY;
875         }
876         storage = syncInterface_;
877         engine = syncEngine_;
878         if (storage == nullptr || engine == nullptr) {
879             return -E_BUSY;
880         }
881         storage->IncRefCount();
882         RefObject::IncObjRef(engine);
883     }
884     errCode = SyncConditionCheck(param, engine, storage);
885     storage->DecRefCount();
886     RefObject::DecObjRef(engine);
887     return errCode;
888 }
889 
InitSyncOperation(SyncOperation * operation,const SyncParma & param)890 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma &param)
891 {
892     operation->SetIdentifier(syncInterface_->GetIdentifier());
893     operation->SetSyncProcessCallFun(param.onSyncProcess);
894     operation->Initialize();
895     operation->OnKill([this, id = operation->GetSyncId()] { SyncOperationKillCallback(id); });
896     std::function<void(int)> onFinished = [this](int syncId) { OnSyncFinished(syncId); };
897     operation->SetOnSyncFinished(onFinished);
898     operation->SetOnSyncFinalize(param.onFinalize);
899     if (param.isQuerySync) {
900         operation->SetQuery(param.syncQuery);
901     }
902 }
903 
BuildSyncEngine()904 int GenericSyncer::BuildSyncEngine()
905 {
906     if (syncEngine_ != nullptr) {
907         return E_OK;
908     }
909     syncEngine_ = CreateSyncEngine();
910     if (syncEngine_ == nullptr) {
911         return -E_OUT_OF_MEMORY;
912     }
913     syncEngine_->OnLastRef([this]() {
914         LOGD("[Syncer] SyncEngine finalized");
915         {
916             std::lock_guard<std::mutex> cvLock(engineMutex_);
917             engineFinalize_ = true;
918         }
919         engineFinalizeCv_.notify_all();
920     });
921     return E_OK;
922 }
923 
Dump(int fd)924 void GenericSyncer::Dump(int fd)
925 {
926     if (syncEngine_ == nullptr) {
927         return;
928     }
929     RefObject::IncObjRef(syncEngine_);
930     syncEngine_->Dump(fd);
931     RefObject::DecObjRef(syncEngine_);
932 }
933 
DumpSyncerBasicInfo()934 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
935 {
936     SyncerBasicInfo baseInfo;
937     if (syncEngine_ == nullptr) {
938         return baseInfo;
939     }
940     RefObject::IncObjRef(syncEngine_);
941     baseInfo.isSyncActive = syncEngine_->IsEngineActive();
942     RefObject::DecObjRef(syncEngine_);
943     return baseInfo;
944 }
945 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)946 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
947     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
948 {
949     ISyncEngine *syncEngine = nullptr;
950     {
951         std::lock_guard<std::mutex> lock(syncerLock_);
952         int errCode = StatusCheck();
953         if (errCode != E_OK) {
954             return errCode;
955         }
956         syncEngine = syncEngine_;
957         RefObject::IncObjRef(syncEngine);
958     }
959     if (syncEngine == nullptr) {
960         return -E_NOT_INIT;
961     }
962     int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
963     RefObject::DecObjRef(syncEngine);
964     return errCode;
965 }
966 
InitTimeChangedListener()967 int GenericSyncer::InitTimeChangedListener()
968 {
969     int errCode = E_OK;
970     if (timeChangedListener_ != nullptr) {
971         return errCode;
972     }
973     timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
974         [this](void *changedOffset) {
975             RecordTimeChangeOffset(changedOffset);
976         },
977         [this]() {
978             {
979                 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
980                 timeChangeListenerFinalize_ = true;
981             }
982             timeChangeCv_.notify_all();
983         }, errCode);
984     if (timeChangedListener_ == nullptr) {
985         LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
986         return errCode;
987     }
988     {
989         std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
990         timeChangeListenerFinalize_ = false;
991     }
992     return E_OK;
993 }
994 
ReleaseInnerResource()995 void GenericSyncer::ReleaseInnerResource()
996 {
997     NotificationChain::Listener *timeChangedListener = nullptr;
998     {
999         std::lock_guard<std::mutex> lock(syncerLock_);
1000         if (timeChangedListener_ != nullptr) {
1001             timeChangedListener = timeChangedListener_;
1002             timeChangedListener_ = nullptr;
1003         }
1004         timeHelper_ = nullptr;
1005         metadata_ = nullptr;
1006     }
1007     if (timeChangedListener != nullptr) {
1008         timeChangedListener->Drop(true);
1009         RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
1010     }
1011     std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
1012     LOGD("[GenericSyncer] Begin wait time change listener finalize");
1013     timeChangeCv_.wait(uniqueLock, [this]() {
1014         return timeChangeListenerFinalize_;
1015     });
1016     LOGD("[GenericSyncer] End wait time change listener finalize");
1017 }
1018 
RecordTimeChangeOffset(void * changedOffset)1019 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
1020 {
1021     std::shared_ptr<Metadata> metadata = nullptr;
1022     ISyncInterface *storage = nullptr;
1023     {
1024         std::lock_guard<std::mutex> lock(syncerLock_);
1025         if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
1026             return;
1027         }
1028         storage = syncInterface_;
1029         metadata = metadata_;
1030         storage->IncRefCount();
1031     }
1032     TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
1033         static_cast<TimeOffset>(TimeHelper::TO_100_NS);
1034     TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
1035     TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
1036     Timestamp maxItemTime = 0;
1037     storage->GetMaxTimestamp(maxItemTime);
1038     if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
1039         orgOffset = TimeHelper::BUFFER_VALID_TIME -
1040             currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
1041     }
1042     if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
1043         orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
1044             static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
1045     }
1046     metadata->SaveLocalTimeOffset(orgOffset);
1047     ResetTimeSyncMarkByTimeChange(metadata, *storage);
1048     storage->DecRefCount();
1049 }
1050 
CloseInner(bool isClosedOperation)1051 int GenericSyncer::CloseInner(bool isClosedOperation)
1052 {
1053     {
1054         std::lock_guard<std::mutex> lock(syncerLock_);
1055         if (!initialized_) {
1056             LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
1057             return -E_NOT_INIT;
1058         }
1059         initialized_ = false;
1060         if (closing_) {
1061             LOGE("[Syncer] Syncer is closing, return!");
1062             return -E_BUSY;
1063         }
1064         closing_ = true;
1065     }
1066     ClearSyncOperations(isClosedOperation);
1067     if (syncEngine_ != nullptr) {
1068         syncEngine_->Close();
1069         LOGD("[Syncer] Close SyncEngine!");
1070         std::lock_guard<std::mutex> lock(syncerLock_);
1071         closing_ = false;
1072     }
1073     return E_OK;
1074 }
1075 
GetSyncDataSize(const std::string & device,size_t & size) const1076 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1077 {
1078     uint64_t localWaterMark = 0;
1079     std::shared_ptr<Metadata> metadata = nullptr;
1080     {
1081         std::lock_guard<std::mutex> lock(syncerLock_);
1082         if (metadata_ == nullptr || syncInterface_ == nullptr) {
1083             return -E_INTERNAL_ERROR;
1084         }
1085         if (closing_) {
1086             LOGE("[Syncer] Syncer is closing, return!");
1087             return -E_BUSY;
1088         }
1089         int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1090         if (errCode != E_OK) {
1091             LOGE("[Syncer] syncer is restarting, return!");
1092             return errCode;
1093         }
1094         syncInterface_->IncRefCount();
1095         metadata = metadata_;
1096     }
1097     metadata->GetLocalWaterMark(device, localWaterMark);
1098     uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1099     DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1100     std::vector<SendDataItem> outData;
1101     ContinueToken token = nullptr;
1102     int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1103         outData, token, syncDataSizeInfo);
1104     if (token != nullptr) {
1105         static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1106         token = nullptr;
1107     }
1108     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1109         LOGE("calculate sync data size failed %d", errCode);
1110         syncInterface_->DecRefCount();
1111         return errCode;
1112     }
1113     uint32_t totalLen = 0;
1114     if (errCode == -E_UNFINISHED) {
1115         totalLen = expectedMtuSize;
1116     } else {
1117         totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1118     }
1119     for (auto &entry : outData) {
1120         delete entry;
1121         entry = nullptr;
1122     }
1123     syncInterface_->DecRefCount();
1124     // if larger than 1M, return 1M
1125     size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1126     return E_OK;
1127 }
1128 
IsNeedActive(ISyncInterface * syncInterface)1129 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1130 {
1131     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1132     if (localOnly) {
1133         LOGD("[Syncer] Local only db, don't need active syncer");
1134         return false;
1135     }
1136     return true;
1137 }
1138 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1139 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1140 {
1141     (void)clientId;
1142     (void)hashDevId;
1143     return -E_NOT_SUPPORT;
1144 }
1145 
InitStorageResource(ISyncInterface * syncInterface)1146 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1147 {
1148     // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1149     // It will be clear in destructor.
1150     int errCode = InitMetaData(syncInterface);
1151     if (errCode != E_OK) {
1152         return errCode;
1153     }
1154 
1155     // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1156     // It will be clear in destructor.
1157     errCode = InitTimeHelper(syncInterface);
1158     if (errCode != E_OK) {
1159         return errCode;
1160     }
1161 
1162     if (!IsNeedActive(syncInterface)) {
1163         return -E_NO_NEED_ACTIVE;
1164     }
1165     return errCode;
1166 }
1167 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)1168 int GenericSyncer::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
1169 {
1170     std::shared_ptr<Metadata> metadata = nullptr;
1171     {
1172         std::lock_guard<std::mutex> autoLock(syncerLock_);
1173         metadata = metadata_;
1174     }
1175     if (metadata == nullptr) {
1176         LOGE("[Syncer] Metadata is not init");
1177         return -E_NOT_INIT;
1178     }
1179     std::string dev;
1180     bool devNeedHash = true;
1181     int errCode = metadata->GetHashDeviceId(device, dev);
1182     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1183         LOGE("[Syncer] Get watermark info failed %d", errCode);
1184         return errCode;
1185     } else if (errCode == E_OK) {
1186         devNeedHash = false;
1187     } else {
1188         dev = device;
1189     }
1190     return metadata->GetWaterMarkInfoFromDB(dev, devNeedHash, info);
1191 }
1192 
UpgradeSchemaVerInMeta()1193 int GenericSyncer::UpgradeSchemaVerInMeta()
1194 {
1195     std::shared_ptr<Metadata> metadata = nullptr;
1196     {
1197         std::lock_guard<std::mutex> autoLock(syncerLock_);
1198         metadata = metadata_;
1199     }
1200     if (metadata == nullptr) {
1201         LOGE("[Syncer] metadata is not init");
1202         return -E_NOT_INIT;
1203     }
1204     int errCode = metadata->ClearAllAbilitySyncFinishMark();
1205     if (errCode != E_OK) {
1206         LOGE("[Syncer] clear ability mark failed:%d", errCode);
1207         return errCode;
1208     }
1209     auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
1210     if (err != E_OK) {
1211         LOGE("[Syncer] get local schema version failed:%d", err);
1212         return err;
1213     }
1214     errCode = metadata->SetLocalSchemaVersion(localSchemaVer + 1);
1215     if (errCode != E_OK) {
1216         LOGE("[Syncer] increase local schema version failed:%d", errCode);
1217     }
1218     return errCode;
1219 }
1220 
ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> & metadata,ISyncInterface & storage)1221 void GenericSyncer::ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage)
1222 {
1223     if (syncEngine_ != nullptr) {
1224         syncEngine_->TimeChange();
1225     }
1226     int errCode = metadata->ClearAllTimeSyncFinishMark();
1227     if (errCode != E_OK) {
1228         LOGW("[GenericSyncer] %s clear time sync finish mark failed %d", label_.c_str(), errCode);
1229     } else {
1230         LOGD("[GenericSyncer] ClearAllTimeSyncFinishMark finish");
1231         RuntimeContext::GetInstance()->ResetDBTimeChangeStatus(storage.GetIdentifier());
1232     }
1233 }
1234 
ResetSyncStatus()1235 void GenericSyncer::ResetSyncStatus()
1236 {
1237     std::shared_ptr<Metadata> metadata = nullptr;
1238     {
1239         std::lock_guard<std::mutex> lock(syncerLock_);
1240         if (metadata_ == nullptr) {
1241             return;
1242         }
1243         metadata = metadata_;
1244     }
1245     metadata->ClearAllAbilitySyncFinishMark();
1246 }
1247 
GetLocalTimeOffset()1248 int64_t GenericSyncer::GetLocalTimeOffset()
1249 {
1250     std::shared_ptr<Metadata> metadata = nullptr;
1251     {
1252         std::lock_guard<std::mutex> lock(syncerLock_);
1253         if (metadata_ == nullptr) {
1254             return 0;
1255         }
1256         metadata = metadata_;
1257     }
1258     return metadata->GetLocalTimeOffset();
1259 }
1260 
GetTaskCount()1261 int32_t GenericSyncer::GetTaskCount()
1262 {
1263     int32_t count = 0;
1264     {
1265         std::lock_guard<std::mutex> autoLock(operationMapLock_);
1266         count += static_cast<int32_t>(syncOperationMap_.size());
1267     }
1268     ISyncEngine *syncEngine = nullptr;
1269     {
1270         std::lock_guard<std::mutex> lock(syncerLock_);
1271         if (syncEngine_ == nullptr) {
1272             return count;
1273         }
1274         syncEngine = syncEngine_;
1275         RefObject::IncObjRef(syncEngine);
1276     }
1277     count += syncEngine->GetResponseTaskCount();
1278     RefObject::DecObjRef(syncEngine);
1279     return count;
1280 }
1281 } // namespace DistributedDB
1282