1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "generic_syncer.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "ref_object.h"
22 #include "sqlite_single_ver_natural_store.h"
23 #include "time_sync.h"
24 #include "single_ver_data_sync.h"
25 #ifndef OMIT_MULTI_VER
26 #include "commit_history_sync.h"
27 #include "multi_ver_data_sync.h"
28 #include "value_slice_sync.h"
29 #endif
30 #include "device_manager.h"
31 #include "db_constant.h"
32 #include "ability_sync.h"
33 #include "generic_single_ver_kv_entry.h"
34 #include "single_ver_serialize_manager.h"
35
36 namespace DistributedDB {
37 namespace {
38 constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45 : syncEngine_(nullptr),
46 syncInterface_(nullptr),
47 timeHelper_(nullptr),
48 metadata_(nullptr),
49 initialized_(false),
50 queuedManualSyncSize_(0),
51 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52 manualSyncEnable_(true),
53 closing_(false),
54 engineFinalize_(false),
55 timeChangeListenerFinalize_(true),
56 timeChangedListener_(nullptr)
57 {
58 }
59
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62 LOGD("[GenericSyncer] ~GenericSyncer!");
63 if (syncEngine_ != nullptr) {
64 syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65 RefObject::KillAndDecObjRef(syncEngine_);
66 // waiting all thread exist
67 std::unique_lock<std::mutex> cvLock(engineMutex_);
68 bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69 [this]() { return engineFinalize_; });
70 if (!engineFinalize) {
71 LOGW("syncer finalize before engine finalize!");
72 }
73 syncEngine_ = nullptr;
74 }
75 ReleaseInnerResource();
76 std::lock_guard<std::mutex> lock(syncerLock_);
77 syncInterface_ = nullptr;
78 }
79
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82 if (syncInterface == nullptr) {
83 LOGE("[Syncer] Init failed, the syncInterface is null!");
84 return -E_INVALID_ARGS;
85 }
86
87 {
88 std::lock_guard<std::mutex> lock(syncerLock_);
89 if (initialized_) {
90 return E_OK;
91 }
92 if (closing_) {
93 LOGE("[Syncer] Syncer is closing, return!");
94 return -E_BUSY;
95 }
96 std::vector<uint8_t> label = syncInterface->GetIdentifier();
97 label_ = DBCommon::StringMasking(DBCommon::VectorToHexString(label));
98
99 int errCode = InitStorageResource(syncInterface);
100 if (errCode != E_OK) {
101 return errCode;
102 }
103 // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
104 // It will be clear in destructor.
105 int errCodeTimeChangedListener = InitTimeChangedListener();
106 if (errCodeTimeChangedListener != E_OK) {
107 return -E_INTERNAL_ERROR;
108 }
109 errCode = CheckSyncActive(syncInterface, isNeedActive);
110 if (errCode != E_OK) {
111 return errCode;
112 }
113
114 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
115 return -E_NOT_INIT;
116 }
117
118 errCode = SyncModuleInit();
119 if (errCode != E_OK) {
120 LOGE("[Syncer] Sync ModuleInit ERR!");
121 return -E_INTERNAL_ERROR;
122 }
123
124 errCode = InitSyncEngine(syncInterface);
125 if (errCode != E_OK) {
126 return errCode;
127 }
128 syncEngine_->SetEqualIdentifier();
129 initialized_ = true;
130 }
131
132 // StartCommunicator may start an auto sync, this function can not in syncerLock_
133 syncEngine_->StartCommunicator();
134 if (RuntimeContext::GetInstance()->CheckDBTimeChange(syncInterface_->GetIdentifier())) {
135 ResetTimeSyncMarkByTimeChange(metadata_, *syncInterface_);
136 }
137 return E_OK;
138 }
139
Close(bool isClosedOperation)140 int GenericSyncer::Close(bool isClosedOperation)
141 {
142 int errCode = CloseInner(isClosedOperation);
143 if (errCode != -E_BUSY && isClosedOperation) {
144 ReleaseInnerResource();
145 }
146 return errCode;
147 }
148
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)149 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
150 const std::function<void(const std::map<std::string, int> &)> &onComplete,
151 const std::function<void(void)> &onFinalize, bool wait = false)
152 {
153 SyncParma param;
154 param.devices = devices;
155 param.mode = mode;
156 param.onComplete = onComplete;
157 param.onFinalize = onFinalize;
158 param.wait = wait;
159 return Sync(param);
160 }
161
Sync(const InternalSyncParma & param)162 int GenericSyncer::Sync(const InternalSyncParma ¶m)
163 {
164 SyncParma syncParam;
165 syncParam.devices = param.devices;
166 syncParam.mode = param.mode;
167 syncParam.isQuerySync = param.isQuerySync;
168 syncParam.syncQuery = param.syncQuery;
169 return Sync(syncParam);
170 }
171
Sync(const SyncParma & param)172 int GenericSyncer::Sync(const SyncParma ¶m)
173 {
174 return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
175 }
176
Sync(const SyncParma & param,uint64_t connectionId)177 int GenericSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
178 {
179 int errCode = SyncPreCheck(param);
180 if (errCode != E_OK) {
181 return errCode;
182 }
183 errCode = AddQueuedManualSyncSize(param.mode, param.wait);
184 if (errCode != E_OK) {
185 return errCode;
186 }
187
188 uint32_t syncId = GenerateSyncId();
189 errCode = PrepareSync(param, syncId, connectionId);
190 if (errCode != E_OK) {
191 LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
192 return errCode;
193 }
194 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
195 return E_OK;
196 }
197
CancelSync(uint32_t syncId)198 int GenericSyncer::CancelSync(uint32_t syncId)
199 {
200 LOGI("[Syncer] Start cancelSync %" PRIu32 "", syncId);
201 ISyncEngine *engine = nullptr;
202 {
203 std::lock_guard<std::mutex> autoLock(syncerLock_);
204 engine = syncEngine_;
205 RefObject::IncObjRef(engine);
206 }
207 SyncOperation *operation = nullptr;
208 {
209 std::lock_guard<std::mutex> lock(operationMapLock_);
210 auto iter = syncOperationMap_.find(syncId);
211 if (iter == syncOperationMap_.end()) {
212 LOGE("[Syncer] Non-existent syncId %" PRIu32 "", syncId);
213 RefObject::DecObjRef(engine);
214 return -E_NOT_SUPPORT;
215 } else if (iter->second->CanCancel() == false) {
216 RefObject::DecObjRef(engine);
217 LOGE("[Syncer] Can't cancel syncId %" PRIu32 " %" PRIu32 "", syncId, iter->second->CanCancel());
218 return -E_NOT_SUPPORT;
219 }
220 operation = iter->second;
221 RefObject::IncObjRef(operation);
222 }
223
224 const std::vector<std::string> &devices = operation->GetDevices();
225 for (auto &deviceId : devices) {
226 engine->ClearAllSyncTaskByDevice(deviceId);
227 }
228 LOGI("[Syncer] End cancelSync %" PRIu32 ", devices = %s", syncId, GetSyncDevicesStr(devices).c_str());
229
230 RefObject::DecObjRef(operation);
231 RefObject::DecObjRef(engine);
232 return E_OK;
233 }
234
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)235 int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
236 {
237 auto *operation =
238 new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
239 if (operation == nullptr) {
240 SubQueuedSyncSize();
241 return -E_OUT_OF_MEMORY;
242 }
243 ISyncEngine *engine = nullptr;
244 {
245 std::lock_guard<std::mutex> autoLock(syncerLock_);
246 PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
247 InitSyncOperation(operation, param);
248 LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
249 param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
250 engine = syncEngine_;
251 RefObject::IncObjRef(engine);
252 }
253 AddSyncOperation(engine, operation);
254 RefObject::DecObjRef(engine);
255 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
256 if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
257 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
258 connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
259 syncIdMap_[static_cast<int>(syncId)] = connectionId;
260 }
261 if (operation->CheckIsAllFinished()) {
262 operation->Finished();
263 RefObject::KillAndDecObjRef(operation);
264 } else {
265 operation->WaitIfNeed();
266 RefObject::DecObjRef(operation);
267 }
268 return E_OK;
269 }
270
RemoveSyncOperation(int syncId)271 int GenericSyncer::RemoveSyncOperation(int syncId)
272 {
273 SyncOperation *operation = nullptr;
274 std::unique_lock<std::mutex> lock(operationMapLock_);
275 auto iter = syncOperationMap_.find(syncId);
276 if (iter != syncOperationMap_.end()) {
277 LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
278 operation = iter->second;
279 syncOperationMap_.erase(syncId);
280 lock.unlock();
281 if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
282 SubQueuedSyncSize();
283 }
284 operation->NotifyIfNeed();
285 RefObject::KillAndDecObjRef(operation);
286 operation = nullptr;
287 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
288 if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
289 return E_OK;
290 }
291 uint64_t connectionId = syncIdMap_[syncId];
292 if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
293 connectionIdMap_[connectionId].remove(syncId);
294 }
295 syncIdMap_.erase(syncId);
296 return E_OK;
297 }
298 return -E_INVALID_ARGS;
299 }
300
StopSync(uint64_t connectionId)301 int GenericSyncer::StopSync(uint64_t connectionId)
302 {
303 std::list<int> syncIdList;
304 {
305 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
306 if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
307 return E_OK;
308 }
309 syncIdList = connectionIdMap_[connectionId];
310 connectionIdMap_.erase(connectionId);
311 }
312 for (auto syncId : syncIdList) {
313 RemoveSyncOperation(syncId);
314 if (syncEngine_ != nullptr) {
315 syncEngine_->AbortMachineIfNeed(syncId);
316 }
317 }
318 if (syncEngine_ != nullptr) {
319 syncEngine_->NotifyConnectionClosed(connectionId);
320 }
321 return E_OK;
322 }
323
GetTimestamp()324 uint64_t GenericSyncer::GetTimestamp()
325 {
326 std::shared_ptr<TimeHelper> timeHelper = nullptr;
327 ISyncInterface *storage = nullptr;
328 {
329 std::lock_guard<std::mutex> lock(syncerLock_);
330 timeHelper = timeHelper_;
331 if (syncInterface_ != nullptr) {
332 storage = syncInterface_;
333 storage->IncRefCount();
334 }
335 }
336 if (storage == nullptr) {
337 return TimeHelper::GetSysCurrentTime();
338 }
339 if (timeHelper == nullptr) {
340 storage->DecRefCount();
341 return TimeHelper::GetSysCurrentTime();
342 }
343 uint64_t timestamp = timeHelper->GetTime();
344 storage->DecRefCount();
345 return timestamp;
346 }
347
QueryAutoSync(const InternalSyncParma & param)348 void GenericSyncer::QueryAutoSync(const InternalSyncParma ¶m)
349 {
350 if (!initialized_) {
351 LOGW("[Syncer] Syncer has not Init");
352 return;
353 }
354 LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
355 ISyncInterface *syncInterface = nullptr;
356 ISyncEngine *engine = nullptr;
357 {
358 std::lock_guard<std::mutex> lock(syncerLock_);
359 if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
360 LOGW("[Syncer] Syncer has not Init");
361 return;
362 }
363 syncInterface = syncInterface_;
364 engine = syncEngine_;
365 syncInterface->IncRefCount();
366 RefObject::IncObjRef(engine);
367 }
368 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
369 int errCode = Sync(param);
370 if (errCode != E_OK) {
371 LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
372 }
373 RefObject::DecObjRef(engine);
374 syncInterface->DecRefCount();
375 });
376 if (retCode != E_OK) {
377 LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
378 RefObject::DecObjRef(engine);
379 syncInterface->DecRefCount();
380 }
381 }
382
AddSyncOperation(ISyncEngine * engine,SyncOperation * operation)383 void GenericSyncer::AddSyncOperation(ISyncEngine *engine, SyncOperation *operation)
384 {
385 if (operation == nullptr) {
386 return;
387 }
388
389 LOGD("[Syncer] AddSyncOperation.");
390 engine->AddSyncOperation(operation);
391
392 if (operation->CheckIsAllFinished()) {
393 return;
394 }
395
396 std::lock_guard<std::mutex> lock(operationMapLock_);
397 syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
398 // To make sure operation alive before WaitIfNeed out
399 RefObject::IncObjRef(operation);
400 }
401
SyncOperationKillCallbackInner(int syncId)402 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
403 {
404 if (syncEngine_ != nullptr) {
405 LOGI("[Syncer] Operation on kill id = %d", syncId);
406 syncEngine_->RemoveSyncOperation(syncId);
407 }
408 }
409
SyncOperationKillCallback(int syncId)410 void GenericSyncer::SyncOperationKillCallback(int syncId)
411 {
412 SyncOperationKillCallbackInner(syncId);
413 }
414
InitMetaData(ISyncInterface * syncInterface)415 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
416 {
417 if (metadata_ != nullptr) {
418 return E_OK;
419 }
420
421 metadata_ = std::make_shared<Metadata>();
422 if (metadata_ == nullptr) {
423 LOGE("[Syncer] metadata make shared failed");
424 return -E_OUT_OF_MEMORY;
425 }
426 int errCode = metadata_->Initialize(syncInterface);
427 if (errCode != E_OK) {
428 LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
429 metadata_ = nullptr;
430 }
431 syncInterface_ = syncInterface;
432 return errCode;
433 }
434
InitTimeHelper(ISyncInterface * syncInterface)435 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
436 {
437 if (timeHelper_ != nullptr) {
438 return E_OK;
439 }
440
441 timeHelper_ = std::make_shared<TimeHelper>();
442 if (timeHelper_ == nullptr) {
443 return -E_OUT_OF_MEMORY;
444 }
445 int errCode = timeHelper_->Initialize(syncInterface, metadata_);
446 if (errCode != E_OK) {
447 LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
448 timeHelper_ = nullptr;
449 }
450 return errCode;
451 }
452
InitSyncEngine(ISyncInterface * syncInterface)453 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
454 {
455 if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
456 LOGI("[Syncer] syncEngine is active");
457 return E_OK;
458 }
459 int errCode = BuildSyncEngine();
460 if (errCode != E_OK) {
461 return errCode;
462 }
463 const std::function<void(std::string)> onlineFunc = [this](const std::string &device) {
464 RemoteDataChanged(device);
465 };
466 const std::function<void(std::string)> offlineFunc = [this](const std::string &device) {
467 RemoteDeviceOffline(device);
468 };
469 const std::function<void(const InternalSyncParma ¶m)> queryAutoSyncFunc =
470 [this](const InternalSyncParma &syncParam) { QueryAutoSync(syncParam); };
471 const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
472 errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
473 if (errCode == E_OK) {
474 syncInterface->IncRefCount();
475 label_ = syncEngine_->GetLabel();
476 return E_OK;
477 } else {
478 LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
479 RefObject::KillAndDecObjRef(syncEngine_);
480 syncEngine_ = nullptr;
481 return errCode;
482 }
483 }
484
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)485 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
486 {
487 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
488 false);
489 if (!isSyncDualTupleMode || isNeedActive) {
490 return E_OK;
491 }
492 LOGI("[Syncer] syncer no need to active");
493 int errCode = BuildSyncEngine();
494 if (errCode != E_OK) {
495 return errCode;
496 }
497 return -E_NO_NEED_ACTIVE;
498 }
499
GenerateSyncId()500 uint32_t GenericSyncer::GenerateSyncId()
501 {
502 std::lock_guard<std::mutex> lock(syncIdLock_);
503 currentSyncId_++;
504 // if overflow, reset to 1
505 if (currentSyncId_ <= 0) {
506 currentSyncId_ = MIN_VALID_SYNC_ID;
507 }
508 return currentSyncId_;
509 }
510
IsValidMode(int mode) const511 bool GenericSyncer::IsValidMode(int mode) const
512 {
513 if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
514 LOGE("[Syncer] Sync mode is not valid!");
515 return false;
516 }
517 return true;
518 }
519
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const520 int GenericSyncer::SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine, ISyncInterface *storage) const
521 {
522 (void)param;
523 (void)engine;
524 (void)storage;
525 return E_OK;
526 }
527
IsValidDevices(const std::vector<std::string> & devices) const528 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
529 {
530 if (devices.empty()) {
531 LOGE("[Syncer] devices is empty!");
532 return false;
533 }
534 return true;
535 }
536
ClearSyncOperations(bool isClosedOperation)537 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
538 {
539 std::vector<SyncOperation *> syncOperation;
540 {
541 std::lock_guard<std::mutex> lock(operationMapLock_);
542 for (auto &item : syncOperationMap_) {
543 bool isBlockSync = item.second->IsBlockSync();
544 if (isBlockSync || !isClosedOperation) {
545 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
546 item.second->SetUnfinishedDevStatus(status);
547 RefObject::IncObjRef(item.second);
548 syncOperation.push_back(item.second);
549 }
550 }
551 }
552
553 if (!isClosedOperation) { // means user changed
554 syncEngine_->NotifyUserChange();
555 }
556
557 for (auto &operation : syncOperation) {
558 // block sync operation or userChange will trigger remove sync operation
559 // caller won't blocked for block sync
560 // caller won't blocked for userChange operation no mater it is block or non-block sync
561 TriggerSyncFinished(operation);
562 RefObject::DecObjRef(operation);
563 }
564 ClearInnerResource(isClosedOperation);
565 }
566
ClearInnerResource(bool isClosedOperation)567 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
568 {
569 {
570 std::lock_guard<std::mutex> lock(operationMapLock_);
571 for (auto &iter : syncOperationMap_) {
572 RefObject::KillAndDecObjRef(iter.second);
573 iter.second = nullptr;
574 }
575 syncOperationMap_.clear();
576 }
577 {
578 std::lock_guard<std::mutex> lock(syncIdLock_);
579 if (isClosedOperation) {
580 connectionIdMap_.clear();
581 } else { // only need to clear syncid when user change
582 for (auto &item : connectionIdMap_) {
583 item.second.clear();
584 }
585 }
586 syncIdMap_.clear();
587 }
588 }
589
TriggerSyncFinished(SyncOperation * operation)590 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
591 {
592 if (operation != nullptr && operation->CheckIsAllFinished()) { // LCOV_EXCL_BR_LINE
593 operation->Finished();
594 }
595 }
596
OnSyncFinished(int syncId)597 void GenericSyncer::OnSyncFinished(int syncId)
598 {
599 (void)(RemoveSyncOperation(syncId));
600 }
601
SyncModuleInit()602 int GenericSyncer::SyncModuleInit()
603 {
604 static bool isInit = false;
605 std::lock_guard<std::mutex> lock(moduleInitLock_);
606 if (!isInit) {
607 int errCode = SyncResourceInit();
608 if (errCode != E_OK) {
609 return errCode;
610 }
611 isInit = true;
612 return E_OK;
613 }
614 return E_OK;
615 }
616
SyncResourceInit()617 int GenericSyncer::SyncResourceInit()
618 {
619 int errCode = TimeSync::RegisterTransformFunc();
620 if (errCode != E_OK) {
621 LOGE("Register timesync message transform func ERR!");
622 return errCode;
623 }
624 errCode = SingleVerSerializeManager::RegisterTransformFunc();
625 if (errCode != E_OK) {
626 LOGE("Register SingleVerDataSync message transform func ERR!");
627 return errCode;
628 }
629 #ifndef OMIT_MULTI_VER
630 errCode = CommitHistorySync::RegisterTransformFunc();
631 if (errCode != E_OK) {
632 LOGE("Register CommitHistorySync message transform func ERR!");
633 return errCode;
634 }
635 errCode = MultiVerDataSync::RegisterTransformFunc();
636 if (errCode != E_OK) {
637 LOGE("Register MultiVerDataSync message transform func ERR!");
638 return errCode;
639 }
640 errCode = ValueSliceSync::RegisterTransformFunc();
641 if (errCode != E_OK) {
642 LOGE("Register ValueSliceSync message transform func ERR!");
643 return errCode;
644 }
645 #endif
646 errCode = DeviceManager::RegisterTransformFunc();
647 if (errCode != E_OK) {
648 LOGE("Register DeviceManager message transform func ERR!");
649 return errCode;
650 }
651 errCode = AbilitySync::RegisterTransformFunc();
652 if (errCode != E_OK) {
653 LOGE("Register AbilitySync message transform func ERR!");
654 return errCode;
655 }
656 return E_OK;
657 }
658
GetQueuedSyncSize(int * queuedSyncSize) const659 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
660 {
661 if (queuedSyncSize == nullptr) {
662 return -E_INVALID_ARGS;
663 }
664 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
665 *queuedSyncSize = queuedManualSyncSize_;
666 LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
667 return E_OK;
668 }
669
SetQueuedSyncLimit(const int * queuedSyncLimit)670 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
671 {
672 if (queuedSyncLimit == nullptr) {
673 return -E_INVALID_ARGS;
674 }
675 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
676 queuedManualSyncLimit_ = *queuedSyncLimit;
677 LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
678 return E_OK;
679 }
680
GetQueuedSyncLimit(int * queuedSyncLimit) const681 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
682 {
683 if (queuedSyncLimit == nullptr) {
684 return -E_INVALID_ARGS;
685 }
686 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
687 *queuedSyncLimit = queuedManualSyncLimit_;
688 LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
689 return E_OK;
690 }
691
IsManualSync(int inMode) const692 bool GenericSyncer::IsManualSync(int inMode) const
693 {
694 int mode = SyncOperation::TransferSyncMode(inMode);
695 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
696 (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
697 return true;
698 }
699 return false;
700 }
701
AddQueuedManualSyncSize(int mode,bool wait)702 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
703 {
704 if (IsManualSync(mode) && (!wait)) {
705 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
706 if (!manualSyncEnable_) {
707 LOGI("[GenericSyncer] manualSyncEnable is Disable");
708 return -E_BUSY;
709 }
710 queuedManualSyncSize_++;
711 }
712 return E_OK;
713 }
714
IsQueuedManualSyncFull(int mode,bool wait) const715 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
716 {
717 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
718 if (IsManualSync(mode) && (!manualSyncEnable_)) { // LCOV_EXCL_BR_LINE
719 LOGI("[GenericSyncer] manualSyncEnable_:false");
720 return true;
721 }
722 if (IsManualSync(mode) && (!wait)) { // LCOV_EXCL_BR_LINE
723 if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
724 return false;
725 } else {
726 LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
727 queuedManualSyncLimit_);
728 return true;
729 }
730 } else {
731 return false;
732 }
733 }
734
SubQueuedSyncSize(void)735 void GenericSyncer::SubQueuedSyncSize(void)
736 {
737 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
738 queuedManualSyncSize_--;
739 if (queuedManualSyncSize_ < 0) {
740 LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
741 queuedManualSyncSize_ = 0;
742 }
743 }
744
DisableManualSync(void)745 int GenericSyncer::DisableManualSync(void)
746 {
747 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
748 if (queuedManualSyncSize_ > 0) {
749 LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
750 return -E_BUSY;
751 }
752 manualSyncEnable_ = false;
753 LOGD("[GenericSyncer] DisableManualSync ok");
754 return E_OK;
755 }
756
EnableManualSync(void)757 int GenericSyncer::EnableManualSync(void)
758 {
759 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
760 manualSyncEnable_ = true;
761 LOGD("[GenericSyncer] EnableManualSync ok");
762 return E_OK;
763 }
764
GetLocalIdentity(std::string & outTarget) const765 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
766 {
767 std::string deviceId;
768 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
769 if (errCode != E_OK) {
770 LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
771 return errCode;
772 }
773 outTarget = DBCommon::TransferHashString(deviceId);
774 return E_OK;
775 }
776
GetOnlineDevices(std::vector<std::string> & devices) const777 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
778 {
779 std::string identifier;
780 {
781 std::lock_guard<std::mutex> lock(syncerLock_);
782 // Get devices from AutoLaunch first.
783 if (syncInterface_ == nullptr) {
784 LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
785 return;
786 }
787 bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
788 false);
789 if (isSyncDualTupleMode) {
790 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
791 "");
792 } else {
793 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
794 }
795 }
796 RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
797 if (!devices.empty()) {
798 return;
799 }
800 std::lock_guard<std::mutex> lock(syncerLock_);
801 if (closing_) {
802 LOGW("[Syncer] Syncer is closing, return!");
803 return;
804 }
805 if (syncEngine_ != nullptr) {
806 syncEngine_->GetOnlineDevices(devices);
807 }
808 }
809
SetSyncRetry(bool isRetry)810 int GenericSyncer::SetSyncRetry(bool isRetry)
811 {
812 if (syncEngine_ == nullptr) {
813 return -E_NOT_INIT;
814 }
815 syncEngine_->SetSyncRetry(isRetry);
816 return E_OK;
817 }
818
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)819 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
820 {
821 std::lock_guard<std::mutex> lock(syncerLock_);
822 if (syncEngine_ == nullptr) {
823 return -E_NOT_INIT;
824 }
825 int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
826 if (errCode == E_OK) {
827 syncEngine_->SetEqualIdentifierMap(identifier, targets);
828 }
829 return errCode;
830 }
831
GetSyncDevicesStr(const std::vector<std::string> & devices) const832 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
833 {
834 std::string syncDevices;
835 for (const auto &dev:devices) {
836 syncDevices += DBCommon::StringMasking(dev);
837 syncDevices += ",";
838 }
839 if (syncDevices.empty()) {
840 return "";
841 }
842 return syncDevices.substr(0, syncDevices.size() - 1);
843 }
844
StatusCheck() const845 int GenericSyncer::StatusCheck() const
846 {
847 if (!initialized_) {
848 LOGE("[Syncer] Syncer is not initialized, return!");
849 return -E_BUSY;
850 }
851 if (closing_) {
852 LOGW("[Syncer] Syncer is closing, return!");
853 return -E_BUSY;
854 }
855 return E_OK;
856 }
857
SyncPreCheck(const SyncParma & param) const858 int GenericSyncer::SyncPreCheck(const SyncParma ¶m) const
859 {
860 ISyncEngine *engine = nullptr;
861 ISyncInterface *storage = nullptr;
862 int errCode = E_OK;
863 {
864 std::lock_guard<std::mutex> lock(syncerLock_);
865 errCode = StatusCheck();
866 if (errCode != E_OK) {
867 return errCode;
868 }
869 if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
870 return -E_INVALID_ARGS;
871 }
872 if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
873 LOGE("[Syncer] -E_BUSY");
874 return -E_BUSY;
875 }
876 storage = syncInterface_;
877 engine = syncEngine_;
878 if (storage == nullptr || engine == nullptr) {
879 return -E_BUSY;
880 }
881 storage->IncRefCount();
882 RefObject::IncObjRef(engine);
883 }
884 errCode = SyncConditionCheck(param, engine, storage);
885 storage->DecRefCount();
886 RefObject::DecObjRef(engine);
887 return errCode;
888 }
889
InitSyncOperation(SyncOperation * operation,const SyncParma & param)890 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma ¶m)
891 {
892 operation->SetIdentifier(syncInterface_->GetIdentifier());
893 operation->SetSyncProcessCallFun(param.onSyncProcess);
894 operation->Initialize();
895 operation->OnKill([this, id = operation->GetSyncId()] { SyncOperationKillCallback(id); });
896 std::function<void(int)> onFinished = [this](int syncId) { OnSyncFinished(syncId); };
897 operation->SetOnSyncFinished(onFinished);
898 operation->SetOnSyncFinalize(param.onFinalize);
899 if (param.isQuerySync) {
900 operation->SetQuery(param.syncQuery);
901 }
902 }
903
BuildSyncEngine()904 int GenericSyncer::BuildSyncEngine()
905 {
906 if (syncEngine_ != nullptr) {
907 return E_OK;
908 }
909 syncEngine_ = CreateSyncEngine();
910 if (syncEngine_ == nullptr) {
911 return -E_OUT_OF_MEMORY;
912 }
913 syncEngine_->OnLastRef([this]() {
914 LOGD("[Syncer] SyncEngine finalized");
915 {
916 std::lock_guard<std::mutex> cvLock(engineMutex_);
917 engineFinalize_ = true;
918 }
919 engineFinalizeCv_.notify_all();
920 });
921 return E_OK;
922 }
923
Dump(int fd)924 void GenericSyncer::Dump(int fd)
925 {
926 if (syncEngine_ == nullptr) {
927 return;
928 }
929 RefObject::IncObjRef(syncEngine_);
930 syncEngine_->Dump(fd);
931 RefObject::DecObjRef(syncEngine_);
932 }
933
DumpSyncerBasicInfo()934 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
935 {
936 SyncerBasicInfo baseInfo;
937 if (syncEngine_ == nullptr) {
938 return baseInfo;
939 }
940 RefObject::IncObjRef(syncEngine_);
941 baseInfo.isSyncActive = syncEngine_->IsEngineActive();
942 RefObject::DecObjRef(syncEngine_);
943 return baseInfo;
944 }
945
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)946 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
947 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
948 {
949 ISyncEngine *syncEngine = nullptr;
950 {
951 std::lock_guard<std::mutex> lock(syncerLock_);
952 int errCode = StatusCheck();
953 if (errCode != E_OK) {
954 return errCode;
955 }
956 syncEngine = syncEngine_;
957 RefObject::IncObjRef(syncEngine);
958 }
959 if (syncEngine == nullptr) {
960 return -E_NOT_INIT;
961 }
962 int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
963 RefObject::DecObjRef(syncEngine);
964 return errCode;
965 }
966
InitTimeChangedListener()967 int GenericSyncer::InitTimeChangedListener()
968 {
969 int errCode = E_OK;
970 if (timeChangedListener_ != nullptr) {
971 return errCode;
972 }
973 timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
974 [this](void *changedOffset) {
975 RecordTimeChangeOffset(changedOffset);
976 },
977 [this]() {
978 {
979 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
980 timeChangeListenerFinalize_ = true;
981 }
982 timeChangeCv_.notify_all();
983 }, errCode);
984 if (timeChangedListener_ == nullptr) {
985 LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
986 return errCode;
987 }
988 {
989 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
990 timeChangeListenerFinalize_ = false;
991 }
992 return E_OK;
993 }
994
ReleaseInnerResource()995 void GenericSyncer::ReleaseInnerResource()
996 {
997 NotificationChain::Listener *timeChangedListener = nullptr;
998 {
999 std::lock_guard<std::mutex> lock(syncerLock_);
1000 if (timeChangedListener_ != nullptr) {
1001 timeChangedListener = timeChangedListener_;
1002 timeChangedListener_ = nullptr;
1003 }
1004 timeHelper_ = nullptr;
1005 metadata_ = nullptr;
1006 }
1007 if (timeChangedListener != nullptr) {
1008 timeChangedListener->Drop(true);
1009 RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
1010 }
1011 std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
1012 LOGD("[GenericSyncer] Begin wait time change listener finalize");
1013 timeChangeCv_.wait(uniqueLock, [this]() {
1014 return timeChangeListenerFinalize_;
1015 });
1016 LOGD("[GenericSyncer] End wait time change listener finalize");
1017 }
1018
RecordTimeChangeOffset(void * changedOffset)1019 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
1020 {
1021 std::shared_ptr<Metadata> metadata = nullptr;
1022 ISyncInterface *storage = nullptr;
1023 {
1024 std::lock_guard<std::mutex> lock(syncerLock_);
1025 if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
1026 return;
1027 }
1028 storage = syncInterface_;
1029 metadata = metadata_;
1030 storage->IncRefCount();
1031 }
1032 TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
1033 static_cast<TimeOffset>(TimeHelper::TO_100_NS);
1034 TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
1035 TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
1036 Timestamp maxItemTime = 0;
1037 storage->GetMaxTimestamp(maxItemTime);
1038 if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
1039 orgOffset = TimeHelper::BUFFER_VALID_TIME -
1040 currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
1041 }
1042 if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
1043 orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
1044 static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
1045 }
1046 metadata->SaveLocalTimeOffset(orgOffset);
1047 ResetTimeSyncMarkByTimeChange(metadata, *storage);
1048 storage->DecRefCount();
1049 }
1050
CloseInner(bool isClosedOperation)1051 int GenericSyncer::CloseInner(bool isClosedOperation)
1052 {
1053 {
1054 std::lock_guard<std::mutex> lock(syncerLock_);
1055 if (!initialized_) {
1056 LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
1057 return -E_NOT_INIT;
1058 }
1059 initialized_ = false;
1060 if (closing_) {
1061 LOGE("[Syncer] Syncer is closing, return!");
1062 return -E_BUSY;
1063 }
1064 closing_ = true;
1065 }
1066 ClearSyncOperations(isClosedOperation);
1067 if (syncEngine_ != nullptr) {
1068 syncEngine_->Close();
1069 LOGD("[Syncer] Close SyncEngine!");
1070 std::lock_guard<std::mutex> lock(syncerLock_);
1071 closing_ = false;
1072 }
1073 return E_OK;
1074 }
1075
GetSyncDataSize(const std::string & device,size_t & size) const1076 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1077 {
1078 uint64_t localWaterMark = 0;
1079 std::shared_ptr<Metadata> metadata = nullptr;
1080 {
1081 std::lock_guard<std::mutex> lock(syncerLock_);
1082 if (metadata_ == nullptr || syncInterface_ == nullptr) {
1083 return -E_INTERNAL_ERROR;
1084 }
1085 if (closing_) {
1086 LOGE("[Syncer] Syncer is closing, return!");
1087 return -E_BUSY;
1088 }
1089 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1090 if (errCode != E_OK) {
1091 LOGE("[Syncer] syncer is restarting, return!");
1092 return errCode;
1093 }
1094 syncInterface_->IncRefCount();
1095 metadata = metadata_;
1096 }
1097 metadata->GetLocalWaterMark(device, localWaterMark);
1098 uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1099 DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1100 std::vector<SendDataItem> outData;
1101 ContinueToken token = nullptr;
1102 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1103 outData, token, syncDataSizeInfo);
1104 if (token != nullptr) {
1105 static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1106 token = nullptr;
1107 }
1108 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1109 LOGE("calculate sync data size failed %d", errCode);
1110 syncInterface_->DecRefCount();
1111 return errCode;
1112 }
1113 uint32_t totalLen = 0;
1114 if (errCode == -E_UNFINISHED) {
1115 totalLen = expectedMtuSize;
1116 } else {
1117 totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1118 }
1119 for (auto &entry : outData) {
1120 delete entry;
1121 entry = nullptr;
1122 }
1123 syncInterface_->DecRefCount();
1124 // if larger than 1M, return 1M
1125 size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1126 return E_OK;
1127 }
1128
IsNeedActive(ISyncInterface * syncInterface)1129 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1130 {
1131 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1132 if (localOnly) {
1133 LOGD("[Syncer] Local only db, don't need active syncer");
1134 return false;
1135 }
1136 return true;
1137 }
1138
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1139 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1140 {
1141 (void)clientId;
1142 (void)hashDevId;
1143 return -E_NOT_SUPPORT;
1144 }
1145
InitStorageResource(ISyncInterface * syncInterface)1146 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1147 {
1148 // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1149 // It will be clear in destructor.
1150 int errCode = InitMetaData(syncInterface);
1151 if (errCode != E_OK) {
1152 return errCode;
1153 }
1154
1155 // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1156 // It will be clear in destructor.
1157 errCode = InitTimeHelper(syncInterface);
1158 if (errCode != E_OK) {
1159 return errCode;
1160 }
1161
1162 if (!IsNeedActive(syncInterface)) {
1163 return -E_NO_NEED_ACTIVE;
1164 }
1165 return errCode;
1166 }
1167
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)1168 int GenericSyncer::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
1169 {
1170 std::shared_ptr<Metadata> metadata = nullptr;
1171 {
1172 std::lock_guard<std::mutex> autoLock(syncerLock_);
1173 metadata = metadata_;
1174 }
1175 if (metadata == nullptr) {
1176 LOGE("[Syncer] Metadata is not init");
1177 return -E_NOT_INIT;
1178 }
1179 std::string dev;
1180 bool devNeedHash = true;
1181 int errCode = metadata->GetHashDeviceId(device, dev);
1182 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1183 LOGE("[Syncer] Get watermark info failed %d", errCode);
1184 return errCode;
1185 } else if (errCode == E_OK) {
1186 devNeedHash = false;
1187 } else {
1188 dev = device;
1189 }
1190 return metadata->GetWaterMarkInfoFromDB(dev, devNeedHash, info);
1191 }
1192
UpgradeSchemaVerInMeta()1193 int GenericSyncer::UpgradeSchemaVerInMeta()
1194 {
1195 std::shared_ptr<Metadata> metadata = nullptr;
1196 {
1197 std::lock_guard<std::mutex> autoLock(syncerLock_);
1198 metadata = metadata_;
1199 }
1200 if (metadata == nullptr) {
1201 LOGE("[Syncer] metadata is not init");
1202 return -E_NOT_INIT;
1203 }
1204 int errCode = metadata->ClearAllAbilitySyncFinishMark();
1205 if (errCode != E_OK) {
1206 LOGE("[Syncer] clear ability mark failed:%d", errCode);
1207 return errCode;
1208 }
1209 auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
1210 if (err != E_OK) {
1211 LOGE("[Syncer] get local schema version failed:%d", err);
1212 return err;
1213 }
1214 errCode = metadata->SetLocalSchemaVersion(localSchemaVer + 1);
1215 if (errCode != E_OK) {
1216 LOGE("[Syncer] increase local schema version failed:%d", errCode);
1217 }
1218 return errCode;
1219 }
1220
ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> & metadata,ISyncInterface & storage)1221 void GenericSyncer::ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage)
1222 {
1223 if (syncEngine_ != nullptr) {
1224 syncEngine_->TimeChange();
1225 }
1226 int errCode = metadata->ClearAllTimeSyncFinishMark();
1227 if (errCode != E_OK) {
1228 LOGW("[GenericSyncer] %s clear time sync finish mark failed %d", label_.c_str(), errCode);
1229 } else {
1230 LOGD("[GenericSyncer] ClearAllTimeSyncFinishMark finish");
1231 RuntimeContext::GetInstance()->ResetDBTimeChangeStatus(storage.GetIdentifier());
1232 }
1233 }
1234
ResetSyncStatus()1235 void GenericSyncer::ResetSyncStatus()
1236 {
1237 std::shared_ptr<Metadata> metadata = nullptr;
1238 {
1239 std::lock_guard<std::mutex> lock(syncerLock_);
1240 if (metadata_ == nullptr) {
1241 return;
1242 }
1243 metadata = metadata_;
1244 }
1245 metadata->ClearAllAbilitySyncFinishMark();
1246 }
1247
GetLocalTimeOffset()1248 int64_t GenericSyncer::GetLocalTimeOffset()
1249 {
1250 std::shared_ptr<Metadata> metadata = nullptr;
1251 {
1252 std::lock_guard<std::mutex> lock(syncerLock_);
1253 if (metadata_ == nullptr) {
1254 return 0;
1255 }
1256 metadata = metadata_;
1257 }
1258 return metadata->GetLocalTimeOffset();
1259 }
1260
GetTaskCount()1261 int32_t GenericSyncer::GetTaskCount()
1262 {
1263 int32_t count = 0;
1264 {
1265 std::lock_guard<std::mutex> autoLock(operationMapLock_);
1266 count += static_cast<int32_t>(syncOperationMap_.size());
1267 }
1268 ISyncEngine *syncEngine = nullptr;
1269 {
1270 std::lock_guard<std::mutex> lock(syncerLock_);
1271 if (syncEngine_ == nullptr) {
1272 return count;
1273 }
1274 syncEngine = syncEngine_;
1275 RefObject::IncObjRef(syncEngine);
1276 }
1277 count += syncEngine->GetResponseTaskCount();
1278 RefObject::DecObjRef(syncEngine);
1279 return count;
1280 }
1281 } // namespace DistributedDB
1282