1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "generic_syncer.h"

#include <limits>

#include "db_common.h"
#include "db_errno.h"
#include "log_print.h"
#include "ref_object.h"
#include "sqlite_single_ver_natural_store.h"
#include "time_sync.h"
#include "single_ver_data_sync.h"
#ifndef OMIT_MULTI_VER
#include "commit_history_sync.h"
#include "multi_ver_data_sync.h"
#include "value_slice_sync.h"
#endif
#include "device_manager.h"
#include "db_constant.h"
#include "ability_sync.h"
#include "generic_single_ver_kv_entry.h"
#include "single_ver_serialize_manager.h"
35
36 namespace DistributedDB {
37 namespace {
38 constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45 : syncEngine_(nullptr),
46 syncInterface_(nullptr),
47 timeHelper_(nullptr),
48 metadata_(nullptr),
49 initialized_(false),
50 queuedManualSyncSize_(0),
51 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52 manualSyncEnable_(true),
53 closing_(false),
54 engineFinalize_(false),
55 timeChangeListenerFinalize_(true),
56 timeChangedListener_(nullptr)
57 {
58 }
59
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62 LOGD("[GenericSyncer] ~GenericSyncer!");
63 if (syncEngine_ != nullptr) {
64 syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65 RefObject::KillAndDecObjRef(syncEngine_);
66 // waiting all thread exist
67 std::unique_lock<std::mutex> cvLock(engineMutex_);
68 bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69 [this]() { return engineFinalize_; });
70 if (!engineFinalize) {
71 LOGW("syncer finalize before engine finalize!");
72 }
73 syncEngine_ = nullptr;
74 }
75 ReleaseInnerResource();
76 std::lock_guard<std::mutex> lock(syncerLock_);
77 syncInterface_ = nullptr;
78 }
79
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82 if (syncInterface == nullptr) {
83 LOGE("[Syncer] Init failed, the syncInterface is null!");
84 return -E_INVALID_ARGS;
85 }
86
87 {
88 std::lock_guard<std::mutex> lock(syncerLock_);
89 if (initialized_) {
90 return E_OK;
91 }
92 if (closing_) {
93 LOGE("[Syncer] Syncer is closing, return!");
94 return -E_BUSY;
95 }
96 std::vector<uint8_t> label = syncInterface->GetIdentifier();
97 label_ = DBCommon::StringMasking(DBCommon::VectorToHexString(label));
98
99 int errCode = InitStorageResource(syncInterface);
100 if (errCode != E_OK) {
101 return errCode;
102 }
103 // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
104 // It will be clear in destructor.
105 int errCodeTimeChangedListener = InitTimeChangedListener();
106 if (errCodeTimeChangedListener != E_OK) {
107 return -E_INTERNAL_ERROR;
108 }
109 errCode = CheckSyncActive(syncInterface, isNeedActive);
110 if (errCode != E_OK) {
111 return errCode;
112 }
113
114 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
115 return -E_NOT_INIT;
116 }
117
118 errCode = SyncModuleInit();
119 if (errCode != E_OK) {
120 LOGE("[Syncer] Sync ModuleInit ERR!");
121 return -E_INTERNAL_ERROR;
122 }
123
124 errCode = InitSyncEngine(syncInterface);
125 if (errCode != E_OK) {
126 return errCode;
127 }
128 syncEngine_->SetEqualIdentifier();
129 initialized_ = true;
130 }
131
132 // StartCommunicator may start an auto sync, this function can not in syncerLock_
133 syncEngine_->StartCommunicator();
134 if (RuntimeContext::GetInstance()->CheckDBTimeChange(syncInterface_->GetIdentifier())) {
135 ResetTimeSyncMarkByTimeChange(metadata_, *syncInterface_);
136 }
137 return E_OK;
138 }
139
Close(bool isClosedOperation)140 int GenericSyncer::Close(bool isClosedOperation)
141 {
142 int errCode = CloseInner(isClosedOperation);
143 if (errCode != -E_BUSY && isClosedOperation) {
144 ReleaseInnerResource();
145 }
146 return errCode;
147 }
148
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)149 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
150 const std::function<void(const std::map<std::string, int> &)> &onComplete,
151 const std::function<void(void)> &onFinalize, bool wait = false)
152 {
153 SyncParam param;
154 param.devices = devices;
155 param.mode = mode;
156 param.onComplete = onComplete;
157 param.onFinalize = onFinalize;
158 param.wait = wait;
159 return Sync(param);
160 }
161
Sync(const InternalSyncParma & param)162 int GenericSyncer::Sync(const InternalSyncParma ¶m)
163 {
164 SyncParam syncParam;
165 syncParam.devices = param.devices;
166 syncParam.mode = param.mode;
167 syncParam.isQuerySync = param.isQuerySync;
168 syncParam.syncQuery = param.syncQuery;
169 return Sync(syncParam);
170 }
171
Sync(const SyncParam & param)172 int GenericSyncer::Sync(const SyncParam ¶m)
173 {
174 return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
175 }
176
Sync(const SyncParam & param,uint64_t connectionId)177 int GenericSyncer::Sync(const SyncParam ¶m, uint64_t connectionId)
178 {
179 int errCode = SyncPreCheck(param);
180 if (errCode != E_OK) {
181 return errCode;
182 }
183 errCode = AddQueuedManualSyncSize(param.mode, param.wait);
184 if (errCode != E_OK) {
185 return errCode;
186 }
187
188 uint32_t syncId = GenerateSyncId();
189 errCode = PrepareSync(param, syncId, connectionId);
190 if (errCode != E_OK) {
191 LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
192 return errCode;
193 }
194 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
195 return E_OK;
196 }
197
CancelSync(uint32_t syncId)198 int GenericSyncer::CancelSync(uint32_t syncId)
199 {
200 LOGI("[Syncer] Start cancelSync %" PRIu32 "", syncId);
201 ISyncEngine *engine = nullptr;
202 {
203 std::lock_guard<std::mutex> autoLock(syncerLock_);
204 engine = syncEngine_;
205 RefObject::IncObjRef(engine);
206 }
207 SyncOperation *operation = nullptr;
208 {
209 std::lock_guard<std::mutex> lock(operationMapLock_);
210 auto iter = syncOperationMap_.find(syncId);
211 if (iter == syncOperationMap_.end()) {
212 LOGE("[Syncer] Non-existent syncId %" PRIu32 "", syncId);
213 RefObject::DecObjRef(engine);
214 return -E_NOT_FOUND;
215 } else if (iter->second->CanCancel() == false) {
216 RefObject::DecObjRef(engine);
217 LOGE("[Syncer] Can't cancel syncId %" PRIu32 " %" PRIu32 "", syncId, iter->second->CanCancel());
218 return -E_NOT_SUPPORT;
219 }
220 operation = iter->second;
221 RefObject::IncObjRef(operation);
222 }
223
224 const std::vector<std::string> &devices = operation->GetDevices();
225 for (const auto &deviceId : devices) {
226 engine->ClearAllSyncTaskByDevice(deviceId);
227 }
228 LOGI("[Syncer] End cancelSync %" PRIu32 ", devices = %s", syncId, GetSyncDevicesStr(devices).c_str());
229
230 RefObject::DecObjRef(operation);
231 RefObject::DecObjRef(engine);
232 return E_OK;
233 }
234
PrepareSync(const SyncParam & param,uint32_t syncId,uint64_t connectionId)235 int GenericSyncer::PrepareSync(const SyncParam ¶m, uint32_t syncId, uint64_t connectionId)
236 {
237 auto *operation =
238 new (std::nothrow) SyncOperation(syncId, param);
239 if (operation == nullptr) {
240 SubQueuedSyncSize();
241 return -E_OUT_OF_MEMORY;
242 }
243
244 ISyncEngine *engine = nullptr;
245 {
246 std::lock_guard<std::mutex> autoLock(syncerLock_);
247 PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
248 InitSyncOperation(operation, param);
249 LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %.3s, devices = %s, isRetry = %d",
250 syncId, param.mode, param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str(), param.isRetry);
251 engine = syncEngine_;
252 RefObject::IncObjRef(engine);
253 }
254 AddSyncOperation(engine, operation);
255 RefObject::DecObjRef(engine);
256 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
257 if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
258 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
259 connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
260 syncIdMap_[static_cast<int>(syncId)] = connectionId;
261 }
262 if (operation->CheckIsAllFinished()) {
263 operation->Finished();
264 RefObject::KillAndDecObjRef(operation);
265 } else {
266 operation->WaitIfNeed();
267 RefObject::DecObjRef(operation);
268 }
269 return E_OK;
270 }
271
RemoveSyncOperation(int syncId)272 int GenericSyncer::RemoveSyncOperation(int syncId)
273 {
274 SyncOperation *operation = nullptr;
275 std::unique_lock<std::mutex> lock(operationMapLock_);
276 auto iter = syncOperationMap_.find(syncId);
277 if (iter != syncOperationMap_.end()) {
278 LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
279 operation = iter->second;
280 syncOperationMap_.erase(syncId);
281 lock.unlock();
282 if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
283 SubQueuedSyncSize();
284 }
285 operation->NotifyIfNeed();
286 RefObject::KillAndDecObjRef(operation);
287 operation = nullptr;
288 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
289 if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
290 return E_OK;
291 }
292 uint64_t connectionId = syncIdMap_[syncId];
293 if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
294 connectionIdMap_[connectionId].remove(syncId);
295 }
296 syncIdMap_.erase(syncId);
297 return E_OK;
298 }
299 LOGE("[Syncer] RemoveSyncOperation, id %d not found", syncId);
300 return -E_INVALID_ARGS;
301 }
302
StopSync(uint64_t connectionId)303 int GenericSyncer::StopSync(uint64_t connectionId)
304 {
305 std::list<int> syncIdList;
306 {
307 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
308 if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
309 return E_OK;
310 }
311 syncIdList = connectionIdMap_[connectionId];
312 connectionIdMap_.erase(connectionId);
313 }
314 for (auto syncId : syncIdList) {
315 RemoveSyncOperation(syncId);
316 if (syncEngine_ != nullptr) {
317 syncEngine_->AbortMachineIfNeed(syncId);
318 }
319 }
320 if (syncEngine_ != nullptr) {
321 syncEngine_->NotifyConnectionClosed(connectionId);
322 }
323 return E_OK;
324 }
325
GetTimestamp()326 uint64_t GenericSyncer::GetTimestamp()
327 {
328 std::shared_ptr<TimeHelper> timeHelper = nullptr;
329 ISyncInterface *storage = nullptr;
330 {
331 std::lock_guard<std::mutex> lock(syncerLock_);
332 timeHelper = timeHelper_;
333 if (syncInterface_ != nullptr) {
334 storage = syncInterface_;
335 storage->IncRefCount();
336 }
337 }
338 if (storage == nullptr) {
339 return TimeHelper::GetSysCurrentTime();
340 }
341 if (timeHelper == nullptr) {
342 storage->DecRefCount();
343 return TimeHelper::GetSysCurrentTime();
344 }
345 uint64_t timestamp = timeHelper->GetTime();
346 storage->DecRefCount();
347 return timestamp;
348 }
349
QueryAutoSync(const InternalSyncParma & param)350 void GenericSyncer::QueryAutoSync(const InternalSyncParma ¶m)
351 {
352 if (!initialized_) {
353 LOGW("[Syncer] Syncer has not Init");
354 return;
355 }
356 LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
357 ISyncInterface *syncInterface = nullptr;
358 ISyncEngine *engine = nullptr;
359 {
360 std::lock_guard<std::mutex> lock(syncerLock_);
361 if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
362 LOGW("[Syncer] Syncer has not Init");
363 return;
364 }
365 syncInterface = syncInterface_;
366 engine = syncEngine_;
367 syncInterface->IncRefCount();
368 RefObject::IncObjRef(engine);
369 }
370 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
371 int errCode = Sync(param);
372 if (errCode != E_OK) {
373 LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
374 }
375 RefObject::DecObjRef(engine);
376 syncInterface->DecRefCount();
377 });
378 if (retCode != E_OK) {
379 LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
380 RefObject::DecObjRef(engine);
381 syncInterface->DecRefCount();
382 }
383 }
384
AddSyncOperation(ISyncEngine * engine,SyncOperation * operation)385 void GenericSyncer::AddSyncOperation(ISyncEngine *engine, SyncOperation *operation)
386 {
387 if (operation == nullptr) {
388 return;
389 }
390
391 LOGD("[Syncer] AddSyncOperation, sync id: %" PRIu32 ".", operation->GetSyncId());
392 engine->AddSyncOperation(operation);
393
394 if (operation->CheckIsAllFinished()) {
395 LOGD("[Syncer] AddSyncOperation, sync id: %" PRIu32 ", but all finished.", operation->GetSyncId());
396 return;
397 }
398
399 {
400 std::lock_guard<std::mutex> lock(syncerLock_);
401 if (closing_ || !initialized_) {
402 LOGE("[Syncer] Syncer has been closed, return");
403 operation->SetUnfinishedDevStatus(SyncOperation::OP_FAILED);
404 return;
405 }
406 }
407 std::lock_guard<std::mutex> lock(operationMapLock_);
408 syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
409 // To make sure operation alive before WaitIfNeed out
410 RefObject::IncObjRef(operation);
411 }
412
SyncOperationKillCallbackInner(int syncId)413 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
414 {
415 if (syncEngine_ != nullptr) {
416 LOGI("[Syncer] Operation on kill id = %d", syncId);
417 syncEngine_->RemoveSyncOperation(syncId);
418 }
419 }
420
SyncOperationKillCallback(int syncId)421 void GenericSyncer::SyncOperationKillCallback(int syncId)
422 {
423 SyncOperationKillCallbackInner(syncId);
424 }
425
InitMetaData(ISyncInterface * syncInterface)426 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
427 {
428 if (metadata_ != nullptr) {
429 return E_OK;
430 }
431
432 metadata_ = std::make_shared<Metadata>();
433 if (metadata_ == nullptr) {
434 LOGE("[Syncer] metadata make shared failed");
435 return -E_OUT_OF_MEMORY;
436 }
437 int errCode = metadata_->Initialize(syncInterface);
438 if (errCode != E_OK) {
439 LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
440 metadata_ = nullptr;
441 }
442 syncInterface_ = syncInterface;
443 return errCode;
444 }
445
InitTimeHelper(ISyncInterface * syncInterface)446 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
447 {
448 if (timeHelper_ != nullptr) {
449 return E_OK;
450 }
451
452 timeHelper_ = std::make_shared<TimeHelper>();
453 if (timeHelper_ == nullptr) {
454 return -E_OUT_OF_MEMORY;
455 }
456 int errCode = timeHelper_->Initialize(syncInterface, metadata_);
457 if (errCode != E_OK) {
458 LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
459 timeHelper_ = nullptr;
460 }
461 return errCode;
462 }
463
InitSyncEngine(ISyncInterface * syncInterface)464 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
465 {
466 if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
467 LOGI("[Syncer] syncEngine is active");
468 return E_OK;
469 }
470 int errCode = BuildSyncEngine();
471 if (errCode != E_OK) {
472 return errCode;
473 }
474 const std::function<void(std::string)> onlineFunc = [this](const std::string &device) {
475 RemoteDataChanged(device);
476 };
477 const std::function<void(std::string)> offlineFunc = [this](const std::string &device) {
478 RemoteDeviceOffline(device);
479 };
480 const std::function<void(const InternalSyncParma ¶m)> queryAutoSyncFunc =
481 [this](const InternalSyncParma &syncParam) { QueryAutoSync(syncParam); };
482 const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
483 errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
484 if (errCode == E_OK) {
485 syncInterface->IncRefCount();
486 label_ = syncEngine_->GetLabel();
487 return E_OK;
488 } else {
489 LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
490 RefObject::KillAndDecObjRef(syncEngine_);
491 syncEngine_ = nullptr;
492 return errCode;
493 }
494 }
495
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)496 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
497 {
498 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
499 false);
500 if (!isSyncDualTupleMode || isNeedActive) {
501 return E_OK;
502 }
503 LOGI("[Syncer] syncer no need to active");
504 int errCode = BuildSyncEngine();
505 if (errCode != E_OK) {
506 return errCode;
507 }
508 return -E_NO_NEED_ACTIVE;
509 }
510
GenerateSyncId()511 uint32_t GenericSyncer::GenerateSyncId()
512 {
513 std::lock_guard<std::mutex> lock(syncIdLock_);
514 currentSyncId_++;
515 // if overflow, reset to 1
516 if (currentSyncId_ <= 0) {
517 currentSyncId_ = MIN_VALID_SYNC_ID;
518 }
519 return currentSyncId_;
520 }
521
IsValidMode(int mode) const522 bool GenericSyncer::IsValidMode(int mode) const
523 {
524 if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
525 LOGE("[Syncer] Sync mode is not valid!");
526 return false;
527 }
528 return true;
529 }
530
SyncConditionCheck(const SyncParam & param,const ISyncEngine * engine,ISyncInterface * storage) const531 int GenericSyncer::SyncConditionCheck(const SyncParam ¶m, const ISyncEngine *engine, ISyncInterface *storage) const
532 {
533 (void)param;
534 (void)engine;
535 (void)storage;
536 return E_OK;
537 }
538
IsValidDevices(const std::vector<std::string> & devices) const539 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
540 {
541 if (devices.empty()) {
542 LOGE("[Syncer] devices is empty!");
543 return false;
544 }
545 return true;
546 }
547
ClearSyncOperations(bool isClosedOperation)548 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
549 {
550 LOGD("[Syncer] begin clear sync operations");
551 std::vector<SyncOperation *> syncOperation;
552 {
553 std::lock_guard<std::mutex> lock(operationMapLock_);
554 for (auto &item : syncOperationMap_) {
555 bool isBlockSync = item.second->IsBlockSync();
556 if (isBlockSync || !isClosedOperation) {
557 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
558 item.second->SetUnfinishedDevStatus(status);
559 RefObject::IncObjRef(item.second);
560 syncOperation.push_back(item.second);
561 }
562 }
563 }
564
565 if (!isClosedOperation) { // means user changed
566 syncEngine_->NotifyUserChange();
567 }
568
569 for (auto &operation : syncOperation) {
570 // block sync operation or userChange will trigger remove sync operation
571 // caller won't blocked for block sync
572 // caller won't blocked for userChange operation no mater it is block or non-block sync
573 TriggerSyncFinished(operation);
574 RefObject::DecObjRef(operation);
575 }
576 ClearInnerResource(isClosedOperation);
577 LOGD("[Syncer] clear sync operations done");
578 }
579
ClearInnerResource(bool isClosedOperation)580 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
581 {
582 {
583 std::lock_guard<std::mutex> lock(operationMapLock_);
584 for (auto &iter : syncOperationMap_) {
585 if (iter.second->IsBlockSync()) {
586 iter.second->NotifyIfNeed();
587 }
588 RefObject::KillAndDecObjRef(iter.second);
589 iter.second = nullptr;
590 }
591 syncOperationMap_.clear();
592 }
593 {
594 std::lock_guard<std::mutex> lock(syncIdLock_);
595 if (isClosedOperation) {
596 connectionIdMap_.clear();
597 } else { // only need to clear syncid when user change
598 for (auto &item : connectionIdMap_) {
599 item.second.clear();
600 }
601 }
602 syncIdMap_.clear();
603 }
604 }
605
TriggerSyncFinished(SyncOperation * operation)606 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
607 {
608 if (operation != nullptr && operation->CheckIsAllFinished()) { // LCOV_EXCL_BR_LINE
609 operation->Finished();
610 }
611 }
612
OnSyncFinished(int syncId)613 void GenericSyncer::OnSyncFinished(int syncId)
614 {
615 (void)(RemoveSyncOperation(syncId));
616 }
617
SyncModuleInit()618 int GenericSyncer::SyncModuleInit()
619 {
620 static bool isInit = false;
621 std::lock_guard<std::mutex> lock(moduleInitLock_);
622 if (!isInit) {
623 int errCode = SyncResourceInit();
624 if (errCode != E_OK) {
625 return errCode;
626 }
627 isInit = true;
628 return E_OK;
629 }
630 return E_OK;
631 }
632
SyncResourceInit()633 int GenericSyncer::SyncResourceInit()
634 {
635 int errCode = TimeSync::RegisterTransformFunc();
636 if (errCode != E_OK) {
637 LOGE("Register timesync message transform func ERR!");
638 return errCode;
639 }
640 errCode = SingleVerSerializeManager::RegisterTransformFunc();
641 if (errCode != E_OK) {
642 LOGE("Register SingleVerDataSync message transform func ERR!");
643 return errCode;
644 }
645 #ifndef OMIT_MULTI_VER
646 errCode = CommitHistorySync::RegisterTransformFunc();
647 if (errCode != E_OK) {
648 LOGE("Register CommitHistorySync message transform func ERR!");
649 return errCode;
650 }
651 errCode = MultiVerDataSync::RegisterTransformFunc();
652 if (errCode != E_OK) {
653 LOGE("Register MultiVerDataSync message transform func ERR!");
654 return errCode;
655 }
656 errCode = ValueSliceSync::RegisterTransformFunc();
657 if (errCode != E_OK) {
658 LOGE("Register ValueSliceSync message transform func ERR!");
659 return errCode;
660 }
661 #endif
662 errCode = DeviceManager::RegisterTransformFunc();
663 if (errCode != E_OK) {
664 LOGE("Register DeviceManager message transform func ERR!");
665 return errCode;
666 }
667 errCode = AbilitySync::RegisterTransformFunc();
668 if (errCode != E_OK) {
669 LOGE("Register AbilitySync message transform func ERR!");
670 return errCode;
671 }
672 return E_OK;
673 }
674
GetQueuedSyncSize(int * queuedSyncSize) const675 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
676 {
677 if (queuedSyncSize == nullptr) {
678 return -E_INVALID_ARGS;
679 }
680 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
681 *queuedSyncSize = queuedManualSyncSize_;
682 LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
683 return E_OK;
684 }
685
SetQueuedSyncLimit(const int * queuedSyncLimit)686 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
687 {
688 if (queuedSyncLimit == nullptr) {
689 return -E_INVALID_ARGS;
690 }
691 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
692 queuedManualSyncLimit_ = *queuedSyncLimit;
693 LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
694 return E_OK;
695 }
696
GetQueuedSyncLimit(int * queuedSyncLimit) const697 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
698 {
699 if (queuedSyncLimit == nullptr) {
700 return -E_INVALID_ARGS;
701 }
702 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
703 *queuedSyncLimit = queuedManualSyncLimit_;
704 LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
705 return E_OK;
706 }
707
IsManualSync(int inMode) const708 bool GenericSyncer::IsManualSync(int inMode) const
709 {
710 int mode = SyncOperation::TransferSyncMode(inMode);
711 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
712 (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
713 return true;
714 }
715 return false;
716 }
717
AddQueuedManualSyncSize(int mode,bool wait)718 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
719 {
720 if (IsManualSync(mode) && (!wait)) {
721 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
722 if (!manualSyncEnable_) {
723 LOGI("[GenericSyncer] manualSyncEnable is Disable");
724 return -E_BUSY;
725 }
726 queuedManualSyncSize_++;
727 }
728 return E_OK;
729 }
730
IsQueuedManualSyncFull(int mode,bool wait) const731 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
732 {
733 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
734 if (IsManualSync(mode) && (!manualSyncEnable_)) { // LCOV_EXCL_BR_LINE
735 LOGI("[GenericSyncer] manualSyncEnable_:false");
736 return true;
737 }
738 if (IsManualSync(mode) && (!wait)) { // LCOV_EXCL_BR_LINE
739 if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
740 return false;
741 } else {
742 LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
743 queuedManualSyncLimit_);
744 return true;
745 }
746 } else {
747 return false;
748 }
749 }
750
SubQueuedSyncSize(void)751 void GenericSyncer::SubQueuedSyncSize(void)
752 {
753 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
754 queuedManualSyncSize_--;
755 if (queuedManualSyncSize_ < 0) {
756 LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
757 queuedManualSyncSize_ = 0;
758 }
759 }
760
DisableManualSync(void)761 int GenericSyncer::DisableManualSync(void)
762 {
763 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
764 if (queuedManualSyncSize_ > 0) {
765 LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
766 return -E_BUSY;
767 }
768 manualSyncEnable_ = false;
769 LOGD("[GenericSyncer] DisableManualSync ok");
770 return E_OK;
771 }
772
EnableManualSync(void)773 int GenericSyncer::EnableManualSync(void)
774 {
775 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
776 manualSyncEnable_ = true;
777 LOGD("[GenericSyncer] EnableManualSync ok");
778 return E_OK;
779 }
780
GetLocalIdentity(std::string & outTarget) const781 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
782 {
783 std::string deviceId;
784 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
785 if (errCode != E_OK) {
786 LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
787 return errCode;
788 }
789 outTarget = DBCommon::TransferHashString(deviceId);
790 return E_OK;
791 }
792
GetOnlineDevices(std::vector<std::string> & devices) const793 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
794 {
795 std::string identifier;
796 {
797 std::lock_guard<std::mutex> lock(syncerLock_);
798 // Get devices from AutoLaunch first.
799 if (syncInterface_ == nullptr) {
800 LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
801 return;
802 }
803 bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
804 false);
805 if (isSyncDualTupleMode) {
806 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
807 "");
808 } else {
809 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
810 }
811 }
812 RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
813 if (!devices.empty()) {
814 return;
815 }
816 std::lock_guard<std::mutex> lock(syncerLock_);
817 if (closing_) {
818 LOGW("[Syncer] Syncer is closing, return!");
819 return;
820 }
821 if (syncEngine_ != nullptr) {
822 syncEngine_->GetOnlineDevices(devices);
823 }
824 }
825
SetSyncRetry(bool isRetry)826 int GenericSyncer::SetSyncRetry(bool isRetry)
827 {
828 if (syncEngine_ == nullptr) {
829 return -E_NOT_INIT;
830 }
831 syncEngine_->SetSyncRetry(isRetry);
832 return E_OK;
833 }
834
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)835 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
836 {
837 std::lock_guard<std::mutex> lock(syncerLock_);
838 if (syncEngine_ == nullptr) {
839 return -E_NOT_INIT;
840 }
841 int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
842 if (errCode == E_OK) {
843 syncEngine_->SetEqualIdentifierMap(identifier, targets);
844 }
845 return errCode;
846 }
847
GetSyncDevicesStr(const std::vector<std::string> & devices) const848 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
849 {
850 std::string syncDevices;
851 for (const auto &dev:devices) {
852 syncDevices += DBCommon::StringMasking(dev);
853 syncDevices += ",";
854 }
855 if (syncDevices.empty()) {
856 return "";
857 }
858 return syncDevices.substr(0, syncDevices.size() - 1);
859 }
860
StatusCheck() const861 int GenericSyncer::StatusCheck() const
862 {
863 if (!initialized_) {
864 LOGE("[Syncer] Syncer is not initialized, return!");
865 return -E_BUSY;
866 }
867 if (closing_) {
868 LOGW("[Syncer] Syncer is closing, return!");
869 return -E_BUSY;
870 }
871 return E_OK;
872 }
873
SyncPreCheck(const SyncParam & param) const874 int GenericSyncer::SyncPreCheck(const SyncParam ¶m) const
875 {
876 ISyncEngine *engine = nullptr;
877 ISyncInterface *storage = nullptr;
878 int errCode = E_OK;
879 {
880 std::lock_guard<std::mutex> lock(syncerLock_);
881 errCode = StatusCheck();
882 if (errCode != E_OK) {
883 return errCode;
884 }
885 if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
886 return -E_INVALID_ARGS;
887 }
888 if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
889 LOGE("[Syncer] -E_BUSY");
890 return -E_BUSY;
891 }
892 storage = syncInterface_;
893 engine = syncEngine_;
894 if (storage == nullptr || engine == nullptr) {
895 return -E_BUSY;
896 }
897 storage->IncRefCount();
898 RefObject::IncObjRef(engine);
899 }
900 errCode = SyncConditionCheck(param, engine, storage);
901 storage->DecRefCount();
902 RefObject::DecObjRef(engine);
903 return errCode;
904 }
905
InitSyncOperation(SyncOperation * operation,const SyncParam & param)906 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParam ¶m)
907 {
908 if (syncInterface_ == nullptr) {
909 LOGE("[GenericSyncer] [InitSyncOperation] syncInterface_ is nullptr.");
910 return;
911 }
912 operation->SetIdentifier(syncInterface_->GetIdentifier());
913 operation->SetSyncProcessCallFun(param.onSyncProcess);
914 operation->Initialize();
915 operation->OnKill([this, id = operation->GetSyncId()] { SyncOperationKillCallback(id); });
916 std::function<void(int)> onFinished = [this](int syncId) { OnSyncFinished(syncId); };
917 operation->SetOnSyncFinished(onFinished);
918 operation->SetOnSyncFinalize(param.onFinalize);
919 if (param.isQuerySync) {
920 operation->SetQuery(param.syncQuery);
921 }
922 }
923
BuildSyncEngine()924 int GenericSyncer::BuildSyncEngine()
925 {
926 if (syncEngine_ != nullptr) {
927 return E_OK;
928 }
929 syncEngine_ = CreateSyncEngine();
930 if (syncEngine_ == nullptr) {
931 return -E_OUT_OF_MEMORY;
932 }
933 syncEngine_->OnLastRef([this]() {
934 LOGD("[Syncer] SyncEngine finalized");
935 {
936 std::lock_guard<std::mutex> cvLock(engineMutex_);
937 engineFinalize_ = true;
938 }
939 engineFinalizeCv_.notify_all();
940 });
941 return E_OK;
942 }
943
Dump(int fd)944 void GenericSyncer::Dump(int fd)
945 {
946 if (syncEngine_ == nullptr) {
947 return;
948 }
949 RefObject::IncObjRef(syncEngine_);
950 syncEngine_->Dump(fd);
951 RefObject::DecObjRef(syncEngine_);
952 }
953
DumpSyncerBasicInfo()954 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
955 {
956 SyncerBasicInfo baseInfo;
957 if (syncEngine_ == nullptr) {
958 return baseInfo;
959 }
960 RefObject::IncObjRef(syncEngine_);
961 baseInfo.isSyncActive = syncEngine_->IsEngineActive();
962 RefObject::DecObjRef(syncEngine_);
963 return baseInfo;
964 }
965
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)966 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
967 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
968 {
969 ISyncEngine *syncEngine = nullptr;
970 {
971 std::lock_guard<std::mutex> lock(syncerLock_);
972 int errCode = StatusCheck();
973 if (errCode != E_OK) {
974 return errCode;
975 }
976 syncEngine = syncEngine_;
977 RefObject::IncObjRef(syncEngine);
978 }
979 if (syncEngine == nullptr) {
980 return -E_NOT_INIT;
981 }
982 int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
983 RefObject::DecObjRef(syncEngine);
984 return errCode;
985 }
986
InitTimeChangedListener()987 int GenericSyncer::InitTimeChangedListener()
988 {
989 int errCode = E_OK;
990 if (timeChangedListener_ != nullptr) {
991 return errCode;
992 }
993 timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
994 [this](void *changedOffset) {
995 RecordTimeChangeOffset(changedOffset);
996 },
997 [this]() {
998 {
999 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
1000 timeChangeListenerFinalize_ = true;
1001 }
1002 timeChangeCv_.notify_all();
1003 }, errCode);
1004 if (timeChangedListener_ == nullptr) {
1005 LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
1006 return errCode;
1007 }
1008 {
1009 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
1010 timeChangeListenerFinalize_ = false;
1011 }
1012 return E_OK;
1013 }
1014
ReleaseInnerResource()1015 void GenericSyncer::ReleaseInnerResource()
1016 {
1017 NotificationChain::Listener *timeChangedListener = nullptr;
1018 {
1019 std::lock_guard<std::mutex> lock(syncerLock_);
1020 if (timeChangedListener_ != nullptr) {
1021 timeChangedListener = timeChangedListener_;
1022 timeChangedListener_ = nullptr;
1023 }
1024 timeHelper_ = nullptr;
1025 metadata_ = nullptr;
1026 }
1027 if (timeChangedListener != nullptr) {
1028 timeChangedListener->Drop(true);
1029 RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
1030 }
1031 std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
1032 LOGD("[GenericSyncer] Begin wait time change listener finalize");
1033 timeChangeCv_.wait(uniqueLock, [this]() {
1034 return timeChangeListenerFinalize_;
1035 });
1036 LOGD("[GenericSyncer] End wait time change listener finalize");
1037 }
1038
RecordTimeChangeOffset(void * changedOffset)1039 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
1040 {
1041 std::shared_ptr<Metadata> metadata = nullptr;
1042 ISyncInterface *storage = nullptr;
1043 {
1044 std::lock_guard<std::mutex> lock(syncerLock_);
1045 if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
1046 return;
1047 }
1048 storage = syncInterface_;
1049 metadata = metadata_;
1050 storage->IncRefCount();
1051 }
1052 TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
1053 static_cast<TimeOffset>(TimeHelper::TO_100_NS);
1054 TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
1055 TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
1056 Timestamp maxItemTime = 0;
1057 storage->GetMaxTimestamp(maxItemTime);
1058 if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
1059 orgOffset = TimeHelper::BUFFER_VALID_TIME -
1060 currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
1061 }
1062 if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
1063 orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
1064 static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
1065 }
1066 metadata->SaveLocalTimeOffset(orgOffset);
1067 ResetTimeSyncMarkByTimeChange(metadata, *storage);
1068 storage->DecRefCount();
1069 }
1070
CloseInner(bool isClosedOperation)1071 int GenericSyncer::CloseInner(bool isClosedOperation)
1072 {
1073 {
1074 std::lock_guard<std::mutex> lock(syncerLock_);
1075 if (!initialized_) {
1076 LOGW("[Syncer] CloseInner[%.3s] don't close", label_.c_str());
1077 return -E_NOT_INIT;
1078 }
1079 initialized_ = false;
1080 if (closing_) {
1081 LOGE("[Syncer] Syncer is closing, return!");
1082 return -E_BUSY;
1083 }
1084 closing_ = true;
1085 }
1086 ClearSyncOperations(isClosedOperation);
1087 if (syncEngine_ != nullptr) {
1088 syncEngine_->Close();
1089 LOGD("[Syncer] Close SyncEngine!");
1090 std::lock_guard<std::mutex> lock(syncerLock_);
1091 closing_ = false;
1092 }
1093 return E_OK;
1094 }
1095
GetSyncDataSize(const std::string & device,size_t & size) const1096 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1097 {
1098 uint64_t localWaterMark = 0;
1099 std::shared_ptr<Metadata> metadata = nullptr;
1100 {
1101 std::lock_guard<std::mutex> lock(syncerLock_);
1102 if (metadata_ == nullptr || syncInterface_ == nullptr) {
1103 return -E_INTERNAL_ERROR;
1104 }
1105 if (closing_) {
1106 LOGE("[Syncer] Syncer is closing, return!");
1107 return -E_BUSY;
1108 }
1109 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1110 if (errCode != E_OK) {
1111 LOGE("[Syncer] syncer is restarting, return!");
1112 return errCode;
1113 }
1114 syncInterface_->IncRefCount();
1115 metadata = metadata_;
1116 }
1117 metadata->GetLocalWaterMark(device, "", localWaterMark);
1118 uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1119 DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1120 std::vector<SendDataItem> outData;
1121 ContinueToken token = nullptr;
1122 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1123 outData, token, syncDataSizeInfo);
1124 if (token != nullptr) {
1125 static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1126 token = nullptr;
1127 }
1128 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1129 LOGE("calculate sync data size failed %d", errCode);
1130 syncInterface_->DecRefCount();
1131 return errCode;
1132 }
1133 uint32_t totalLen = 0;
1134 if (errCode == -E_UNFINISHED) {
1135 totalLen = expectedMtuSize;
1136 } else {
1137 totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1138 }
1139 for (auto &entry : outData) {
1140 delete entry;
1141 entry = nullptr;
1142 }
1143 syncInterface_->DecRefCount();
1144 // if larger than 1M, return 1M
1145 size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1146 return E_OK;
1147 }
1148
IsNeedActive(ISyncInterface * syncInterface)1149 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1150 {
1151 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1152 if (localOnly) {
1153 LOGD("[Syncer] Local only db, don't need active syncer");
1154 return false;
1155 }
1156 return true;
1157 }
1158
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1159 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1160 {
1161 (void)clientId;
1162 (void)hashDevId;
1163 return -E_NOT_SUPPORT;
1164 }
1165
InitStorageResource(ISyncInterface * syncInterface)1166 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1167 {
1168 // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1169 // It will be clear in destructor.
1170 int errCode = InitMetaData(syncInterface);
1171 if (errCode != E_OK) {
1172 return errCode;
1173 }
1174
1175 // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1176 // It will be clear in destructor.
1177 errCode = InitTimeHelper(syncInterface);
1178 if (errCode != E_OK) {
1179 return errCode;
1180 }
1181
1182 if (!IsNeedActive(syncInterface)) {
1183 return -E_NO_NEED_ACTIVE;
1184 }
1185 return errCode;
1186 }
1187
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)1188 int GenericSyncer::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
1189 {
1190 std::shared_ptr<Metadata> metadata = nullptr;
1191 {
1192 std::lock_guard<std::mutex> autoLock(syncerLock_);
1193 metadata = metadata_;
1194 }
1195 if (metadata == nullptr) {
1196 LOGE("[Syncer] Metadata is not init");
1197 return -E_NOT_INIT;
1198 }
1199 std::string dev;
1200 bool devNeedHash = true;
1201 int errCode = metadata->GetHashDeviceId(device, dev);
1202 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1203 LOGE("[Syncer] Get watermark info failed %d", errCode);
1204 return errCode;
1205 } else if (errCode == E_OK) {
1206 devNeedHash = false;
1207 } else {
1208 dev = device;
1209 }
1210 return metadata->GetWaterMarkInfoFromDB(dev, DBConstant::DEFAULT_USER, devNeedHash, info);
1211 }
1212
UpgradeSchemaVerInMeta()1213 int GenericSyncer::UpgradeSchemaVerInMeta()
1214 {
1215 std::shared_ptr<Metadata> metadata = nullptr;
1216 {
1217 std::lock_guard<std::mutex> autoLock(syncerLock_);
1218 metadata = metadata_;
1219 }
1220 if (metadata == nullptr) {
1221 LOGE("[Syncer] metadata is not init");
1222 return -E_NOT_INIT;
1223 }
1224 int errCode = metadata->ClearAllAbilitySyncFinishMark();
1225 if (errCode != E_OK) {
1226 LOGE("[Syncer] clear ability mark failed:%d", errCode);
1227 return errCode;
1228 }
1229 auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
1230 if (err != E_OK) {
1231 LOGE("[Syncer] get local schema version failed:%d", err);
1232 return err;
1233 }
1234 errCode = metadata->SetLocalSchemaVersion(localSchemaVer + 1);
1235 if (errCode != E_OK) {
1236 LOGE("[Syncer] increase local schema version failed:%d", errCode);
1237 }
1238 return errCode;
1239 }
1240
ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> & metadata,ISyncInterface & storage)1241 void GenericSyncer::ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage)
1242 {
1243 if (syncEngine_ != nullptr) {
1244 syncEngine_->TimeChange();
1245 }
1246 int errCode = metadata->ClearAllTimeSyncFinishMark();
1247 if (errCode != E_OK) {
1248 LOGW("[GenericSyncer] %.3s clear time sync finish mark failed %d", label_.c_str(), errCode);
1249 } else {
1250 LOGD("[GenericSyncer] ClearAllTimeSyncFinishMark finish");
1251 RuntimeContext::GetInstance()->ResetDBTimeChangeStatus(storage.GetIdentifier());
1252 }
1253 }
1254
ResetSyncStatus()1255 void GenericSyncer::ResetSyncStatus()
1256 {
1257 std::shared_ptr<Metadata> metadata = nullptr;
1258 {
1259 std::lock_guard<std::mutex> lock(syncerLock_);
1260 if (metadata_ == nullptr) {
1261 return;
1262 }
1263 metadata = metadata_;
1264 }
1265 metadata->ClearAllAbilitySyncFinishMark();
1266 }
1267
GetLocalTimeOffset()1268 int64_t GenericSyncer::GetLocalTimeOffset()
1269 {
1270 std::shared_ptr<Metadata> metadata = nullptr;
1271 {
1272 std::lock_guard<std::mutex> lock(syncerLock_);
1273 if (metadata_ == nullptr) {
1274 return 0;
1275 }
1276 metadata = metadata_;
1277 }
1278 return metadata->GetLocalTimeOffset();
1279 }
1280
GetTaskCount()1281 int32_t GenericSyncer::GetTaskCount()
1282 {
1283 int32_t count = 0;
1284 {
1285 std::lock_guard<std::mutex> autoLock(operationMapLock_);
1286 count += static_cast<int32_t>(syncOperationMap_.size());
1287 }
1288 ISyncEngine *syncEngine = nullptr;
1289 {
1290 std::lock_guard<std::mutex> lock(syncerLock_);
1291 if (syncEngine_ == nullptr) {
1292 return count;
1293 }
1294 syncEngine = syncEngine_;
1295 RefObject::IncObjRef(syncEngine);
1296 }
1297 count += syncEngine->GetResponseTaskCount();
1298 RefObject::DecObjRef(syncEngine);
1299 return count;
1300 }
1301
ExchangeClosePending(bool expected)1302 bool GenericSyncer::ExchangeClosePending(bool expected)
1303 {
1304 ISyncEngine *syncEngine = nullptr;
1305 {
1306 std::lock_guard<std::mutex> lock(syncerLock_);
1307 if (syncEngine_ == nullptr) {
1308 return false;
1309 }
1310 syncEngine = syncEngine_;
1311 RefObject::IncObjRef(syncEngine);
1312 }
1313 bool res = syncEngine->ExchangeClosePending(expected);
1314 RefObject::DecObjRef(syncEngine);
1315 return res;
1316 }
1317 } // namespace DistributedDB
1318