1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "generic_syncer.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "ref_object.h"
22 #include "sqlite_single_ver_natural_store.h"
23 #include "time_sync.h"
24 #include "single_ver_data_sync.h"
25 #ifndef OMIT_MULTI_VER
26 #include "commit_history_sync.h"
27 #include "multi_ver_data_sync.h"
28 #include "value_slice_sync.h"
29 #endif
30 #include "device_manager.h"
31 #include "db_constant.h"
32 #include "ability_sync.h"
33 #include "single_ver_serialize_manager.h"
34
35 namespace DistributedDB {
36 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
37 std::mutex GenericSyncer::moduleInitLock_;
38 int GenericSyncer::currentSyncId_ = 0;
39 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()40 GenericSyncer::GenericSyncer()
41 : syncEngine_(nullptr),
42 syncInterface_(nullptr),
43 timeHelper_(nullptr),
44 metadata_(nullptr),
45 initialized_(false),
46 queuedManualSyncSize_(0),
47 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
48 manualSyncEnable_(true),
49 closing_(false),
50 engineFinalize_(false),
51 timeChangedListener_(nullptr)
52 {
53 }
54
~GenericSyncer()55 GenericSyncer::~GenericSyncer()
56 {
57 LOGD("[GenericSyncer] ~GenericSyncer!");
58 if (syncEngine_ != nullptr) {
59 syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
60 RefObject::KillAndDecObjRef(syncEngine_);
61 // waiting all thread exist
62 std::unique_lock<std::mutex> cvLock(engineMutex_);
63 bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
64 [this]() { return engineFinalize_; });
65 if (!engineFinalize) {
66 LOGW("syncer finalize before engine finalize!");
67 }
68 syncEngine_ = nullptr;
69 }
70 if (timeChangedListener_ != nullptr) {
71 timeChangedListener_->Drop(true);
72 timeChangedListener_ = nullptr;
73 RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
74 }
75 timeHelper_ = nullptr;
76 metadata_ = nullptr;
77 syncInterface_ = nullptr;
78 }
79
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82 if (syncInterface == nullptr) {
83 LOGE("[Syncer] Init failed, the syncInterface is null!");
84 return -E_INVALID_ARGS;
85 }
86
87 {
88 std::lock_guard<std::mutex> lock(syncerLock_);
89 if (initialized_) {
90 return E_OK;
91 }
92 if (closing_) {
93 LOGE("[Syncer] Syncer is closing, return!");
94 return -E_BUSY;
95 }
96 std::vector<uint8_t> label = syncInterface->GetIdentifier();
97 label.resize(3); // only show 3 Bytes enough
98 label_ = DBCommon::VectorToHexString(label);
99
100 // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
101 // It will be clear in destructor.
102 int errCodeMetadata = InitMetaData(syncInterface);
103
104 // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
105 // It will be clear in destructor.
106 int errCodeTimeHelper = InitTimeHelper(syncInterface);
107
108 if (!IsNeedActive(syncInterface)) {
109 return -E_NO_NEED_ACTIVE;
110 }
111 // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
112 // It will be clear in destructor.
113 int errCodeTimeChangedListener = InitTimeChangedListener();
114 if (errCodeMetadata != E_OK || errCodeTimeHelper != E_OK || errCodeTimeChangedListener != E_OK) {
115 return -E_INTERNAL_ERROR;
116 }
117 int errCode = CheckSyncActive(syncInterface, isNeedActive);
118 if (errCode != E_OK) {
119 return errCode;
120 }
121
122 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
123 LOGW("[Syncer] Communicator component not ready!");
124 return -E_NOT_INIT;
125 }
126
127 errCode = SyncModuleInit();
128 if (errCode != E_OK) {
129 LOGE("[Syncer] Sync ModuleInit ERR!");
130 return -E_INTERNAL_ERROR;
131 }
132
133 errCode = InitSyncEngine(syncInterface);
134 if (errCode != E_OK) {
135 return errCode;
136 }
137 syncEngine_->SetEqualIdentifier();
138 initialized_ = true;
139 }
140
141 // RegConnectCallback may start an auto sync, this function can not in syncerLock_
142 syncEngine_->RegConnectCallback();
143 return E_OK;
144 }
145
Close(bool isClosedOperation)146 int GenericSyncer::Close(bool isClosedOperation)
147 {
148 {
149 std::lock_guard<std::mutex> lock(syncerLock_);
150 if (!initialized_) {
151 if (isClosedOperation) {
152 timeHelper_ = nullptr;
153 metadata_ = nullptr;
154 }
155 LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
156 return -E_NOT_INIT;
157 }
158 initialized_ = false;
159 if (closing_) {
160 LOGE("[Syncer] Syncer is closing, return!");
161 return -E_BUSY;
162 }
163 closing_ = true;
164 }
165 ClearSyncOperations(isClosedOperation);
166 if (syncEngine_ != nullptr) {
167 syncEngine_->Close();
168 LOGD("[Syncer] Close SyncEngine!");
169 std::lock_guard<std::mutex> lock(syncerLock_);
170 closing_ = false;
171 }
172 if (isClosedOperation) {
173 timeHelper_ = nullptr;
174 metadata_ = nullptr;
175 }
176 return E_OK;
177 }
178
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)179 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
180 const std::function<void(const std::map<std::string, int> &)> &onComplete,
181 const std::function<void(void)> &onFinalize, bool wait = false)
182 {
183 SyncParma param;
184 param.devices = devices;
185 param.mode = mode;
186 param.onComplete = onComplete;
187 param.onFinalize = onFinalize;
188 param.wait = wait;
189 return Sync(param);
190 }
191
Sync(const InternalSyncParma & param)192 int GenericSyncer::Sync(const InternalSyncParma ¶m)
193 {
194 SyncParma syncParam;
195 syncParam.devices = param.devices;
196 syncParam.mode = param.mode;
197 syncParam.isQuerySync = param.isQuerySync;
198 syncParam.syncQuery = param.syncQuery;
199 return Sync(syncParam);
200 }
201
Sync(const SyncParma & param)202 int GenericSyncer::Sync(const SyncParma ¶m)
203 {
204 return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
205 }
206
Sync(const SyncParma & param,uint64_t connectionId)207 int GenericSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
208 {
209 int errCode = SyncParamCheck(param);
210 if (errCode != E_OK) {
211 return errCode;
212 }
213 errCode = AddQueuedManualSyncSize(param.mode, param.wait);
214 if (errCode != E_OK) {
215 return errCode;
216 }
217
218 uint32_t syncId = GenerateSyncId();
219 errCode = PrepareSync(param, syncId, connectionId);
220 if (errCode != E_OK) {
221 LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
222 return errCode;
223 }
224 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
225 return E_OK;
226 }
227
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)228 int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
229 {
230 auto *operation =
231 new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
232 if (operation == nullptr) {
233 SubQueuedSyncSize();
234 return -E_OUT_OF_MEMORY;
235 }
236 operation->SetIdentifier(syncInterface_->GetIdentifier());
237 {
238 std::lock_guard<std::mutex> autoLock(syncerLock_);
239 PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
240 InitSyncOperation(operation, param);
241 LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
242 param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
243 AddSyncOperation(operation);
244 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
245 }
246 if (!param.wait && connectionId != DBConstant::IGNORE_CONNECTION_ID) {
247 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
248 connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
249 syncIdMap_[static_cast<int>(syncId)] = connectionId;
250 }
251 if (operation->CheckIsAllFinished()) {
252 operation->Finished();
253 RefObject::KillAndDecObjRef(operation);
254 } else {
255 operation->WaitIfNeed();
256 RefObject::DecObjRef(operation);
257 }
258 return E_OK;
259 }
260
RemoveSyncOperation(int syncId)261 int GenericSyncer::RemoveSyncOperation(int syncId)
262 {
263 SyncOperation *operation = nullptr;
264 std::unique_lock<std::mutex> lock(operationMapLock_);
265 auto iter = syncOperationMap_.find(syncId);
266 if (iter != syncOperationMap_.end()) {
267 LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
268 operation = iter->second;
269 syncOperationMap_.erase(syncId);
270 lock.unlock();
271 if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
272 SubQueuedSyncSize();
273 }
274 operation->NotifyIfNeed();
275 RefObject::KillAndDecObjRef(operation);
276 operation = nullptr;
277 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
278 if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
279 return E_OK;
280 }
281 uint64_t connectionId = syncIdMap_[syncId];
282 if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
283 connectionIdMap_[connectionId].remove(syncId);
284 }
285 syncIdMap_.erase(syncId);
286 return E_OK;
287 }
288 return -E_INVALID_ARGS;
289 }
290
StopSync(uint64_t connectionId)291 int GenericSyncer::StopSync(uint64_t connectionId)
292 {
293 std::list<int> syncIdList;
294 {
295 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
296 if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
297 return E_OK;
298 }
299 syncIdList = connectionIdMap_[connectionId];
300 connectionIdMap_.erase(connectionId);
301 }
302 for (auto syncId : syncIdList) {
303 RemoveSyncOperation(syncId);
304 if (syncEngine_ != nullptr) {
305 syncEngine_->AbortMachineIfNeed(syncId);
306 }
307 }
308 if (syncEngine_ != nullptr) {
309 syncEngine_->NotifyConnectionClosed(connectionId);
310 }
311 return E_OK;
312 }
313
GetTimestamp()314 uint64_t GenericSyncer::GetTimestamp()
315 {
316 if (timeHelper_ == nullptr) {
317 return TimeHelper::GetSysCurrentTime();
318 }
319 return timeHelper_->GetTime();
320 }
321
QueryAutoSync(const InternalSyncParma & param)322 void GenericSyncer::QueryAutoSync(const InternalSyncParma ¶m)
323 {
324 (void)param;
325 }
326
AddSyncOperation(SyncOperation * operation)327 void GenericSyncer::AddSyncOperation(SyncOperation *operation)
328 {
329 if (operation == nullptr) {
330 return;
331 }
332
333 LOGD("[Syncer] AddSyncOperation.");
334 syncEngine_->AddSyncOperation(operation);
335
336 if (operation->CheckIsAllFinished()) {
337 return;
338 }
339
340 std::lock_guard<std::mutex> lock(operationMapLock_);
341 syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
342 // To make sure operation alive before WaitIfNeed out
343 RefObject::IncObjRef(operation);
344 }
345
SyncOperationKillCallbackInner(int syncId)346 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
347 {
348 if (syncEngine_ != nullptr) {
349 LOGI("[Syncer] Operation on kill id = %d", syncId);
350 syncEngine_->RemoveSyncOperation(syncId);
351 }
352 }
353
SyncOperationKillCallback(int syncId)354 void GenericSyncer::SyncOperationKillCallback(int syncId)
355 {
356 SyncOperationKillCallbackInner(syncId);
357 }
358
InitMetaData(ISyncInterface * syncInterface)359 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
360 {
361 if (metadata_ != nullptr) {
362 return E_OK;
363 }
364
365 metadata_ = std::make_shared<Metadata>();
366 if (metadata_ == nullptr) {
367 LOGE("[Syncer] metadata make shared failed");
368 return -E_OUT_OF_MEMORY;
369 }
370 int errCode = metadata_->Initialize(syncInterface);
371 if (errCode != E_OK) {
372 LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
373 metadata_ = nullptr;
374 }
375 syncInterface_ = syncInterface;
376 return errCode;
377 }
378
InitTimeHelper(ISyncInterface * syncInterface)379 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
380 {
381 if (timeHelper_ != nullptr) {
382 return E_OK;
383 }
384
385 timeHelper_ = std::make_shared<TimeHelper>();
386 if (timeHelper_ == nullptr) {
387 return -E_OUT_OF_MEMORY;
388 }
389 int errCode = timeHelper_->Initialize(syncInterface, metadata_);
390 if (errCode != E_OK) {
391 LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
392 timeHelper_ = nullptr;
393 }
394 return errCode;
395 }
396
InitSyncEngine(ISyncInterface * syncInterface)397 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
398 {
399 if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
400 LOGI("[Syncer] syncEngine is active");
401 return E_OK;
402 }
403 int errCode = BuildSyncEngine();
404 if (errCode != E_OK) {
405 return errCode;
406 }
407 const std::function<void(std::string)> onlineFunc = std::bind(&GenericSyncer::RemoteDataChanged,
408 this, std::placeholders::_1);
409 const std::function<void(std::string)> offlineFunc = std::bind(&GenericSyncer::RemoteDeviceOffline,
410 this, std::placeholders::_1);
411 const std::function<void(const InternalSyncParma ¶m)> queryAutoSyncFunc =
412 std::bind(&GenericSyncer::QueryAutoSync, this, std::placeholders::_1);
413 errCode = syncEngine_->Initialize(syncInterface, metadata_, onlineFunc, offlineFunc, queryAutoSyncFunc);
414 if (errCode == E_OK) {
415 syncInterface->IncRefCount();
416 label_ = syncEngine_->GetLabel();
417 return E_OK;
418 } else {
419 LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
420 RefObject::KillAndDecObjRef(syncEngine_);
421 syncEngine_ = nullptr;
422 return errCode;
423 }
424 }
425
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)426 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
427 {
428 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
429 false);
430 if (!isSyncDualTupleMode || isNeedActive) {
431 return E_OK;
432 }
433 LOGI("[Syncer] syncer no need to active");
434 int errCode = BuildSyncEngine();
435 if (errCode != E_OK) {
436 return errCode;
437 }
438 return -E_NO_NEED_ACTIVE;
439 }
440
GenerateSyncId()441 uint32_t GenericSyncer::GenerateSyncId()
442 {
443 std::lock_guard<std::mutex> lock(syncIdLock_);
444 currentSyncId_++;
445 // if overflow, reset to 1
446 if (currentSyncId_ <= 0) {
447 currentSyncId_ = MIN_VALID_SYNC_ID;
448 }
449 return currentSyncId_;
450 }
451
IsValidMode(int mode) const452 bool GenericSyncer::IsValidMode(int mode) const
453 {
454 if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
455 LOGE("[Syncer] Sync mode is not valid!");
456 return false;
457 }
458 return true;
459 }
460
SyncConditionCheck(QuerySyncObject & query,int mode,bool isQuerySync,const std::vector<std::string> & devices) const461 int GenericSyncer::SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync,
462 const std::vector<std::string> &devices) const
463 {
464 (void)query;
465 (void)mode;
466 (void)isQuerySync;
467 (void)(devices);
468 return E_OK;
469 }
470
IsValidDevices(const std::vector<std::string> & devices) const471 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
472 {
473 if (devices.empty()) {
474 LOGE("[Syncer] devices is empty!");
475 return false;
476 }
477 return true;
478 }
479
ClearSyncOperations(bool isClosedOperation)480 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
481 {
482 std::vector<SyncOperation *> syncOperation;
483 {
484 std::lock_guard<std::mutex> lock(operationMapLock_);
485 for (auto &item : syncOperationMap_) {
486 bool isBlockSync = item.second->IsBlockSync();
487 if (isBlockSync || !isClosedOperation) {
488 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
489 item.second->SetUnfinishedDevStatus(status);
490 RefObject::IncObjRef(item.second);
491 syncOperation.push_back(item.second);
492 }
493 }
494 }
495
496 if (!isClosedOperation) { // means user changed
497 syncEngine_->NotifyUserChange();
498 }
499
500 for (auto &operation : syncOperation) {
501 // block sync operation or userChange will trigger remove sync operation
502 // caller won't blocked for block sync
503 // caller won't blocked for userChange operation no mater it is block or non-block sync
504 TriggerSyncFinished(operation);
505 RefObject::DecObjRef(operation);
506 }
507 {
508 std::lock_guard<std::mutex> lock(operationMapLock_);
509 for (auto &iter : syncOperationMap_) {
510 RefObject::KillAndDecObjRef(iter.second);
511 iter.second = nullptr;
512 }
513 syncOperationMap_.clear();
514 }
515 {
516 std::lock_guard<std::mutex> lock(syncIdLock_);
517 connectionIdMap_.clear();
518 syncIdMap_.clear();
519 }
520 }
521
TriggerSyncFinished(SyncOperation * operation)522 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
523 {
524 if (operation != nullptr && operation->CheckIsAllFinished()) {
525 operation->Finished();
526 }
527 }
528
OnSyncFinished(int syncId)529 void GenericSyncer::OnSyncFinished(int syncId)
530 {
531 (void)(RemoveSyncOperation(syncId));
532 }
533
SyncModuleInit()534 int GenericSyncer::SyncModuleInit()
535 {
536 static bool isInit = false;
537 std::lock_guard<std::mutex> lock(moduleInitLock_);
538 if (!isInit) {
539 int errCode = SyncResourceInit();
540 if (errCode != E_OK) {
541 return errCode;
542 }
543 isInit = true;
544 return E_OK;
545 }
546 return E_OK;
547 }
548
SyncResourceInit()549 int GenericSyncer::SyncResourceInit()
550 {
551 int errCode = TimeSync::RegisterTransformFunc();
552 if (errCode != E_OK) {
553 LOGE("Register timesync message transform func ERR!");
554 return errCode;
555 }
556 errCode = SingleVerSerializeManager::RegisterTransformFunc();
557 if (errCode != E_OK) {
558 LOGE("Register SingleVerDataSync message transform func ERR!");
559 return errCode;
560 }
561 #ifndef OMIT_MULTI_VER
562 errCode = CommitHistorySync::RegisterTransformFunc();
563 if (errCode != E_OK) {
564 LOGE("Register CommitHistorySync message transform func ERR!");
565 return errCode;
566 }
567 errCode = MultiVerDataSync::RegisterTransformFunc();
568 if (errCode != E_OK) {
569 LOGE("Register MultiVerDataSync message transform func ERR!");
570 return errCode;
571 }
572 errCode = ValueSliceSync::RegisterTransformFunc();
573 if (errCode != E_OK) {
574 LOGE("Register ValueSliceSync message transform func ERR!");
575 return errCode;
576 }
577 #endif
578 errCode = DeviceManager::RegisterTransformFunc();
579 if (errCode != E_OK) {
580 LOGE("Register DeviceManager message transform func ERR!");
581 return errCode;
582 }
583 errCode = AbilitySync::RegisterTransformFunc();
584 if (errCode != E_OK) {
585 LOGE("Register AbilitySync message transform func ERR!");
586 return errCode;
587 }
588 return E_OK;
589 }
590
GetQueuedSyncSize(int * queuedSyncSize) const591 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
592 {
593 if (queuedSyncSize == nullptr) {
594 return -E_INVALID_ARGS;
595 }
596 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
597 *queuedSyncSize = queuedManualSyncSize_;
598 LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
599 return E_OK;
600 }
601
SetQueuedSyncLimit(const int * queuedSyncLimit)602 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
603 {
604 if (queuedSyncLimit == nullptr) {
605 return -E_INVALID_ARGS;
606 }
607 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
608 queuedManualSyncLimit_ = *queuedSyncLimit;
609 LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
610 return E_OK;
611 }
612
GetQueuedSyncLimit(int * queuedSyncLimit) const613 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
614 {
615 if (queuedSyncLimit == nullptr) {
616 return -E_INVALID_ARGS;
617 }
618 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
619 *queuedSyncLimit = queuedManualSyncLimit_;
620 LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
621 return E_OK;
622 }
623
IsManualSync(int inMode) const624 bool GenericSyncer::IsManualSync(int inMode) const
625 {
626 int mode = SyncOperation::TransferSyncMode(inMode);
627 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
628 (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
629 return true;
630 }
631 return false;
632 }
633
AddQueuedManualSyncSize(int mode,bool wait)634 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
635 {
636 if (IsManualSync(mode) && (!wait)) {
637 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
638 if (!manualSyncEnable_) {
639 LOGI("[GenericSyncer] manualSyncEnable is Disable");
640 return -E_BUSY;
641 }
642 queuedManualSyncSize_++;
643 }
644 return E_OK;
645 }
646
IsQueuedManualSyncFull(int mode,bool wait) const647 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
648 {
649 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
650 if (IsManualSync(mode) && (!manualSyncEnable_)) {
651 LOGI("[GenericSyncer] manualSyncEnable_:false");
652 return true;
653 }
654 if (IsManualSync(mode) && (!wait)) {
655 if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
656 return false;
657 } else {
658 LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
659 queuedManualSyncLimit_);
660 return true;
661 }
662 } else {
663 return false;
664 }
665 }
666
SubQueuedSyncSize(void)667 void GenericSyncer::SubQueuedSyncSize(void)
668 {
669 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
670 queuedManualSyncSize_--;
671 if (queuedManualSyncSize_ < 0) {
672 LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
673 queuedManualSyncSize_ = 0;
674 }
675 }
676
DisableManualSync(void)677 int GenericSyncer::DisableManualSync(void)
678 {
679 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
680 if (queuedManualSyncSize_ > 0) {
681 LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
682 return -E_BUSY;
683 }
684 manualSyncEnable_ = false;
685 LOGD("[GenericSyncer] DisableManualSync ok");
686 return E_OK;
687 }
688
EnableManualSync(void)689 int GenericSyncer::EnableManualSync(void)
690 {
691 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
692 manualSyncEnable_ = true;
693 LOGD("[GenericSyncer] EnableManualSync ok");
694 return E_OK;
695 }
696
GetLocalIdentity(std::string & outTarget) const697 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
698 {
699 std::string deviceId;
700 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
701 if (errCode != E_OK) {
702 LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
703 return errCode;
704 }
705 outTarget = DBCommon::TransferHashString(deviceId);
706 return E_OK;
707 }
708
GetOnlineDevices(std::vector<std::string> & devices) const709 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
710 {
711 // Get devices from AutoLaunch first.
712 if (syncInterface_ == nullptr) {
713 LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
714 return;
715 }
716 bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
717 false);
718 std::string identifier;
719 if (isSyncDualTupleMode) {
720 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
721 } else {
722 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
723 }
724 RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
725 if (!devices.empty()) {
726 return;
727 }
728 std::lock_guard<std::mutex> lock(syncerLock_);
729 if (closing_) {
730 LOGE("[Syncer] Syncer is closing, return!");
731 return;
732 }
733 if (syncEngine_ != nullptr) {
734 syncEngine_->GetOnlineDevices(devices);
735 }
736 }
737
SetSyncRetry(bool isRetry)738 int GenericSyncer::SetSyncRetry(bool isRetry)
739 {
740 if (syncEngine_ == nullptr) {
741 return -E_NOT_INIT;
742 }
743 syncEngine_->SetSyncRetry(isRetry);
744 return E_OK;
745 }
746
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)747 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
748 {
749 std::lock_guard<std::mutex> lock(syncerLock_);
750 if (syncEngine_ == nullptr) {
751 return -E_NOT_INIT;
752 }
753 int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
754 if (errCode == E_OK) {
755 syncEngine_->SetEqualIdentifierMap(identifier, targets);
756 }
757 return errCode;
758 }
759
GetSyncDevicesStr(const std::vector<std::string> & devices) const760 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
761 {
762 std::string syncDevices;
763 for (const auto &dev:devices) {
764 syncDevices += STR_MASK(dev);
765 syncDevices += ",";
766 }
767 return syncDevices.substr(0, syncDevices.size() - 1);
768 }
769
StatusCheck() const770 int GenericSyncer::StatusCheck() const
771 {
772 if (!initialized_) {
773 LOGE("[Syncer] Syncer is not initialized, return!");
774 return -E_NOT_INIT;
775 }
776 if (closing_) {
777 LOGE("[Syncer] Syncer is closing, return!");
778 return -E_BUSY;
779 }
780 return E_OK;
781 }
782
SyncParamCheck(const SyncParma & param) const783 int GenericSyncer::SyncParamCheck(const SyncParma ¶m) const
784 {
785 std::lock_guard<std::mutex> lock(syncerLock_);
786 int errCode = StatusCheck();
787 if (errCode != E_OK) {
788 return errCode;
789 }
790 if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) {
791 return -E_INVALID_ARGS;
792 }
793 if (IsQueuedManualSyncFull(param.mode, param.wait)) {
794 LOGE("[Syncer] -E_BUSY");
795 return -E_BUSY;
796 }
797 QuerySyncObject syncQuery = param.syncQuery;
798 return SyncConditionCheck(syncQuery, param.mode, param.isQuerySync, param.devices);
799 }
800
InitSyncOperation(SyncOperation * operation,const SyncParma & param)801 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma ¶m)
802 {
803 operation->SetIdentifier(syncInterface_->GetIdentifier());
804 operation->Initialize();
805 operation->OnKill(std::bind(&GenericSyncer::SyncOperationKillCallback, this, operation->GetSyncId()));
806 std::function<void(int)> onFinished = std::bind(&GenericSyncer::OnSyncFinished, this, std::placeholders::_1);
807 operation->SetOnSyncFinished(onFinished);
808 operation->SetOnSyncFinalize(param.onFinalize);
809 if (param.isQuerySync) {
810 operation->SetQuery(param.syncQuery);
811 }
812 }
813
BuildSyncEngine()814 int GenericSyncer::BuildSyncEngine()
815 {
816 if (syncEngine_ != nullptr) {
817 return E_OK;
818 }
819 syncEngine_ = CreateSyncEngine();
820 if (syncEngine_ == nullptr) {
821 return -E_OUT_OF_MEMORY;
822 }
823 syncEngine_->OnLastRef([this]() {
824 LOGD("[Syncer] SyncEngine finalized");
825 {
826 std::lock_guard<std::mutex> cvLock(engineMutex_);
827 engineFinalize_ = true;
828 }
829 engineFinalizeCv_.notify_all();
830 });
831 return E_OK;
832 }
833
Dump(int fd)834 void GenericSyncer::Dump(int fd)
835 {
836 if (syncEngine_ == nullptr) {
837 return;
838 }
839 syncEngine_->Dump(fd);
840 }
841
DumpSyncerBasicInfo()842 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
843 {
844 SyncerBasicInfo baseInfo;
845 if (syncEngine_ == nullptr) {
846 return baseInfo;
847 }
848 RefObject::IncObjRef(syncEngine_);
849 baseInfo.isSyncActive = syncEngine_->IsEngineActive();
850 RefObject::DecObjRef(syncEngine_);
851 return baseInfo;
852 }
853
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)854 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
855 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
856 {
857 ISyncEngine *syncEngine = nullptr;
858 {
859 std::lock_guard<std::mutex> lock(syncerLock_);
860 int errCode = StatusCheck();
861 if (errCode != E_OK) {
862 return errCode;
863 }
864 syncEngine = syncEngine_;
865 RefObject::IncObjRef(syncEngine);
866 }
867 if (syncEngine == nullptr) {
868 return -E_NOT_INIT;
869 }
870 int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
871 RefObject::DecObjRef(syncEngine);
872 return errCode;
873 }
874
InitTimeChangedListener()875 int GenericSyncer::InitTimeChangedListener()
876 {
877 int errCode = E_OK;
878 if (timeChangedListener_ != nullptr) {
879 return errCode;
880 }
881 timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
882 [this](void *changedOffset) {
883 if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
884 return;
885 }
886 TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
887 static_cast<TimeOffset>(TimeHelper::TO_100_NS);
888 TimeOffset orgOffset = this->metadata_->GetLocalTimeOffset() - changedTimeOffset;
889 Timestamp currentSysTime = TimeHelper::GetSysCurrentTime();
890 Timestamp maxItemTime = 0;
891 this->syncInterface_->GetMaxTimestamp(maxItemTime);
892 if (static_cast<Timestamp>(orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
893 orgOffset = static_cast<Timestamp>(TimeHelper::BUFFER_VALID_TIME) -
894 currentSysTime + TimeHelper::MS_TO_100_NS;
895 }
896 if (static_cast<Timestamp>(currentSysTime + orgOffset) <= maxItemTime) {
897 orgOffset = static_cast<TimeOffset>(maxItemTime - currentSysTime + TimeHelper::MS_TO_100_NS); // 1ms
898 }
899 this->metadata_->SaveLocalTimeOffset(orgOffset);
900 }, errCode);
901 if (timeChangedListener_ == nullptr) {
902 LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
903 return errCode;
904 }
905 return E_OK;
906 }
907
IsNeedActive(ISyncInterface * syncInterface)908 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
909 {
910 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
911 if (localOnly) {
912 LOGD("[Syncer] Local only db, don't need active syncer");
913 return false;
914 }
915 return true;
916 }
917
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)918 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
919 {
920 return -E_NOT_SUPPORT;
921 }
922 } // namespace DistributedDB
923