• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "generic_syncer.h"

#include <limits>

#include "db_common.h"
#include "db_errno.h"
#include "log_print.h"
#include "ref_object.h"
#include "sqlite_single_ver_natural_store.h"
#include "time_sync.h"
#include "single_ver_data_sync.h"
#ifndef OMIT_MULTI_VER
#include "commit_history_sync.h"
#include "multi_ver_data_sync.h"
#include "value_slice_sync.h"
#endif
#include "device_manager.h"
#include "db_constant.h"
#include "ability_sync.h"
#include "single_ver_serialize_manager.h"
34 
35 namespace DistributedDB {
36 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
37 std::mutex GenericSyncer::moduleInitLock_;
38 int GenericSyncer::currentSyncId_ = 0;
39 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()40 GenericSyncer::GenericSyncer()
41     : syncEngine_(nullptr),
42       syncInterface_(nullptr),
43       timeHelper_(nullptr),
44       metadata_(nullptr),
45       initialized_(false),
46       queuedManualSyncSize_(0),
47       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
48       manualSyncEnable_(true),
49       closing_(false),
50       engineFinalize_(false),
51       timeChangedListener_(nullptr)
52 {
53 }
54 
~GenericSyncer()55 GenericSyncer::~GenericSyncer()
56 {
57     LOGD("[GenericSyncer] ~GenericSyncer!");
58     if (syncEngine_ != nullptr) {
59         syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
60         RefObject::KillAndDecObjRef(syncEngine_);
61         // waiting all thread exist
62         std::unique_lock<std::mutex> cvLock(engineMutex_);
63         bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
64             [this]() { return engineFinalize_; });
65         if (!engineFinalize) {
66             LOGW("syncer finalize before engine finalize!");
67         }
68         syncEngine_ = nullptr;
69     }
70     if (timeChangedListener_ != nullptr) {
71         timeChangedListener_->Drop(true);
72         timeChangedListener_ = nullptr;
73         RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
74     }
75     timeHelper_ = nullptr;
76     metadata_ = nullptr;
77     syncInterface_ = nullptr;
78 }
79 
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82     if (syncInterface == nullptr) {
83         LOGE("[Syncer] Init failed, the syncInterface is null!");
84         return -E_INVALID_ARGS;
85     }
86 
87     {
88         std::lock_guard<std::mutex> lock(syncerLock_);
89         if (initialized_) {
90             return E_OK;
91         }
92         if (closing_) {
93             LOGE("[Syncer] Syncer is closing, return!");
94             return -E_BUSY;
95         }
96         std::vector<uint8_t> label = syncInterface->GetIdentifier();
97         label.resize(3); // only show 3 Bytes enough
98         label_ = DBCommon::VectorToHexString(label);
99 
100         // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
101         // It will be clear in destructor.
102         int errCodeMetadata = InitMetaData(syncInterface);
103 
104         // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
105         // It will be clear in destructor.
106         int errCodeTimeHelper = InitTimeHelper(syncInterface);
107 
108         if (!IsNeedActive(syncInterface)) {
109             return -E_NO_NEED_ACTIVE;
110         }
111         // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
112         // It will be clear in destructor.
113         int errCodeTimeChangedListener = InitTimeChangedListener();
114         if (errCodeMetadata != E_OK || errCodeTimeHelper != E_OK || errCodeTimeChangedListener != E_OK) {
115             return -E_INTERNAL_ERROR;
116         }
117         int errCode = CheckSyncActive(syncInterface, isNeedActive);
118         if (errCode != E_OK) {
119             return errCode;
120         }
121 
122         if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
123             LOGW("[Syncer] Communicator component not ready!");
124             return -E_NOT_INIT;
125         }
126 
127         errCode = SyncModuleInit();
128         if (errCode != E_OK) {
129             LOGE("[Syncer] Sync ModuleInit ERR!");
130             return -E_INTERNAL_ERROR;
131         }
132 
133         errCode = InitSyncEngine(syncInterface);
134         if (errCode != E_OK) {
135             return errCode;
136         }
137         syncEngine_->SetEqualIdentifier();
138         initialized_ = true;
139     }
140 
141     // RegConnectCallback may start an auto sync, this function can not in syncerLock_
142     syncEngine_->RegConnectCallback();
143     return E_OK;
144 }
145 
Close(bool isClosedOperation)146 int GenericSyncer::Close(bool isClosedOperation)
147 {
148     {
149         std::lock_guard<std::mutex> lock(syncerLock_);
150         if (!initialized_) {
151             if (isClosedOperation) {
152                 timeHelper_ = nullptr;
153                 metadata_ = nullptr;
154             }
155             LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
156             return -E_NOT_INIT;
157         }
158         initialized_ = false;
159         if (closing_) {
160             LOGE("[Syncer] Syncer is closing, return!");
161             return -E_BUSY;
162         }
163         closing_ = true;
164     }
165     ClearSyncOperations(isClosedOperation);
166     if (syncEngine_ != nullptr) {
167         syncEngine_->Close();
168         LOGD("[Syncer] Close SyncEngine!");
169         std::lock_guard<std::mutex> lock(syncerLock_);
170         closing_ = false;
171     }
172     if (isClosedOperation) {
173         timeHelper_ = nullptr;
174         metadata_ = nullptr;
175     }
176     return E_OK;
177 }
178 
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)179 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
180     const std::function<void(const std::map<std::string, int> &)> &onComplete,
181     const std::function<void(void)> &onFinalize, bool wait = false)
182 {
183     SyncParma param;
184     param.devices = devices;
185     param.mode = mode;
186     param.onComplete = onComplete;
187     param.onFinalize = onFinalize;
188     param.wait = wait;
189     return Sync(param);
190 }
191 
Sync(const InternalSyncParma & param)192 int GenericSyncer::Sync(const InternalSyncParma &param)
193 {
194     SyncParma syncParam;
195     syncParam.devices = param.devices;
196     syncParam.mode = param.mode;
197     syncParam.isQuerySync = param.isQuerySync;
198     syncParam.syncQuery = param.syncQuery;
199     return Sync(syncParam);
200 }
201 
Sync(const SyncParma & param)202 int GenericSyncer::Sync(const SyncParma &param)
203 {
204     return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
205 }
206 
Sync(const SyncParma & param,uint64_t connectionId)207 int GenericSyncer::Sync(const SyncParma &param, uint64_t connectionId)
208 {
209     int errCode = SyncParamCheck(param);
210     if (errCode != E_OK) {
211         return errCode;
212     }
213     errCode = AddQueuedManualSyncSize(param.mode, param.wait);
214     if (errCode != E_OK) {
215         return errCode;
216     }
217 
218     uint32_t syncId = GenerateSyncId();
219     errCode = PrepareSync(param, syncId, connectionId);
220     if (errCode != E_OK) {
221         LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
222         return errCode;
223     }
224     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
225     return E_OK;
226 }
227 
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)228 int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
229 {
230     auto *operation =
231         new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
232     if (operation == nullptr) {
233         SubQueuedSyncSize();
234         return -E_OUT_OF_MEMORY;
235     }
236     operation->SetIdentifier(syncInterface_->GetIdentifier());
237     {
238         std::lock_guard<std::mutex> autoLock(syncerLock_);
239         PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
240         InitSyncOperation(operation, param);
241         LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
242             param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
243         AddSyncOperation(operation);
244         PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
245     }
246     if (!param.wait && connectionId != DBConstant::IGNORE_CONNECTION_ID) {
247         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
248         connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
249         syncIdMap_[static_cast<int>(syncId)] = connectionId;
250     }
251     if (operation->CheckIsAllFinished()) {
252         operation->Finished();
253         RefObject::KillAndDecObjRef(operation);
254     } else {
255         operation->WaitIfNeed();
256         RefObject::DecObjRef(operation);
257     }
258     return E_OK;
259 }
260 
RemoveSyncOperation(int syncId)261 int GenericSyncer::RemoveSyncOperation(int syncId)
262 {
263     SyncOperation *operation = nullptr;
264     std::unique_lock<std::mutex> lock(operationMapLock_);
265     auto iter = syncOperationMap_.find(syncId);
266     if (iter != syncOperationMap_.end()) {
267         LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
268         operation = iter->second;
269         syncOperationMap_.erase(syncId);
270         lock.unlock();
271         if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
272             SubQueuedSyncSize();
273         }
274         operation->NotifyIfNeed();
275         RefObject::KillAndDecObjRef(operation);
276         operation = nullptr;
277         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
278         if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
279             return E_OK;
280         }
281         uint64_t connectionId = syncIdMap_[syncId];
282         if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
283             connectionIdMap_[connectionId].remove(syncId);
284         }
285         syncIdMap_.erase(syncId);
286         return E_OK;
287     }
288     return -E_INVALID_ARGS;
289 }
290 
StopSync(uint64_t connectionId)291 int GenericSyncer::StopSync(uint64_t connectionId)
292 {
293     std::list<int> syncIdList;
294     {
295         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
296         if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
297             return E_OK;
298         }
299         syncIdList = connectionIdMap_[connectionId];
300         connectionIdMap_.erase(connectionId);
301     }
302     for (auto syncId : syncIdList) {
303         RemoveSyncOperation(syncId);
304         if (syncEngine_ != nullptr) {
305             syncEngine_->AbortMachineIfNeed(syncId);
306         }
307     }
308     if (syncEngine_ != nullptr) {
309         syncEngine_->NotifyConnectionClosed(connectionId);
310     }
311     return E_OK;
312 }
313 
GetTimestamp()314 uint64_t GenericSyncer::GetTimestamp()
315 {
316     if (timeHelper_ == nullptr) {
317         return TimeHelper::GetSysCurrentTime();
318     }
319     return timeHelper_->GetTime();
320 }
321 
QueryAutoSync(const InternalSyncParma & param)322 void GenericSyncer::QueryAutoSync(const InternalSyncParma &param)
323 {
324     (void)param;
325 }
326 
AddSyncOperation(SyncOperation * operation)327 void GenericSyncer::AddSyncOperation(SyncOperation *operation)
328 {
329     if (operation == nullptr) {
330         return;
331     }
332 
333     LOGD("[Syncer] AddSyncOperation.");
334     syncEngine_->AddSyncOperation(operation);
335 
336     if (operation->CheckIsAllFinished()) {
337         return;
338     }
339 
340     std::lock_guard<std::mutex> lock(operationMapLock_);
341     syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
342     // To make sure operation alive before WaitIfNeed out
343     RefObject::IncObjRef(operation);
344 }
345 
SyncOperationKillCallbackInner(int syncId)346 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
347 {
348     if (syncEngine_ != nullptr) {
349         LOGI("[Syncer] Operation on kill id = %d", syncId);
350         syncEngine_->RemoveSyncOperation(syncId);
351     }
352 }
353 
SyncOperationKillCallback(int syncId)354 void GenericSyncer::SyncOperationKillCallback(int syncId)
355 {
356     SyncOperationKillCallbackInner(syncId);
357 }
358 
InitMetaData(ISyncInterface * syncInterface)359 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
360 {
361     if (metadata_ != nullptr) {
362         return E_OK;
363     }
364 
365     metadata_ = std::make_shared<Metadata>();
366     if (metadata_ == nullptr) {
367         LOGE("[Syncer] metadata make shared failed");
368         return -E_OUT_OF_MEMORY;
369     }
370     int errCode = metadata_->Initialize(syncInterface);
371     if (errCode != E_OK) {
372         LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
373         metadata_ = nullptr;
374     }
375     syncInterface_ = syncInterface;
376     return errCode;
377 }
378 
InitTimeHelper(ISyncInterface * syncInterface)379 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
380 {
381     if (timeHelper_ != nullptr) {
382         return E_OK;
383     }
384 
385     timeHelper_ = std::make_shared<TimeHelper>();
386     if (timeHelper_ == nullptr) {
387         return -E_OUT_OF_MEMORY;
388     }
389     int errCode = timeHelper_->Initialize(syncInterface, metadata_);
390     if (errCode != E_OK) {
391         LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
392         timeHelper_ = nullptr;
393     }
394     return errCode;
395 }
396 
InitSyncEngine(ISyncInterface * syncInterface)397 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
398 {
399     if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
400         LOGI("[Syncer] syncEngine is active");
401         return E_OK;
402     }
403     int errCode = BuildSyncEngine();
404     if (errCode != E_OK) {
405         return errCode;
406     }
407     const std::function<void(std::string)> onlineFunc = std::bind(&GenericSyncer::RemoteDataChanged,
408         this, std::placeholders::_1);
409     const std::function<void(std::string)> offlineFunc = std::bind(&GenericSyncer::RemoteDeviceOffline,
410         this, std::placeholders::_1);
411     const std::function<void(const InternalSyncParma &param)> queryAutoSyncFunc =
412         std::bind(&GenericSyncer::QueryAutoSync, this, std::placeholders::_1);
413     errCode = syncEngine_->Initialize(syncInterface, metadata_, onlineFunc, offlineFunc, queryAutoSyncFunc);
414     if (errCode == E_OK) {
415         syncInterface->IncRefCount();
416         label_ = syncEngine_->GetLabel();
417         return E_OK;
418     } else {
419         LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
420         RefObject::KillAndDecObjRef(syncEngine_);
421         syncEngine_ = nullptr;
422         return errCode;
423     }
424 }
425 
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)426 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
427 {
428     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
429         false);
430     if (!isSyncDualTupleMode || isNeedActive) {
431         return E_OK;
432     }
433     LOGI("[Syncer] syncer no need to active");
434     int errCode = BuildSyncEngine();
435     if (errCode != E_OK) {
436         return errCode;
437     }
438     return -E_NO_NEED_ACTIVE;
439 }
440 
GenerateSyncId()441 uint32_t GenericSyncer::GenerateSyncId()
442 {
443     std::lock_guard<std::mutex> lock(syncIdLock_);
444     currentSyncId_++;
445     // if overflow, reset to 1
446     if (currentSyncId_ <= 0) {
447         currentSyncId_ = MIN_VALID_SYNC_ID;
448     }
449     return currentSyncId_;
450 }
451 
IsValidMode(int mode) const452 bool GenericSyncer::IsValidMode(int mode) const
453 {
454     if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
455         LOGE("[Syncer] Sync mode is not valid!");
456         return false;
457     }
458     return true;
459 }
460 
SyncConditionCheck(QuerySyncObject & query,int mode,bool isQuerySync,const std::vector<std::string> & devices) const461 int GenericSyncer::SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync,
462     const std::vector<std::string> &devices) const
463 {
464     (void)query;
465     (void)mode;
466     (void)isQuerySync;
467     (void)(devices);
468     return E_OK;
469 }
470 
IsValidDevices(const std::vector<std::string> & devices) const471 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
472 {
473     if (devices.empty()) {
474         LOGE("[Syncer] devices is empty!");
475         return false;
476     }
477     return true;
478 }
479 
ClearSyncOperations(bool isClosedOperation)480 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
481 {
482     std::vector<SyncOperation *> syncOperation;
483     {
484         std::lock_guard<std::mutex> lock(operationMapLock_);
485         for (auto &item : syncOperationMap_) {
486             bool isBlockSync = item.second->IsBlockSync();
487             if (isBlockSync || !isClosedOperation) {
488                 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
489                 item.second->SetUnfinishedDevStatus(status);
490                 RefObject::IncObjRef(item.second);
491                 syncOperation.push_back(item.second);
492             }
493         }
494     }
495 
496     if (!isClosedOperation) { // means user changed
497         syncEngine_->NotifyUserChange();
498     }
499 
500     for (auto &operation : syncOperation) {
501         // block sync operation or userChange will trigger remove sync operation
502         // caller won't blocked for block sync
503         // caller won't blocked for userChange operation no mater it is block or non-block sync
504         TriggerSyncFinished(operation);
505         RefObject::DecObjRef(operation);
506     }
507     {
508         std::lock_guard<std::mutex> lock(operationMapLock_);
509         for (auto &iter : syncOperationMap_) {
510             RefObject::KillAndDecObjRef(iter.second);
511             iter.second = nullptr;
512         }
513         syncOperationMap_.clear();
514     }
515     {
516         std::lock_guard<std::mutex> lock(syncIdLock_);
517         connectionIdMap_.clear();
518         syncIdMap_.clear();
519     }
520 }
521 
TriggerSyncFinished(SyncOperation * operation)522 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
523 {
524     if (operation != nullptr && operation->CheckIsAllFinished()) {
525         operation->Finished();
526     }
527 }
528 
OnSyncFinished(int syncId)529 void GenericSyncer::OnSyncFinished(int syncId)
530 {
531     (void)(RemoveSyncOperation(syncId));
532 }
533 
SyncModuleInit()534 int GenericSyncer::SyncModuleInit()
535 {
536     static bool isInit = false;
537     std::lock_guard<std::mutex> lock(moduleInitLock_);
538     if (!isInit) {
539         int errCode = SyncResourceInit();
540         if (errCode != E_OK) {
541             return errCode;
542         }
543         isInit = true;
544         return E_OK;
545     }
546     return E_OK;
547 }
548 
SyncResourceInit()549 int GenericSyncer::SyncResourceInit()
550 {
551     int errCode = TimeSync::RegisterTransformFunc();
552     if (errCode != E_OK) {
553         LOGE("Register timesync message transform func ERR!");
554         return errCode;
555     }
556     errCode = SingleVerSerializeManager::RegisterTransformFunc();
557     if (errCode != E_OK) {
558         LOGE("Register SingleVerDataSync message transform func ERR!");
559         return errCode;
560     }
561 #ifndef OMIT_MULTI_VER
562     errCode = CommitHistorySync::RegisterTransformFunc();
563     if (errCode != E_OK) {
564         LOGE("Register CommitHistorySync message transform func ERR!");
565         return errCode;
566     }
567     errCode = MultiVerDataSync::RegisterTransformFunc();
568     if (errCode != E_OK) {
569         LOGE("Register MultiVerDataSync message transform func ERR!");
570         return errCode;
571     }
572     errCode = ValueSliceSync::RegisterTransformFunc();
573     if (errCode != E_OK) {
574         LOGE("Register ValueSliceSync message transform func ERR!");
575         return errCode;
576     }
577 #endif
578     errCode = DeviceManager::RegisterTransformFunc();
579     if (errCode != E_OK) {
580         LOGE("Register DeviceManager message transform func ERR!");
581         return errCode;
582     }
583     errCode = AbilitySync::RegisterTransformFunc();
584     if (errCode != E_OK) {
585         LOGE("Register AbilitySync message transform func ERR!");
586         return errCode;
587     }
588     return E_OK;
589 }
590 
GetQueuedSyncSize(int * queuedSyncSize) const591 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
592 {
593     if (queuedSyncSize == nullptr) {
594         return -E_INVALID_ARGS;
595     }
596     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
597     *queuedSyncSize = queuedManualSyncSize_;
598     LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
599     return E_OK;
600 }
601 
SetQueuedSyncLimit(const int * queuedSyncLimit)602 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
603 {
604     if (queuedSyncLimit == nullptr) {
605         return -E_INVALID_ARGS;
606     }
607     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
608     queuedManualSyncLimit_ = *queuedSyncLimit;
609     LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
610     return E_OK;
611 }
612 
GetQueuedSyncLimit(int * queuedSyncLimit) const613 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
614 {
615     if (queuedSyncLimit == nullptr) {
616         return -E_INVALID_ARGS;
617     }
618     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
619     *queuedSyncLimit = queuedManualSyncLimit_;
620     LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
621     return E_OK;
622 }
623 
IsManualSync(int inMode) const624 bool GenericSyncer::IsManualSync(int inMode) const
625 {
626     int mode = SyncOperation::TransferSyncMode(inMode);
627     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
628         (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
629         return true;
630     }
631     return false;
632 }
633 
AddQueuedManualSyncSize(int mode,bool wait)634 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
635 {
636     if (IsManualSync(mode) && (!wait)) {
637         std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
638         if (!manualSyncEnable_) {
639             LOGI("[GenericSyncer] manualSyncEnable is Disable");
640             return -E_BUSY;
641         }
642         queuedManualSyncSize_++;
643     }
644     return E_OK;
645 }
646 
IsQueuedManualSyncFull(int mode,bool wait) const647 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
648 {
649     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
650     if (IsManualSync(mode) && (!manualSyncEnable_)) {
651         LOGI("[GenericSyncer] manualSyncEnable_:false");
652         return true;
653     }
654     if (IsManualSync(mode) && (!wait)) {
655         if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
656             return false;
657         } else {
658             LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
659                 queuedManualSyncLimit_);
660             return true;
661         }
662     } else {
663         return false;
664     }
665 }
666 
SubQueuedSyncSize(void)667 void GenericSyncer::SubQueuedSyncSize(void)
668 {
669     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
670     queuedManualSyncSize_--;
671     if (queuedManualSyncSize_ < 0) {
672         LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
673         queuedManualSyncSize_ = 0;
674     }
675 }
676 
DisableManualSync(void)677 int GenericSyncer::DisableManualSync(void)
678 {
679     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
680     if (queuedManualSyncSize_ > 0) {
681         LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
682         return -E_BUSY;
683     }
684     manualSyncEnable_ = false;
685     LOGD("[GenericSyncer] DisableManualSync ok");
686     return E_OK;
687 }
688 
EnableManualSync(void)689 int GenericSyncer::EnableManualSync(void)
690 {
691     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
692     manualSyncEnable_ = true;
693     LOGD("[GenericSyncer] EnableManualSync ok");
694     return E_OK;
695 }
696 
GetLocalIdentity(std::string & outTarget) const697 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
698 {
699     std::string deviceId;
700     int errCode =  RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
701     if (errCode != E_OK) {
702         LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
703         return errCode;
704     }
705     outTarget = DBCommon::TransferHashString(deviceId);
706     return E_OK;
707 }
708 
GetOnlineDevices(std::vector<std::string> & devices) const709 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
710 {
711     // Get devices from AutoLaunch first.
712     if (syncInterface_ == nullptr) {
713         LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
714         return;
715     }
716     bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
717         false);
718     std::string identifier;
719     if (isSyncDualTupleMode) {
720         identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
721     } else {
722         identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
723     }
724     RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
725     if (!devices.empty()) {
726         return;
727     }
728     std::lock_guard<std::mutex> lock(syncerLock_);
729     if (closing_) {
730         LOGE("[Syncer] Syncer is closing, return!");
731         return;
732     }
733     if (syncEngine_ != nullptr) {
734         syncEngine_->GetOnlineDevices(devices);
735     }
736 }
737 
SetSyncRetry(bool isRetry)738 int GenericSyncer::SetSyncRetry(bool isRetry)
739 {
740     if (syncEngine_ == nullptr) {
741         return -E_NOT_INIT;
742     }
743     syncEngine_->SetSyncRetry(isRetry);
744     return E_OK;
745 }
746 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)747 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
748 {
749     std::lock_guard<std::mutex> lock(syncerLock_);
750     if (syncEngine_ == nullptr) {
751         return -E_NOT_INIT;
752     }
753     int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
754     if (errCode == E_OK) {
755         syncEngine_->SetEqualIdentifierMap(identifier, targets);
756     }
757     return errCode;
758 }
759 
GetSyncDevicesStr(const std::vector<std::string> & devices) const760 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
761 {
762     std::string syncDevices;
763     for (const auto &dev:devices) {
764         syncDevices += STR_MASK(dev);
765         syncDevices += ",";
766     }
767     return syncDevices.substr(0, syncDevices.size() - 1);
768 }
769 
StatusCheck() const770 int GenericSyncer::StatusCheck() const
771 {
772     if (!initialized_) {
773         LOGE("[Syncer] Syncer is not initialized, return!");
774         return -E_NOT_INIT;
775     }
776     if (closing_) {
777         LOGE("[Syncer] Syncer is closing, return!");
778         return -E_BUSY;
779     }
780     return E_OK;
781 }
782 
SyncParamCheck(const SyncParma & param) const783 int GenericSyncer::SyncParamCheck(const SyncParma &param) const
784 {
785     std::lock_guard<std::mutex> lock(syncerLock_);
786     int errCode = StatusCheck();
787     if (errCode != E_OK) {
788         return errCode;
789     }
790     if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) {
791         return -E_INVALID_ARGS;
792     }
793     if (IsQueuedManualSyncFull(param.mode, param.wait)) {
794         LOGE("[Syncer] -E_BUSY");
795         return -E_BUSY;
796     }
797     QuerySyncObject syncQuery = param.syncQuery;
798     return SyncConditionCheck(syncQuery, param.mode, param.isQuerySync, param.devices);
799 }
800 
InitSyncOperation(SyncOperation * operation,const SyncParma & param)801 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma &param)
802 {
803     operation->SetIdentifier(syncInterface_->GetIdentifier());
804     operation->Initialize();
805     operation->OnKill(std::bind(&GenericSyncer::SyncOperationKillCallback, this, operation->GetSyncId()));
806     std::function<void(int)> onFinished = std::bind(&GenericSyncer::OnSyncFinished, this, std::placeholders::_1);
807     operation->SetOnSyncFinished(onFinished);
808     operation->SetOnSyncFinalize(param.onFinalize);
809     if (param.isQuerySync) {
810         operation->SetQuery(param.syncQuery);
811     }
812 }
813 
BuildSyncEngine()814 int GenericSyncer::BuildSyncEngine()
815 {
816     if (syncEngine_ != nullptr) {
817         return E_OK;
818     }
819     syncEngine_ = CreateSyncEngine();
820     if (syncEngine_ == nullptr) {
821         return -E_OUT_OF_MEMORY;
822     }
823     syncEngine_->OnLastRef([this]() {
824         LOGD("[Syncer] SyncEngine finalized");
825         {
826             std::lock_guard<std::mutex> cvLock(engineMutex_);
827             engineFinalize_ = true;
828         }
829         engineFinalizeCv_.notify_all();
830     });
831     return E_OK;
832 }
833 
Dump(int fd)834 void GenericSyncer::Dump(int fd)
835 {
836     if (syncEngine_ == nullptr) {
837         return;
838     }
839     syncEngine_->Dump(fd);
840 }
841 
DumpSyncerBasicInfo()842 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
843 {
844     SyncerBasicInfo baseInfo;
845     if (syncEngine_ == nullptr) {
846         return baseInfo;
847     }
848     RefObject::IncObjRef(syncEngine_);
849     baseInfo.isSyncActive = syncEngine_->IsEngineActive();
850     RefObject::DecObjRef(syncEngine_);
851     return baseInfo;
852 }
853 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)854 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
855     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
856 {
857     ISyncEngine *syncEngine = nullptr;
858     {
859         std::lock_guard<std::mutex> lock(syncerLock_);
860         int errCode = StatusCheck();
861         if (errCode != E_OK) {
862             return errCode;
863         }
864         syncEngine = syncEngine_;
865         RefObject::IncObjRef(syncEngine);
866     }
867     if (syncEngine == nullptr) {
868         return -E_NOT_INIT;
869     }
870     int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
871     RefObject::DecObjRef(syncEngine);
872     return errCode;
873 }
874 
InitTimeChangedListener()875 int GenericSyncer::InitTimeChangedListener()
876 {
877     int errCode = E_OK;
878     if (timeChangedListener_ != nullptr) {
879         return errCode;
880     }
881     timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
882         [this](void *changedOffset) {
883             if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
884                 return;
885             }
886             TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
887                 static_cast<TimeOffset>(TimeHelper::TO_100_NS);
888             TimeOffset orgOffset = this->metadata_->GetLocalTimeOffset() - changedTimeOffset;
889             Timestamp currentSysTime = TimeHelper::GetSysCurrentTime();
890             Timestamp maxItemTime = 0;
891             this->syncInterface_->GetMaxTimestamp(maxItemTime);
892             if (static_cast<Timestamp>(orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
893                 orgOffset = static_cast<Timestamp>(TimeHelper::BUFFER_VALID_TIME) -
894                     currentSysTime + TimeHelper::MS_TO_100_NS;
895             }
896             if (static_cast<Timestamp>(currentSysTime + orgOffset) <= maxItemTime) {
897                 orgOffset = static_cast<TimeOffset>(maxItemTime - currentSysTime + TimeHelper::MS_TO_100_NS); // 1ms
898             }
899             this->metadata_->SaveLocalTimeOffset(orgOffset);
900         }, errCode);
901     if (timeChangedListener_ == nullptr) {
902         LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
903         return errCode;
904     }
905     return E_OK;
906 }
907 
IsNeedActive(ISyncInterface * syncInterface)908 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
909 {
910     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
911     if (localOnly) {
912         LOGD("[Syncer] Local only db, don't need active syncer");
913         return false;
914     }
915     return true;
916 }
917 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)918 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
919 {
920     return -E_NOT_SUPPORT;
921 }
922 } // namespace DistributedDB
923