1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "generic_syncer.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "ref_object.h"
22 #include "sqlite_single_ver_natural_store.h"
23 #include "time_sync.h"
24 #include "single_ver_data_sync.h"
25 #ifndef OMIT_MULTI_VER
26 #include "commit_history_sync.h"
27 #include "multi_ver_data_sync.h"
28 #include "value_slice_sync.h"
29 #endif
30 #include "device_manager.h"
31 #include "db_constant.h"
32 #include "ability_sync.h"
33 #include "generic_single_ver_kv_entry.h"
34 #include "single_ver_serialize_manager.h"
35
36 namespace DistributedDB {
37 namespace {
38 constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45 : syncEngine_(nullptr),
46 syncInterface_(nullptr),
47 timeHelper_(nullptr),
48 metadata_(nullptr),
49 initialized_(false),
50 queuedManualSyncSize_(0),
51 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52 manualSyncEnable_(true),
53 closing_(false),
54 engineFinalize_(false),
55 timeChangeListenerFinalize_(true),
56 timeChangedListener_(nullptr)
57 {
58 }
59
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62 LOGD("[GenericSyncer] ~GenericSyncer!");
63 if (syncEngine_ != nullptr) {
64 syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65 RefObject::KillAndDecObjRef(syncEngine_);
66 // waiting all thread exist
67 std::unique_lock<std::mutex> cvLock(engineMutex_);
68 bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69 [this]() { return engineFinalize_; });
70 if (!engineFinalize) {
71 LOGW("syncer finalize before engine finalize!");
72 }
73 syncEngine_ = nullptr;
74 }
75 ReleaseInnerResource();
76 std::lock_guard<std::mutex> lock(syncerLock_);
77 syncInterface_ = nullptr;
78 }
79
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82 if (syncInterface == nullptr) {
83 LOGE("[Syncer] Init failed, the syncInterface is null!");
84 return -E_INVALID_ARGS;
85 }
86
87 {
88 std::lock_guard<std::mutex> lock(syncerLock_);
89 if (initialized_) {
90 return E_OK;
91 }
92 if (closing_) {
93 LOGE("[Syncer] Syncer is closing, return!");
94 return -E_BUSY;
95 }
96 std::vector<uint8_t> label = syncInterface->GetIdentifier();
97 label.resize(3); // only show 3 Bytes enough
98 label_ = DBCommon::VectorToHexString(label);
99
100 int errCode = InitStorageResource(syncInterface);
101 if (errCode != E_OK) {
102 return errCode;
103 }
104 // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
105 // It will be clear in destructor.
106 int errCodeTimeChangedListener = InitTimeChangedListener();
107 if (errCodeTimeChangedListener != E_OK) {
108 return -E_INTERNAL_ERROR;
109 }
110 errCode = CheckSyncActive(syncInterface, isNeedActive);
111 if (errCode != E_OK) {
112 return errCode;
113 }
114
115 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
116 LOGW("[Syncer] Communicator component not ready!");
117 return -E_NOT_INIT;
118 }
119
120 errCode = SyncModuleInit();
121 if (errCode != E_OK) {
122 LOGE("[Syncer] Sync ModuleInit ERR!");
123 return -E_INTERNAL_ERROR;
124 }
125
126 errCode = InitSyncEngine(syncInterface);
127 if (errCode != E_OK) {
128 return errCode;
129 }
130 syncEngine_->SetEqualIdentifier();
131 initialized_ = true;
132 }
133
134 // StartCommunicator may start an auto sync, this function can not in syncerLock_
135 syncEngine_->StartCommunicator();
136 return E_OK;
137 }
138
Close(bool isClosedOperation)139 int GenericSyncer::Close(bool isClosedOperation)
140 {
141 int errCode = CloseInner(isClosedOperation);
142 if (errCode != -E_BUSY && isClosedOperation) {
143 ReleaseInnerResource();
144 }
145 return errCode;
146 }
147
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)148 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
149 const std::function<void(const std::map<std::string, int> &)> &onComplete,
150 const std::function<void(void)> &onFinalize, bool wait = false)
151 {
152 SyncParma param;
153 param.devices = devices;
154 param.mode = mode;
155 param.onComplete = onComplete;
156 param.onFinalize = onFinalize;
157 param.wait = wait;
158 return Sync(param);
159 }
160
Sync(const InternalSyncParma & param)161 int GenericSyncer::Sync(const InternalSyncParma ¶m)
162 {
163 SyncParma syncParam;
164 syncParam.devices = param.devices;
165 syncParam.mode = param.mode;
166 syncParam.isQuerySync = param.isQuerySync;
167 syncParam.syncQuery = param.syncQuery;
168 return Sync(syncParam);
169 }
170
Sync(const SyncParma & param)171 int GenericSyncer::Sync(const SyncParma ¶m)
172 {
173 return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
174 }
175
Sync(const SyncParma & param,uint64_t connectionId)176 int GenericSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
177 {
178 int errCode = SyncPreCheck(param);
179 if (errCode != E_OK) {
180 return errCode;
181 }
182 errCode = AddQueuedManualSyncSize(param.mode, param.wait);
183 if (errCode != E_OK) {
184 return errCode;
185 }
186
187 uint32_t syncId = GenerateSyncId();
188 errCode = PrepareSync(param, syncId, connectionId);
189 if (errCode != E_OK) {
190 LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
191 return errCode;
192 }
193 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
194 return E_OK;
195 }
196
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)197 int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
198 {
199 auto *operation =
200 new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
201 if (operation == nullptr) {
202 SubQueuedSyncSize();
203 return -E_OUT_OF_MEMORY;
204 }
205 {
206 std::lock_guard<std::mutex> autoLock(syncerLock_);
207 PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
208 InitSyncOperation(operation, param);
209 LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
210 param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
211 AddSyncOperation(operation);
212 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
213 }
214 if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
215 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
216 connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
217 syncIdMap_[static_cast<int>(syncId)] = connectionId;
218 }
219 if (operation->CheckIsAllFinished()) {
220 operation->Finished();
221 RefObject::KillAndDecObjRef(operation);
222 } else {
223 operation->WaitIfNeed();
224 RefObject::DecObjRef(operation);
225 }
226 return E_OK;
227 }
228
RemoveSyncOperation(int syncId)229 int GenericSyncer::RemoveSyncOperation(int syncId)
230 {
231 SyncOperation *operation = nullptr;
232 std::unique_lock<std::mutex> lock(operationMapLock_);
233 auto iter = syncOperationMap_.find(syncId);
234 if (iter != syncOperationMap_.end()) {
235 LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
236 operation = iter->second;
237 syncOperationMap_.erase(syncId);
238 lock.unlock();
239 if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
240 SubQueuedSyncSize();
241 }
242 operation->NotifyIfNeed();
243 RefObject::KillAndDecObjRef(operation);
244 operation = nullptr;
245 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
246 if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
247 return E_OK;
248 }
249 uint64_t connectionId = syncIdMap_[syncId];
250 if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
251 connectionIdMap_[connectionId].remove(syncId);
252 }
253 syncIdMap_.erase(syncId);
254 return E_OK;
255 }
256 return -E_INVALID_ARGS;
257 }
258
StopSync(uint64_t connectionId)259 int GenericSyncer::StopSync(uint64_t connectionId)
260 {
261 std::list<int> syncIdList;
262 {
263 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
264 if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
265 return E_OK;
266 }
267 syncIdList = connectionIdMap_[connectionId];
268 connectionIdMap_.erase(connectionId);
269 }
270 for (auto syncId : syncIdList) {
271 RemoveSyncOperation(syncId);
272 if (syncEngine_ != nullptr) {
273 syncEngine_->AbortMachineIfNeed(syncId);
274 }
275 }
276 if (syncEngine_ != nullptr) {
277 syncEngine_->NotifyConnectionClosed(connectionId);
278 }
279 return E_OK;
280 }
281
GetTimestamp()282 uint64_t GenericSyncer::GetTimestamp()
283 {
284 std::shared_ptr<TimeHelper> timeHelper = nullptr;
285 ISyncInterface *storage = nullptr;
286 {
287 std::lock_guard<std::mutex> lock(syncerLock_);
288 timeHelper = timeHelper_;
289 if (syncInterface_ != nullptr) {
290 storage = syncInterface_;
291 storage->IncRefCount();
292 }
293 }
294 if (storage == nullptr) {
295 return TimeHelper::GetSysCurrentTime();
296 }
297 if (timeHelper == nullptr) {
298 storage->DecRefCount();
299 return TimeHelper::GetSysCurrentTime();
300 }
301 uint64_t timestamp = timeHelper->GetTime();
302 storage->DecRefCount();
303 return timestamp;
304 }
305
QueryAutoSync(const InternalSyncParma & param)306 void GenericSyncer::QueryAutoSync(const InternalSyncParma ¶m)
307 {
308 if (!initialized_) {
309 LOGW("[Syncer] Syncer has not Init");
310 return;
311 }
312 LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
313 ISyncInterface *syncInterface = nullptr;
314 ISyncEngine *engine = nullptr;
315 {
316 std::lock_guard<std::mutex> lock(syncerLock_);
317 if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
318 LOGW("[Syncer] Syncer has not Init");
319 return;
320 }
321 syncInterface = syncInterface_;
322 engine = syncEngine_;
323 syncInterface->IncRefCount();
324 RefObject::IncObjRef(engine);
325 }
326 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
327 int errCode = Sync(param);
328 if (errCode != E_OK) {
329 LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
330 }
331 RefObject::DecObjRef(engine);
332 syncInterface->DecRefCount();
333 });
334 if (retCode != E_OK) {
335 LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
336 RefObject::DecObjRef(engine);
337 syncInterface->DecRefCount();
338 }
339 }
340
AddSyncOperation(SyncOperation * operation)341 void GenericSyncer::AddSyncOperation(SyncOperation *operation)
342 {
343 if (operation == nullptr) {
344 return;
345 }
346
347 LOGD("[Syncer] AddSyncOperation.");
348 syncEngine_->AddSyncOperation(operation);
349
350 if (operation->CheckIsAllFinished()) {
351 return;
352 }
353
354 std::lock_guard<std::mutex> lock(operationMapLock_);
355 syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
356 // To make sure operation alive before WaitIfNeed out
357 RefObject::IncObjRef(operation);
358 }
359
SyncOperationKillCallbackInner(int syncId)360 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
361 {
362 if (syncEngine_ != nullptr) {
363 LOGI("[Syncer] Operation on kill id = %d", syncId);
364 syncEngine_->RemoveSyncOperation(syncId);
365 }
366 }
367
SyncOperationKillCallback(int syncId)368 void GenericSyncer::SyncOperationKillCallback(int syncId)
369 {
370 SyncOperationKillCallbackInner(syncId);
371 }
372
InitMetaData(ISyncInterface * syncInterface)373 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
374 {
375 if (metadata_ != nullptr) {
376 return E_OK;
377 }
378
379 metadata_ = std::make_shared<Metadata>();
380 if (metadata_ == nullptr) {
381 LOGE("[Syncer] metadata make shared failed");
382 return -E_OUT_OF_MEMORY;
383 }
384 int errCode = metadata_->Initialize(syncInterface);
385 if (errCode != E_OK) {
386 LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
387 metadata_ = nullptr;
388 }
389 syncInterface_ = syncInterface;
390 return errCode;
391 }
392
InitTimeHelper(ISyncInterface * syncInterface)393 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
394 {
395 if (timeHelper_ != nullptr) {
396 return E_OK;
397 }
398
399 timeHelper_ = std::make_shared<TimeHelper>();
400 if (timeHelper_ == nullptr) {
401 return -E_OUT_OF_MEMORY;
402 }
403 int errCode = timeHelper_->Initialize(syncInterface, metadata_);
404 if (errCode != E_OK) {
405 LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
406 timeHelper_ = nullptr;
407 }
408 return errCode;
409 }
410
InitSyncEngine(ISyncInterface * syncInterface)411 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
412 {
413 if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
414 LOGI("[Syncer] syncEngine is active");
415 return E_OK;
416 }
417 int errCode = BuildSyncEngine();
418 if (errCode != E_OK) {
419 return errCode;
420 }
421 const std::function<void(std::string)> onlineFunc = std::bind(&GenericSyncer::RemoteDataChanged,
422 this, std::placeholders::_1);
423 const std::function<void(std::string)> offlineFunc = std::bind(&GenericSyncer::RemoteDeviceOffline,
424 this, std::placeholders::_1);
425 const std::function<void(const InternalSyncParma ¶m)> queryAutoSyncFunc =
426 std::bind(&GenericSyncer::QueryAutoSync, this, std::placeholders::_1);
427 const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
428 errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
429 if (errCode == E_OK) {
430 syncInterface->IncRefCount();
431 label_ = syncEngine_->GetLabel();
432 return E_OK;
433 } else {
434 LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
435 RefObject::KillAndDecObjRef(syncEngine_);
436 syncEngine_ = nullptr;
437 return errCode;
438 }
439 }
440
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)441 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
442 {
443 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
444 false);
445 if (!isSyncDualTupleMode || isNeedActive) {
446 return E_OK;
447 }
448 LOGI("[Syncer] syncer no need to active");
449 int errCode = BuildSyncEngine();
450 if (errCode != E_OK) {
451 return errCode;
452 }
453 return -E_NO_NEED_ACTIVE;
454 }
455
GenerateSyncId()456 uint32_t GenericSyncer::GenerateSyncId()
457 {
458 std::lock_guard<std::mutex> lock(syncIdLock_);
459 currentSyncId_++;
460 // if overflow, reset to 1
461 if (currentSyncId_ <= 0) {
462 currentSyncId_ = MIN_VALID_SYNC_ID;
463 }
464 return currentSyncId_;
465 }
466
IsValidMode(int mode) const467 bool GenericSyncer::IsValidMode(int mode) const
468 {
469 if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
470 LOGE("[Syncer] Sync mode is not valid!");
471 return false;
472 }
473 return true;
474 }
475
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const476 int GenericSyncer::SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine, ISyncInterface *storage) const
477 {
478 (void)param;
479 (void)engine;
480 (void)storage;
481 return E_OK;
482 }
483
IsValidDevices(const std::vector<std::string> & devices) const484 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
485 {
486 if (devices.empty()) {
487 LOGE("[Syncer] devices is empty!");
488 return false;
489 }
490 return true;
491 }
492
ClearSyncOperations(bool isClosedOperation)493 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
494 {
495 std::vector<SyncOperation *> syncOperation;
496 {
497 std::lock_guard<std::mutex> lock(operationMapLock_);
498 for (auto &item : syncOperationMap_) {
499 bool isBlockSync = item.second->IsBlockSync();
500 if (isBlockSync || !isClosedOperation) {
501 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
502 item.second->SetUnfinishedDevStatus(status);
503 RefObject::IncObjRef(item.second);
504 syncOperation.push_back(item.second);
505 }
506 }
507 }
508
509 if (!isClosedOperation) { // means user changed
510 syncEngine_->NotifyUserChange();
511 }
512
513 for (auto &operation : syncOperation) {
514 // block sync operation or userChange will trigger remove sync operation
515 // caller won't blocked for block sync
516 // caller won't blocked for userChange operation no mater it is block or non-block sync
517 TriggerSyncFinished(operation);
518 RefObject::DecObjRef(operation);
519 }
520 ClearInnerResource(isClosedOperation);
521 }
522
ClearInnerResource(bool isClosedOperation)523 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
524 {
525 {
526 std::lock_guard<std::mutex> lock(operationMapLock_);
527 for (auto &iter : syncOperationMap_) {
528 RefObject::KillAndDecObjRef(iter.second);
529 iter.second = nullptr;
530 }
531 syncOperationMap_.clear();
532 }
533 {
534 std::lock_guard<std::mutex> lock(syncIdLock_);
535 if (isClosedOperation) {
536 connectionIdMap_.clear();
537 } else { // only need to clear syncid when user change
538 for (auto &item : connectionIdMap_) {
539 item.second.clear();
540 }
541 }
542 syncIdMap_.clear();
543 }
544 }
545
TriggerSyncFinished(SyncOperation * operation)546 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
547 {
548 if (operation != nullptr && operation->CheckIsAllFinished()) {
549 operation->Finished();
550 }
551 }
552
OnSyncFinished(int syncId)553 void GenericSyncer::OnSyncFinished(int syncId)
554 {
555 (void)(RemoveSyncOperation(syncId));
556 }
557
SyncModuleInit()558 int GenericSyncer::SyncModuleInit()
559 {
560 static bool isInit = false;
561 std::lock_guard<std::mutex> lock(moduleInitLock_);
562 if (!isInit) {
563 int errCode = SyncResourceInit();
564 if (errCode != E_OK) {
565 return errCode;
566 }
567 isInit = true;
568 return E_OK;
569 }
570 return E_OK;
571 }
572
SyncResourceInit()573 int GenericSyncer::SyncResourceInit()
574 {
575 int errCode = TimeSync::RegisterTransformFunc();
576 if (errCode != E_OK) {
577 LOGE("Register timesync message transform func ERR!");
578 return errCode;
579 }
580 errCode = SingleVerSerializeManager::RegisterTransformFunc();
581 if (errCode != E_OK) {
582 LOGE("Register SingleVerDataSync message transform func ERR!");
583 return errCode;
584 }
585 #ifndef OMIT_MULTI_VER
586 errCode = CommitHistorySync::RegisterTransformFunc();
587 if (errCode != E_OK) {
588 LOGE("Register CommitHistorySync message transform func ERR!");
589 return errCode;
590 }
591 errCode = MultiVerDataSync::RegisterTransformFunc();
592 if (errCode != E_OK) {
593 LOGE("Register MultiVerDataSync message transform func ERR!");
594 return errCode;
595 }
596 errCode = ValueSliceSync::RegisterTransformFunc();
597 if (errCode != E_OK) {
598 LOGE("Register ValueSliceSync message transform func ERR!");
599 return errCode;
600 }
601 #endif
602 errCode = DeviceManager::RegisterTransformFunc();
603 if (errCode != E_OK) {
604 LOGE("Register DeviceManager message transform func ERR!");
605 return errCode;
606 }
607 errCode = AbilitySync::RegisterTransformFunc();
608 if (errCode != E_OK) {
609 LOGE("Register AbilitySync message transform func ERR!");
610 return errCode;
611 }
612 return E_OK;
613 }
614
GetQueuedSyncSize(int * queuedSyncSize) const615 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
616 {
617 if (queuedSyncSize == nullptr) {
618 return -E_INVALID_ARGS;
619 }
620 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
621 *queuedSyncSize = queuedManualSyncSize_;
622 LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
623 return E_OK;
624 }
625
SetQueuedSyncLimit(const int * queuedSyncLimit)626 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
627 {
628 if (queuedSyncLimit == nullptr) {
629 return -E_INVALID_ARGS;
630 }
631 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
632 queuedManualSyncLimit_ = *queuedSyncLimit;
633 LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
634 return E_OK;
635 }
636
GetQueuedSyncLimit(int * queuedSyncLimit) const637 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
638 {
639 if (queuedSyncLimit == nullptr) {
640 return -E_INVALID_ARGS;
641 }
642 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
643 *queuedSyncLimit = queuedManualSyncLimit_;
644 LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
645 return E_OK;
646 }
647
IsManualSync(int inMode) const648 bool GenericSyncer::IsManualSync(int inMode) const
649 {
650 int mode = SyncOperation::TransferSyncMode(inMode);
651 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
652 (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
653 return true;
654 }
655 return false;
656 }
657
AddQueuedManualSyncSize(int mode,bool wait)658 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
659 {
660 if (IsManualSync(mode) && (!wait)) {
661 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
662 if (!manualSyncEnable_) {
663 LOGI("[GenericSyncer] manualSyncEnable is Disable");
664 return -E_BUSY;
665 }
666 queuedManualSyncSize_++;
667 }
668 return E_OK;
669 }
670
IsQueuedManualSyncFull(int mode,bool wait) const671 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
672 {
673 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
674 if (IsManualSync(mode) && (!manualSyncEnable_)) {
675 LOGI("[GenericSyncer] manualSyncEnable_:false");
676 return true;
677 }
678 if (IsManualSync(mode) && (!wait)) {
679 if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
680 return false;
681 } else {
682 LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
683 queuedManualSyncLimit_);
684 return true;
685 }
686 } else {
687 return false;
688 }
689 }
690
SubQueuedSyncSize(void)691 void GenericSyncer::SubQueuedSyncSize(void)
692 {
693 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
694 queuedManualSyncSize_--;
695 if (queuedManualSyncSize_ < 0) {
696 LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
697 queuedManualSyncSize_ = 0;
698 }
699 }
700
DisableManualSync(void)701 int GenericSyncer::DisableManualSync(void)
702 {
703 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
704 if (queuedManualSyncSize_ > 0) {
705 LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
706 return -E_BUSY;
707 }
708 manualSyncEnable_ = false;
709 LOGD("[GenericSyncer] DisableManualSync ok");
710 return E_OK;
711 }
712
EnableManualSync(void)713 int GenericSyncer::EnableManualSync(void)
714 {
715 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
716 manualSyncEnable_ = true;
717 LOGD("[GenericSyncer] EnableManualSync ok");
718 return E_OK;
719 }
720
GetLocalIdentity(std::string & outTarget) const721 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
722 {
723 std::string deviceId;
724 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
725 if (errCode != E_OK) {
726 LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
727 return errCode;
728 }
729 outTarget = DBCommon::TransferHashString(deviceId);
730 return E_OK;
731 }
732
GetOnlineDevices(std::vector<std::string> & devices) const733 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
734 {
735 std::string identifier;
736 {
737 std::lock_guard<std::mutex> lock(syncerLock_);
738 // Get devices from AutoLaunch first.
739 if (syncInterface_ == nullptr) {
740 LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
741 return;
742 }
743 bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
744 false);
745 if (isSyncDualTupleMode) {
746 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
747 "");
748 } else {
749 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
750 }
751 }
752 RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
753 if (!devices.empty()) {
754 return;
755 }
756 std::lock_guard<std::mutex> lock(syncerLock_);
757 if (closing_) {
758 LOGW("[Syncer] Syncer is closing, return!");
759 return;
760 }
761 if (syncEngine_ != nullptr) {
762 syncEngine_->GetOnlineDevices(devices);
763 }
764 }
765
SetSyncRetry(bool isRetry)766 int GenericSyncer::SetSyncRetry(bool isRetry)
767 {
768 if (syncEngine_ == nullptr) {
769 return -E_NOT_INIT;
770 }
771 syncEngine_->SetSyncRetry(isRetry);
772 return E_OK;
773 }
774
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)775 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
776 {
777 std::lock_guard<std::mutex> lock(syncerLock_);
778 if (syncEngine_ == nullptr) {
779 return -E_NOT_INIT;
780 }
781 int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
782 if (errCode == E_OK) {
783 syncEngine_->SetEqualIdentifierMap(identifier, targets);
784 }
785 return errCode;
786 }
787
GetSyncDevicesStr(const std::vector<std::string> & devices) const788 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
789 {
790 std::string syncDevices;
791 for (const auto &dev:devices) {
792 syncDevices += DBCommon::StringMasking(dev);
793 syncDevices += ",";
794 }
795 return syncDevices.substr(0, syncDevices.size() - 1);
796 }
797
StatusCheck() const798 int GenericSyncer::StatusCheck() const
799 {
800 if (!initialized_) {
801 LOGE("[Syncer] Syncer is not initialized, return!");
802 return -E_BUSY;
803 }
804 if (closing_) {
805 LOGW("[Syncer] Syncer is closing, return!");
806 return -E_BUSY;
807 }
808 return E_OK;
809 }
810
SyncPreCheck(const SyncParma & param) const811 int GenericSyncer::SyncPreCheck(const SyncParma ¶m) const
812 {
813 ISyncEngine *engine = nullptr;
814 ISyncInterface *storage = nullptr;
815 int errCode = E_OK;
816 {
817 std::lock_guard<std::mutex> lock(syncerLock_);
818 errCode = StatusCheck();
819 if (errCode != E_OK) {
820 return errCode;
821 }
822 if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) {
823 return -E_INVALID_ARGS;
824 }
825 if (IsQueuedManualSyncFull(param.mode, param.wait)) {
826 LOGE("[Syncer] -E_BUSY");
827 return -E_BUSY;
828 }
829 storage = syncInterface_;
830 engine = syncEngine_;
831 if (storage == nullptr || engine == nullptr) {
832 return -E_BUSY;
833 }
834 storage->IncRefCount();
835 RefObject::IncObjRef(engine);
836 }
837 errCode = SyncConditionCheck(param, engine, storage);
838 storage->DecRefCount();
839 RefObject::DecObjRef(engine);
840 return errCode;
841 }
842
InitSyncOperation(SyncOperation * operation,const SyncParma & param)843 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma ¶m)
844 {
845 operation->SetIdentifier(syncInterface_->GetIdentifier());
846 operation->Initialize();
847 operation->OnKill(std::bind(&GenericSyncer::SyncOperationKillCallback, this, operation->GetSyncId()));
848 std::function<void(int)> onFinished = std::bind(&GenericSyncer::OnSyncFinished, this, std::placeholders::_1);
849 operation->SetOnSyncFinished(onFinished);
850 operation->SetOnSyncFinalize(param.onFinalize);
851 if (param.isQuerySync) {
852 operation->SetQuery(param.syncQuery);
853 }
854 }
855
BuildSyncEngine()856 int GenericSyncer::BuildSyncEngine()
857 {
858 if (syncEngine_ != nullptr) {
859 return E_OK;
860 }
861 syncEngine_ = CreateSyncEngine();
862 if (syncEngine_ == nullptr) {
863 return -E_OUT_OF_MEMORY;
864 }
865 syncEngine_->OnLastRef([this]() {
866 LOGD("[Syncer] SyncEngine finalized");
867 {
868 std::lock_guard<std::mutex> cvLock(engineMutex_);
869 engineFinalize_ = true;
870 }
871 engineFinalizeCv_.notify_all();
872 });
873 return E_OK;
874 }
875
Dump(int fd)876 void GenericSyncer::Dump(int fd)
877 {
878 if (syncEngine_ == nullptr) {
879 return;
880 }
881 RefObject::IncObjRef(syncEngine_);
882 syncEngine_->Dump(fd);
883 RefObject::DecObjRef(syncEngine_);
884 }
885
DumpSyncerBasicInfo()886 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
887 {
888 SyncerBasicInfo baseInfo;
889 if (syncEngine_ == nullptr) {
890 return baseInfo;
891 }
892 RefObject::IncObjRef(syncEngine_);
893 baseInfo.isSyncActive = syncEngine_->IsEngineActive();
894 RefObject::DecObjRef(syncEngine_);
895 return baseInfo;
896 }
897
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)898 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
899 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
900 {
901 ISyncEngine *syncEngine = nullptr;
902 {
903 std::lock_guard<std::mutex> lock(syncerLock_);
904 int errCode = StatusCheck();
905 if (errCode != E_OK) {
906 return errCode;
907 }
908 syncEngine = syncEngine_;
909 RefObject::IncObjRef(syncEngine);
910 }
911 if (syncEngine == nullptr) {
912 return -E_NOT_INIT;
913 }
914 int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
915 RefObject::DecObjRef(syncEngine);
916 return errCode;
917 }
918
InitTimeChangedListener()919 int GenericSyncer::InitTimeChangedListener()
920 {
921 int errCode = E_OK;
922 if (timeChangedListener_ != nullptr) {
923 return errCode;
924 }
925 timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
926 [this](void *changedOffset) {
927 RecordTimeChangeOffset(changedOffset);
928 },
929 [this]() {
930 {
931 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
932 timeChangeListenerFinalize_ = true;
933 }
934 timeChangeCv_.notify_all();
935 }, errCode);
936 if (timeChangedListener_ == nullptr) {
937 LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
938 return errCode;
939 }
940 {
941 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
942 timeChangeListenerFinalize_ = false;
943 }
944 return E_OK;
945 }
946
ReleaseInnerResource()947 void GenericSyncer::ReleaseInnerResource()
948 {
949 NotificationChain::Listener *timeChangedListener = nullptr;
950 {
951 std::lock_guard<std::mutex> lock(syncerLock_);
952 if (timeChangedListener_ != nullptr) {
953 timeChangedListener = timeChangedListener_;
954 timeChangedListener_ = nullptr;
955 }
956 timeHelper_ = nullptr;
957 metadata_ = nullptr;
958 }
959 if (timeChangedListener != nullptr) {
960 timeChangedListener->Drop(true);
961 RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
962 }
963 std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
964 LOGD("[GenericSyncer] Begin wait time change listener finalize");
965 timeChangeCv_.wait(uniqueLock, [this]() {
966 return timeChangeListenerFinalize_;
967 });
968 LOGD("[GenericSyncer] End wait time change listener finalize");
969 }
970
RecordTimeChangeOffset(void * changedOffset)971 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
972 {
973 std::shared_ptr<Metadata> metadata = nullptr;
974 ISyncInterface *storage = nullptr;
975 {
976 std::lock_guard<std::mutex> lock(syncerLock_);
977 if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
978 return;
979 }
980 storage = syncInterface_;
981 metadata = metadata_;
982 storage->IncRefCount();
983 }
984 TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
985 static_cast<TimeOffset>(TimeHelper::TO_100_NS);
986 TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
987 TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
988 Timestamp maxItemTime = 0;
989 storage->GetMaxTimestamp(maxItemTime);
990 if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
991 orgOffset = TimeHelper::BUFFER_VALID_TIME -
992 currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
993 }
994 if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
995 orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
996 static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
997 }
998 metadata->SaveLocalTimeOffset(orgOffset);
999 storage->DecRefCount();
1000 }
1001
CloseInner(bool isClosedOperation)1002 int GenericSyncer::CloseInner(bool isClosedOperation)
1003 {
1004 {
1005 std::lock_guard<std::mutex> lock(syncerLock_);
1006 if (!initialized_) {
1007 LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
1008 return -E_NOT_INIT;
1009 }
1010 initialized_ = false;
1011 if (closing_) {
1012 LOGE("[Syncer] Syncer is closing, return!");
1013 return -E_BUSY;
1014 }
1015 closing_ = true;
1016 }
1017 ClearSyncOperations(isClosedOperation);
1018 if (syncEngine_ != nullptr) {
1019 syncEngine_->Close();
1020 LOGD("[Syncer] Close SyncEngine!");
1021 std::lock_guard<std::mutex> lock(syncerLock_);
1022 closing_ = false;
1023 }
1024 return E_OK;
1025 }
1026
GetSyncDataSize(const std::string & device,size_t & size) const1027 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1028 {
1029 uint64_t localWaterMark = 0;
1030 std::shared_ptr<Metadata> metadata = nullptr;
1031 {
1032 std::lock_guard<std::mutex> lock(syncerLock_);
1033 if (metadata_ == nullptr || syncInterface_ == nullptr) {
1034 return -E_INTERNAL_ERROR;
1035 }
1036 if (closing_) {
1037 LOGE("[Syncer] Syncer is closing, return!");
1038 return -E_BUSY;
1039 }
1040 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1041 if (errCode != E_OK) {
1042 LOGE("[Syncer] syncer is restarting, return!");
1043 return errCode;
1044 }
1045 syncInterface_->IncRefCount();
1046 metadata = metadata_;
1047 }
1048 metadata->GetLocalWaterMark(device, localWaterMark);
1049 uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1050 DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1051 std::vector<SendDataItem> outData;
1052 ContinueToken token = nullptr;
1053 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1054 outData, token, syncDataSizeInfo);
1055 if (token != nullptr) {
1056 static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1057 token = nullptr;
1058 }
1059 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1060 LOGE("calculate sync data size failed %d", errCode);
1061 syncInterface_->DecRefCount();
1062 return errCode;
1063 }
1064 uint32_t totalLen = 0;
1065 if (errCode == -E_UNFINISHED) {
1066 totalLen = expectedMtuSize;
1067 } else {
1068 totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1069 }
1070 for (auto &entry : outData) {
1071 delete entry;
1072 entry = nullptr;
1073 }
1074 syncInterface_->DecRefCount();
1075 // if larger than 1M, return 1M
1076 size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1077 return E_OK;
1078 }
1079
IsNeedActive(ISyncInterface * syncInterface)1080 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1081 {
1082 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1083 if (localOnly) {
1084 LOGD("[Syncer] Local only db, don't need active syncer");
1085 return false;
1086 }
1087 return true;
1088 }
1089
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1090 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1091 {
1092 (void)clientId;
1093 (void)hashDevId;
1094 return -E_NOT_SUPPORT;
1095 }
1096
InitStorageResource(ISyncInterface * syncInterface)1097 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1098 {
1099 // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1100 // It will be clear in destructor.
1101 int errCode = InitMetaData(syncInterface);
1102 if (errCode != E_OK) {
1103 return errCode;
1104 }
1105
1106 // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1107 // It will be clear in destructor.
1108 errCode = InitTimeHelper(syncInterface);
1109 if (errCode != E_OK) {
1110 return errCode;
1111 }
1112
1113 if (!IsNeedActive(syncInterface)) {
1114 return -E_NO_NEED_ACTIVE;
1115 }
1116 return errCode;
1117 }
1118 } // namespace DistributedDB
1119