1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_engine.h"
17
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40
41 namespace DistributedDB {
42 int SyncEngine::queueCacheSize_ = 0;
43 int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
44 unsigned int SyncEngine::discardMsgNum_ = 0;
45 std::mutex SyncEngine::queueLock_;
46
SyncEngine()47 SyncEngine::SyncEngine()
48 : syncInterface_(nullptr),
49 communicator_(nullptr),
50 deviceManager_(nullptr),
51 metadata_(nullptr),
52 execTaskCount_(0),
53 isSyncRetry_(false),
54 communicatorProxy_(nullptr),
55 isActive_(false),
56 remoteExecutor_(nullptr)
57 {
58 }
59
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62 LOGD("[SyncEngine] ~SyncEngine!");
63 ClearInnerResource();
64 equalIdentifierMap_.clear();
65 subManager_ = nullptr;
66 LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68
Initialize(ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,const InitCallbackParam & callbackParam)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
70 const InitCallbackParam &callbackParam)
71 {
72 if ((syncInterface == nullptr) || (metadata == nullptr)) {
73 return -E_INVALID_ARGS;
74 }
75 int errCode = StartAutoSubscribeTimer(*syncInterface);
76 if (errCode != E_OK) {
77 return errCode;
78 }
79
80 errCode = InitComunicator(syncInterface);
81 if (errCode != E_OK) {
82 LOGE("[SyncEngine] Init Communicator failed");
83 // There need to set nullptr. other wise, syncInterface will be
84 // DecRef in th destroy-method.
85 StopAutoSubscribeTimer();
86 return errCode;
87 }
88 onRemoteDataChanged_ = callbackParam.onRemoteDataChanged;
89 offlineChanged_ = callbackParam.offlineChanged;
90 queryAutoSyncCallback_ = callbackParam.queryAutoSyncCallback;
91 errCode = InitInnerSource(callbackParam.onRemoteDataChanged, callbackParam.offlineChanged, syncInterface);
92 if (errCode != E_OK) {
93 // reset ptr if initialize device manager failed
94 StopAutoSubscribeTimer();
95 return errCode;
96 }
97 SetSyncInterface(syncInterface);
98 if (subManager_ == nullptr) {
99 subManager_ = std::make_shared<SubscribeManager>();
100 }
101 metadata_ = metadata;
102 isActive_ = true;
103 LOGI("[SyncEngine] Engine [%s] init ok", label_.c_str());
104 return E_OK;
105 }
106
Close()107 int SyncEngine::Close()
108 {
109 LOGI("[SyncEngine] SyncEngine [%s] close enter!", label_.c_str());
110 isActive_ = false;
111 UnRegCommunicatorsCallback();
112 StopAutoSubscribeTimer();
113 std::vector<ISyncTaskContext *> decContext;
114 // Clear SyncContexts
115 {
116 std::unique_lock<std::mutex> lock(contextMapLock_);
117 for (auto &iter : syncTaskContextMap_) {
118 decContext.push_back(iter.second);
119 iter.second = nullptr;
120 }
121 syncTaskContextMap_.clear();
122 }
123 for (auto &iter : decContext) {
124 RefObject::KillAndDecObjRef(iter);
125 iter = nullptr;
126 }
127 WaitingExecTaskExist();
128 ReleaseCommunicators();
129 {
130 std::lock_guard<std::mutex> msgLock(queueLock_);
131 while (!msgQueue_.empty()) {
132 Message *inMsg = msgQueue_.front();
133 msgQueue_.pop_front();
134 if (inMsg != nullptr) { // LCOV_EXCL_BR_LINE
135 queueCacheSize_ -= GetMsgSize(inMsg);
136 delete inMsg;
137 inMsg = nullptr;
138 }
139 }
140 }
141 // close db, rekey or import scene, need clear all remote query info
142 // local query info will destroy with syncEngine destruct
143 if (subManager_ != nullptr) {
144 subManager_->ClearAllRemoteQuery();
145 }
146
147 RemoteExecutor *executor = GetAndIncRemoteExector();
148 if (executor != nullptr) {
149 executor->Close();
150 RefObject::DecObjRef(executor);
151 executor = nullptr;
152 }
153 ClearInnerResource();
154 LOGI("[SyncEngine] SyncEngine [%s] closed!", label_.c_str());
155 return E_OK;
156 }
157
AddSyncOperation(SyncOperation * operation)158 int SyncEngine::AddSyncOperation(SyncOperation *operation)
159 {
160 if (operation == nullptr) {
161 LOGE("[SyncEngine] operation is nullptr");
162 return -E_INVALID_ARGS;
163 }
164
165 std::vector<std::string> devices = operation->GetDevices();
166 std::string localDeviceId;
167 int errCode = GetLocalDeviceId(localDeviceId);
168 for (const auto &deviceId : devices) {
169 if (errCode != E_OK) {
170 operation->SetStatus(deviceId, errCode == -E_BUSY ?
171 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
172 continue;
173 }
174 if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
175 operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
176 continue;
177 }
178 operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
179 if (AddSyncOperForContext(deviceId, operation) != E_OK) {
180 operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
181 }
182 }
183 return E_OK;
184 }
185
RemoveSyncOperation(int syncId)186 void SyncEngine::RemoveSyncOperation(int syncId)
187 {
188 std::lock_guard<std::mutex> lock(contextMapLock_);
189 for (auto &iter : syncTaskContextMap_) {
190 ISyncTaskContext *context = iter.second;
191 if (context != nullptr) {
192 context->RemoveSyncOperation(syncId);
193 }
194 }
195 }
196
197 #ifndef OMIT_MULTI_VER
BroadCastDataChanged() const198 void SyncEngine::BroadCastDataChanged() const
199 {
200 if (deviceManager_ != nullptr) {
201 (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
202 }
203 }
204 #endif // OMIT_MULTI_VER
205
StartCommunicator()206 void SyncEngine::StartCommunicator()
207 {
208 if (communicator_ == nullptr) {
209 LOGE("[SyncEngine][StartCommunicator] communicator is not set!");
210 return;
211 }
212 LOGD("[SyncEngine][StartCommunicator] RegOnConnectCallback");
213 int errCode = communicator_->RegOnConnectCallback(
214 [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
215 deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
216 }, nullptr);
217 if (errCode != E_OK) {
218 LOGE("[SyncEngine][StartCommunicator] register failed, auto sync can not use! err %d", errCode);
219 return;
220 }
221 communicator_->Activate(GetUserId());
222 }
223
GetOnlineDevices(std::vector<std::string> & devices) const224 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
225 {
226 devices.clear();
227 if (deviceManager_ != nullptr) {
228 deviceManager_->GetOnlineDevices(devices);
229 }
230 }
231
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged,ISyncInterface * syncInterface)232 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
233 const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface)
234 {
235 deviceManager_ = new (std::nothrow) DeviceManager();
236 if (deviceManager_ == nullptr) {
237 LOGE("[SyncEngine] deviceManager alloc failed!");
238 return -E_OUT_OF_MEMORY;
239 }
240 auto executor = new (std::nothrow) RemoteExecutor();
241 if (executor == nullptr) {
242 LOGE("[SyncEngine] remoteExecutor alloc failed!");
243 delete deviceManager_;
244 deviceManager_ = nullptr;
245 return -E_OUT_OF_MEMORY;
246 }
247
248 int errCode = E_OK;
249 do {
250 CommunicatorProxy *comProxy = nullptr;
251 {
252 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
253 comProxy = communicatorProxy_;
254 RefObject::IncObjRef(comProxy);
255 }
256 errCode = deviceManager_->Initialize(comProxy, onRemoteDataChanged, offlineChanged);
257 RefObject::DecObjRef(comProxy);
258 if (errCode != E_OK) {
259 LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
260 break;
261 }
262 errCode = executor->Initialize(syncInterface, communicator_);
263 } while (false);
264 if (errCode != E_OK) {
265 delete deviceManager_;
266 deviceManager_ = nullptr;
267 delete executor;
268 executor = nullptr;
269 } else {
270 SetRemoteExector(executor);
271 }
272 return errCode;
273 }
274
InitComunicator(const ISyncInterface * syncInterface)275 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
276 {
277 ICommunicatorAggregator *communicatorAggregator = nullptr;
278 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
279 if (communicatorAggregator == nullptr) {
280 LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
281 return errCode;
282 }
283 std::vector<uint8_t> label = syncInterface->GetIdentifier();
284 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
285 if (isSyncDualTupleMode) {
286 std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
287 LOGI("[SyncEngine] dual tuple mode, original identifier=%.3s, target identifier=%.3s", VEC_TO_STR(label),
288 VEC_TO_STR(dualTuplelabel));
289 communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode, GetUserId(syncInterface));
290 } else {
291 communicator_ = communicatorAggregator->AllocCommunicator(label, errCode, GetUserId(syncInterface));
292 }
293 if (communicator_ == nullptr) {
294 LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
295 return errCode;
296 }
297
298 errCode = communicator_->RegOnMessageCallback(
299 [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
300 if (errCode != E_OK) {
301 LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
302 communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
303 communicator_ = nullptr;
304 return errCode;
305 }
306 {
307 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
308 communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
309 if (communicatorProxy_ == nullptr) {
310 communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
311 communicator_ = nullptr;
312 return -E_OUT_OF_MEMORY;
313 }
314 communicatorProxy_->SetMainCommunicator(communicator_);
315 }
316 label.resize(3); // only show 3 Bytes enough
317 label_ = DBCommon::VectorToHexString(label);
318 LOGD("[SyncEngine] RegOnConnectCallback");
319 return errCode;
320 }
321
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)322 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
323 {
324 int errCode = E_OK;
325 ISyncTaskContext *context = nullptr;
326 {
327 std::lock_guard<std::mutex> lock(contextMapLock_);
328 context = FindSyncTaskContext(deviceId);
329 if (context == nullptr) {
330 if (!IsKilled()) {
331 context = GetSyncTaskContext(deviceId, errCode);
332 }
333 if (context == nullptr) {
334 return errCode;
335 }
336 }
337 if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
338 return -E_OBJ_IS_KILLED;
339 }
340 // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
341 RefObject::IncObjRef(context);
342 }
343
344 errCode = context->AddSyncOperation(operation);
345 if (operation != nullptr) {
346 operation->SetSyncContext(context); // make the life cycle of context and operation are same
347 }
348 RefObject::DecObjRef(context);
349 return errCode;
350 }
351
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)352 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
353 Message *inMsg)
354 {
355 std::string deviceId = context->GetDeviceId();
356
357 if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
358 int errCode = context->ReceiveMessageCallback(inMsg);
359 if (errCode == -E_NOT_NEED_DELETE_MSG) {
360 goto MSG_CALLBACK_OUT_NOT_DEL;
361 }
362 // add auto sync here while recv subscribe request
363 QuerySyncObject syncObject;
364 if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
365 InternalSyncParma param;
366 GetQueryAutoSyncParam(deviceId, syncObject, param);
367 queryAutoSyncCallback_(param);
368 }
369 }
370
371 delete inMsg;
372 inMsg = nullptr;
373 MSG_CALLBACK_OUT_NOT_DEL:
374 ScheduleTaskOut(context, communicator);
375 }
376
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)377 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
378 {
379 std::string deviceId = context->GetDeviceId();
380 if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
381 onRemoteDataChanged_(deviceId);
382 } else {
383 LOGE("[SyncEngine] onRemoteDataChanged is null!");
384 }
385 delete inMsg;
386 inMsg = nullptr;
387 ScheduleTaskOut(context, communicator);
388 }
389
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)390 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
391 {
392 (void)DealMsgUtilQueueEmpty();
393 DecExecTaskCount();
394 RefObject::DecObjRef(communicator);
395 RefObject::DecObjRef(context);
396 }
397
DealMsgUtilQueueEmpty()398 int SyncEngine::DealMsgUtilQueueEmpty()
399 {
400 if (!isActive_) {
401 return -E_BUSY; // db is closing just return
402 }
403 int errCode = E_OK;
404 Message *inMsg = nullptr;
405 {
406 std::lock_guard<std::mutex> lock(queueLock_);
407 if (msgQueue_.empty()) {
408 return errCode;
409 }
410 inMsg = msgQueue_.front();
411 msgQueue_.pop_front();
412 queueCacheSize_ -= GetMsgSize(inMsg);
413 }
414
415 IncExecTaskCount();
416 // it will deal with the first message in queue, we should increase object reference counts and sure that resources
417 // could be prevented from destroying by other threads.
418 do {
419 ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
420 if (errCode != E_OK) {
421 break;
422 }
423 errCode = ScheduleDealMsg(nextContext, inMsg);
424 if (errCode != E_OK) {
425 RefObject::DecObjRef(nextContext);
426 }
427 } while (false);
428 if (errCode != E_OK) {
429 delete inMsg;
430 inMsg = nullptr;
431 DecExecTaskCount();
432 }
433 return errCode;
434 }
435
GetContextForMsg(const std::string & targetDev,int & errCode)436 ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
437 {
438 ISyncTaskContext *context = nullptr;
439 {
440 std::lock_guard<std::mutex> lock(contextMapLock_);
441 context = FindSyncTaskContext(targetDev);
442 if (context != nullptr) { // LCOV_EXCL_BR_LINE
443 if (context->IsKilled()) {
444 errCode = -E_OBJ_IS_KILLED;
445 return nullptr;
446 }
447 } else {
448 if (IsKilled()) {
449 errCode = -E_OBJ_IS_KILLED;
450 return nullptr;
451 }
452 context = GetSyncTaskContext(targetDev, errCode);
453 if (context == nullptr) {
454 return nullptr;
455 }
456 }
457 // IncRef for context to make sure context is valid, when task run another thread
458 RefObject::IncObjRef(context);
459 }
460 return context;
461 }
462
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)463 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
464 {
465 if (inMsg == nullptr) {
466 LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
467 DecExecTaskCount();
468 return E_OK;
469 }
470 CommunicatorProxy *comProxy = nullptr;
471 {
472 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
473 comProxy = communicatorProxy_;
474 RefObject::IncObjRef(comProxy);
475 }
476 int errCode = E_OK;
477 // deal remote local data changed message
478 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
479 RemoteDataChangedTask(context, comProxy, inMsg);
480 } else {
481 errCode = RuntimeContext::GetInstance()->ScheduleTask(
482 [this, context, comProxy, inMsg] { MessageReciveCallbackTask(context, comProxy, inMsg); });
483 }
484
485 if (errCode != E_OK) {
486 LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
487 RefObject::DecObjRef(comProxy);
488 }
489 return errCode;
490 }
491
MessageReciveCallback(const std::string & targetDev,Message * inMsg)492 void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
493 {
494 IncExecTaskCount();
495 int errCode = MessageReciveCallbackInner(targetDev, inMsg);
496 if (errCode != E_OK) {
497 if (inMsg != nullptr) {
498 delete inMsg;
499 inMsg = nullptr;
500 }
501 DecExecTaskCount();
502 LOGE("[SyncEngine] MessageReciveCallback failed!");
503 }
504 }
505
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)506 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
507 {
508 if (targetDev.empty() || inMsg == nullptr) {
509 LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
510 return -E_INVALID_ARGS;
511 }
512 if (!isActive_) {
513 LOGE("[SyncEngine] engine is closing, ignore msg");
514 return -E_BUSY;
515 }
516 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
517 return HandleRemoteExecutorMsg(targetDev, inMsg);
518 }
519
520 int msgSize = 0;
521 if (!IsSkipCalculateLen(inMsg)) {
522 msgSize = GetMsgSize(inMsg);
523 if (msgSize <= 0) {
524 LOGE("[SyncEngine] GetMsgSize makes a mistake");
525 return -E_NOT_SUPPORT;
526 }
527 }
528
529 {
530 std::lock_guard<std::mutex> lock(queueLock_);
531 if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
532 LOGE("[SyncEngine] The size of message queue is beyond maximum");
533 discardMsgNum_++;
534 return -E_BUSY;
535 }
536
537 if (execTaskCount_ > MAX_EXEC_NUM) {
538 PutMsgIntoQueue(targetDev, inMsg, msgSize);
539 // task dont exec here
540 DecExecTaskCount();
541 return E_OK;
542 }
543 }
544
545 int errCode = E_OK;
546 ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
547 if (errCode != E_OK) {
548 return errCode;
549 }
550 LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
551 return ScheduleDealMsg(nextContext, inMsg);
552 }
553
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)554 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
555 {
556 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
557 auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
558 [&targetDev](const Message *msg) {
559 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
560 });
561 if (iter != msgQueue_.end()) { // LCOV_EXCL_BR_LINE
562 delete inMsg;
563 inMsg = nullptr;
564 return;
565 }
566 }
567 inMsg->SetTarget(targetDev);
568 msgQueue_.push_back(inMsg);
569 queueCacheSize_ += msgSize;
570 LOGW("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
571 }
572
GetMsgSize(const Message * inMsg) const573 int SyncEngine::GetMsgSize(const Message *inMsg) const
574 {
575 switch (inMsg->GetMessageId()) {
576 case TIME_SYNC_MESSAGE:
577 return TimeSync::CalculateLen(inMsg);
578 case ABILITY_SYNC_MESSAGE:
579 return AbilitySync::CalculateLen(inMsg);
580 case DATA_SYNC_MESSAGE:
581 case QUERY_SYNC_MESSAGE:
582 case CONTROL_SYNC_MESSAGE:
583 return SingleVerSerializeManager::CalculateLen(inMsg);
584 #ifndef OMIT_MULTI_VER
585 case COMMIT_HISTORY_SYNC_MESSAGE:
586 return CommitHistorySync::CalculateLen(inMsg);
587 case MULTI_VER_DATA_SYNC_MESSAGE:
588 return MultiVerDataSync::CalculateLen(inMsg);
589 case VALUE_SLICE_SYNC_MESSAGE:
590 return ValueSliceSync::CalculateLen(inMsg);
591 #endif
592 case LOCAL_DATA_CHANGED:
593 return DeviceManager::CalculateLen();
594 default:
595 LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
596 return -E_NOT_SUPPORT;
597 }
598 }
599
FindSyncTaskContext(const std::string & deviceId)600 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
601 {
602 auto iter = syncTaskContextMap_.find(deviceId);
603 if (iter != syncTaskContextMap_.end()) {
604 ISyncTaskContext *context = iter->second;
605 return context;
606 }
607 return nullptr;
608 }
609
GetSyncTaskContextAndInc(const std::string & deviceId)610 ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
611 {
612 ISyncTaskContext *context = nullptr;
613 std::lock_guard<std::mutex> lock(contextMapLock_);
614 context = FindSyncTaskContext(deviceId);
615 if (context == nullptr) {
616 LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
617 return nullptr;
618 }
619 if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
620 LOGI("[SyncEngine] context is killing");
621 return nullptr;
622 }
623 RefObject::IncObjRef(context);
624 return context;
625 }
626
GetSyncTaskContext(const std::string & deviceId,int & errCode)627 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
628 {
629 auto storage = GetAndIncSyncInterface();
630 if (storage == nullptr) {
631 errCode = -E_INVALID_DB;
632 LOGE("[SyncEngine] SyncTaskContext alloc failed with null db");
633 return nullptr;
634 }
635 ISyncTaskContext *context = CreateSyncTaskContext(*storage);
636 if (context == nullptr) {
637 errCode = -E_OUT_OF_MEMORY;
638 LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
639 return nullptr;
640 }
641 errCode = context->Initialize(deviceId, storage, metadata_, communicatorProxy_);
642 if (errCode != E_OK) {
643 LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
644 RefObject::DecObjRef(context);
645 storage->DecRefCount();
646 context = nullptr;
647 return nullptr;
648 }
649 syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
650 // IncRef for SyncEngine to make sure SyncEngine is valid when context access
651 RefObject::IncObjRef(this);
652 context->OnLastRef([this, deviceId, storage]() {
653 LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
654 RefObject::DecObjRef(this);
655 storage->DecRefCount();
656 });
657 context->RegOnSyncTask([this, context] { return ExecSyncTask(context); });
658 return context;
659 }
660
ExecSyncTask(ISyncTaskContext * context)661 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
662 {
663 if (IsKilled()) {
664 return -E_OBJ_IS_KILLED;
665 }
666 auto timeout = GetTimeout(context->GetDeviceId());
667 AutoLock lockGuard(context);
668 int status = context->GetTaskExecStatus();
669 if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
670 return -E_NOT_SUPPORT;
671 }
672 context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
673 while (!context->IsTargetQueueEmpty()) {
674 int errCode = context->GetNextTarget(timeout);
675 if (errCode != E_OK) {
676 // current task execute failed, try next task
677 context->ClearSyncOperation();
678 continue;
679 }
680 if (context->IsCurrentSyncTaskCanBeSkipped()) { // LCOV_EXCL_BR_LINE
681 context->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
682 context->ClearSyncOperation();
683 continue;
684 }
685 context->UnlockObj();
686 errCode = context->StartStateMachine();
687 context->LockObj();
688 if (errCode != E_OK) {
689 // machine start failed because timer start failed, try to execute next task
690 LOGW("[SyncEngine] machine StartSync failed");
691 context->SetOperationStatus(SyncOperation::OP_FAILED);
692 context->ClearSyncOperation();
693 continue;
694 }
695 // now task is running just return here
696 return errCode;
697 }
698 LOGD("[SyncEngine] ExecSyncTask finished");
699 context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
700 return E_OK;
701 }
702
GetQueueCacheSize() const703 int SyncEngine::GetQueueCacheSize() const
704 {
705 return queueCacheSize_;
706 }
707
SetQueueCacheSize(int size)708 void SyncEngine::SetQueueCacheSize(int size)
709 {
710 std::lock_guard<std::mutex> lock(queueLock_);
711 queueCacheSize_ = size;
712 }
713
GetDiscardMsgNum() const714 unsigned int SyncEngine::GetDiscardMsgNum() const
715 {
716 return discardMsgNum_;
717 }
718
SetDiscardMsgNum(unsigned int num)719 void SyncEngine::SetDiscardMsgNum(unsigned int num)
720 {
721 std::lock_guard<std::mutex> lock(queueLock_);
722 discardMsgNum_ = num;
723 }
724
GetMaxExecNum() const725 unsigned int SyncEngine::GetMaxExecNum() const
726 {
727 return MAX_EXEC_NUM;
728 }
729
GetMaxQueueCacheSize() const730 int SyncEngine::GetMaxQueueCacheSize() const
731 {
732 return maxQueueCacheSize_;
733 }
734
SetMaxQueueCacheSize(int value)735 void SyncEngine::SetMaxQueueCacheSize(int value)
736 {
737 maxQueueCacheSize_ = value;
738 }
739
GetLabel() const740 std::string SyncEngine::GetLabel() const
741 {
742 return label_;
743 }
744
GetSyncRetry() const745 bool SyncEngine::GetSyncRetry() const
746 {
747 return isSyncRetry_;
748 }
749
SetSyncRetry(bool isRetry)750 void SyncEngine::SetSyncRetry(bool isRetry)
751 {
752 if (isSyncRetry_ == isRetry) {
753 LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
754 return;
755 }
756 isSyncRetry_ = isRetry;
757 LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
758 std::lock_guard<std::mutex> lock(contextMapLock_);
759 for (auto &iter : syncTaskContextMap_) {
760 ISyncTaskContext *context = iter.second;
761 if (context != nullptr) { // LCOV_EXCL_BR_LINE
762 context->SetSyncRetry(isRetry);
763 }
764 }
765 }
766
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)767 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
768 {
769 if (!isActive_) {
770 LOGI("[SyncEngine] engine is closed, just put into map");
771 return E_OK;
772 }
773 ICommunicator *communicator = nullptr;
774 {
775 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
776 if (equalCommunicators_.count(identifier) != 0) {
777 communicator = equalCommunicators_[identifier];
778 } else {
779 int errCode = E_OK;
780 communicator = AllocCommunicator(identifier, errCode, GetUserId());
781 if (communicator == nullptr) {
782 return errCode;
783 }
784 equalCommunicators_[identifier] = communicator;
785 }
786 }
787 std::string targetDevices;
788 for (const auto &dev : targets) {
789 targetDevices += DBCommon::StringMasking(dev) + ",";
790 }
791 LOGI("[SyncEngine] set equal identifier=%.3s, original=%.3s, targetDevices=%s",
792 DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
793 targetDevices.substr(0, (targetDevices.size() > 0 ? targetDevices.size() - 1 : 0)).c_str());
794 {
795 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
796 if (communicatorProxy_ == nullptr) {
797 return -E_INTERNAL_ERROR;
798 }
799 communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
800 }
801 communicator->Activate(GetUserId());
802 return E_OK;
803 }
804
SetEqualIdentifier()805 void SyncEngine::SetEqualIdentifier()
806 {
807 std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
808 for (auto &item : equalIdentifierMap_) {
809 if (equalIdentifier.find(item.second) == equalIdentifier.end()) { // LCOV_EXCL_BR_LINE
810 equalIdentifier[item.second] = {item.first};
811 } else {
812 equalIdentifier[item.second].push_back(item.first);
813 }
814 }
815 for (const auto &item : equalIdentifier) {
816 SetEqualIdentifier(item.first, item.second);
817 }
818 }
819
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)820 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
821 {
822 for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
823 if (identifier == iter->second) {
824 iter = equalIdentifierMap_.erase(iter);
825 continue;
826 }
827 iter++;
828 }
829 for (const auto &device : targets) {
830 equalIdentifierMap_[device] = identifier;
831 }
832 }
833
OfflineHandleByDevice(const std::string & deviceId,ISyncInterface * storage)834 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage)
835 {
836 if (!isActive_) {
837 LOGD("[SyncEngine][OfflineHandleByDevice] ignore offline because not init");
838 return;
839 }
840 RemoteExecutor *executor = GetAndIncRemoteExector();
841 if (executor != nullptr) {
842 executor->NotifyDeviceOffline(deviceId);
843 RefObject::DecObjRef(executor);
844 executor = nullptr;
845 }
846 // db closed or device is offline
847 // clear remote subscribe and trigger
848 std::vector<std::string> remoteQueryId;
849 subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
850 subManager_->ClearRemoteSubscribeQuery(deviceId);
851 for (const auto &queryId: remoteQueryId) {
852 if (!subManager_->IsQueryExistSubscribe(queryId)) {
853 static_cast<SingleVerKvDBSyncInterface *>(storage)->RemoveSubscribe(queryId);
854 }
855 }
856 DBInfo dbInfo;
857 static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
858 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
859 // get context and Inc context if context is not nullptr
860 ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
861 {
862 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
863 if (communicatorProxy_ == nullptr) {
864 return;
865 }
866 if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
867 LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
868 RefObject::DecObjRef(context);
869 return;
870 }
871 }
872 // means device is offline, clear local subscribe
873 subManager_->ClearLocalSubscribeQuery(deviceId);
874 // clear sync task
875 if (context != nullptr) {
876 context->ClearAllSyncTask();
877 RefObject::DecObjRef(context);
878 }
879 }
880
ClearAllSyncTaskByDevice(const std::string & deviceId)881 void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
882 {
883 ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
884 if (context != nullptr) {
885 context->ClearAllSyncTask();
886 RefObject::DecObjRef(context);
887 }
888 }
889
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)890 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
891 {
892 subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
893 }
894
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)895 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
896 {
897 subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
898 }
899
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)900 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
901 {
902 subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
903 }
904
PutUnfinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)905 void SyncEngine::PutUnfinishedSubQueries(const std::string &device,
906 const std::vector<QuerySyncObject> &subscribeQueries)
907 {
908 subManager_->PutLocalUnFinishedSubQueries(device, subscribeQueries);
909 }
910
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)911 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
912 {
913 subManager_->GetAllUnFinishSubQueries(allSyncQueries);
914 }
915
AllocCommunicator(const std::string & identifier,int & errCode,std::string userId)916 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode, std::string userId)
917 {
918 ICommunicatorAggregator *communicatorAggregator = nullptr;
919 errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
920 if (communicatorAggregator == nullptr) {
921 LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
922 return nullptr;
923 }
924 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
925 auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode, userId);
926 if (communicator == nullptr) {
927 LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
928 return communicator;
929 }
930
931 errCode = communicator->RegOnMessageCallback(
932 [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
933 if (errCode != E_OK) {
934 LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
935 communicatorAggregator->ReleaseCommunicator(communicator, userId);
936 return nullptr;
937 }
938
939 errCode = communicator->RegOnConnectCallback(
940 [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
941 deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
942 }, nullptr);
943 if (errCode != E_OK) {
944 LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
945 communicator->RegOnMessageCallback(nullptr, nullptr);
946 communicatorAggregator->ReleaseCommunicator(communicator, userId);
947 return nullptr;
948 }
949
950 return communicator;
951 }
952
UnRegCommunicatorsCallback()953 void SyncEngine::UnRegCommunicatorsCallback()
954 {
955 if (communicator_ != nullptr) {
956 communicator_->RegOnMessageCallback(nullptr, nullptr);
957 communicator_->RegOnConnectCallback(nullptr, nullptr);
958 communicator_->RegOnSendableCallback(nullptr, nullptr);
959 }
960 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
961 for (const auto &iter : equalCommunicators_) {
962 iter.second->RegOnMessageCallback(nullptr, nullptr);
963 iter.second->RegOnConnectCallback(nullptr, nullptr);
964 iter.second->RegOnSendableCallback(nullptr, nullptr);
965 }
966 }
967
ReleaseCommunicators()968 void SyncEngine::ReleaseCommunicators()
969 {
970 {
971 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
972 RefObject::KillAndDecObjRef(communicatorProxy_);
973 communicatorProxy_ = nullptr;
974 }
975 ICommunicatorAggregator *communicatorAggregator = nullptr;
976 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
977 if (communicatorAggregator == nullptr) {
978 LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
979 return;
980 }
981
982 if (communicator_ != nullptr) {
983 communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId());
984 communicator_ = nullptr;
985 }
986
987 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
988 for (auto &iter : equalCommunicators_) {
989 communicatorAggregator->ReleaseCommunicator(iter.second, GetUserId());
990 }
991 equalCommunicators_.clear();
992 }
993
IsSkipCalculateLen(const Message * inMsg)994 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
995 {
996 if (inMsg->IsFeedbackError()) {
997 LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
998 return true;
999 }
1000 return false;
1001 }
1002
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1003 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
1004 InternalSyncParma &outParam)
1005 {
1006 outParam.devices = { device };
1007 outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
1008 outParam.isQuerySync = true;
1009 outParam.syncQuery = query;
1010 }
1011
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1012 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
1013 InternalSyncParma &outParam)
1014 {
1015 outParam.devices = { device };
1016 outParam.mode = SyncModeType::AUTO_PUSH;
1017 outParam.isQuerySync = true;
1018 outParam.syncQuery = query;
1019 }
1020
StartAutoSubscribeTimer(const ISyncInterface & syncInterface)1021 int SyncEngine::StartAutoSubscribeTimer([[gnu::unused]] const ISyncInterface &syncInterface)
1022 {
1023 return E_OK;
1024 }
1025
StopAutoSubscribeTimer()1026 void SyncEngine::StopAutoSubscribeTimer()
1027 {
1028 }
1029
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const1030 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
1031 {
1032 return subManager_->LocalSubscribeLimitCheck(devices, query);
1033 }
1034
1035
ClearInnerResource()1036 void SyncEngine::ClearInnerResource()
1037 {
1038 ClearSyncInterface();
1039 if (deviceManager_ != nullptr) {
1040 delete deviceManager_;
1041 deviceManager_ = nullptr;
1042 }
1043 communicator_ = nullptr;
1044 metadata_ = nullptr;
1045 onRemoteDataChanged_ = nullptr;
1046 offlineChanged_ = nullptr;
1047 queryAutoSyncCallback_ = nullptr;
1048 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1049 if (remoteExecutor_ != nullptr) {
1050 RefObject::KillAndDecObjRef(remoteExecutor_);
1051 remoteExecutor_ = nullptr;
1052 }
1053 }
1054
IsEngineActive() const1055 bool SyncEngine::IsEngineActive() const
1056 {
1057 return isActive_;
1058 }
1059
SchemaChange()1060 void SyncEngine::SchemaChange()
1061 {
1062 std::vector<ISyncTaskContext *> tmpContextVec;
1063 {
1064 std::lock_guard<std::mutex> lock(contextMapLock_);
1065 for (const auto &entry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
1066 auto context = entry.second;
1067 if (context == nullptr || context->IsKilled()) {
1068 continue;
1069 }
1070 RefObject::IncObjRef(context);
1071 tmpContextVec.push_back(context);
1072 }
1073 }
1074 for (const auto &entryContext : tmpContextVec) {
1075 entryContext->SchemaChange();
1076 RefObject::DecObjRef(entryContext);
1077 }
1078 }
1079
IncExecTaskCount()1080 void SyncEngine::IncExecTaskCount()
1081 {
1082 std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1083 execTaskCount_++;
1084 }
1085
DecExecTaskCount()1086 void SyncEngine::DecExecTaskCount()
1087 {
1088 {
1089 std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1090 execTaskCount_--;
1091 }
1092 execTaskCv_.notify_all();
1093 }
1094
Dump(int fd)1095 void SyncEngine::Dump(int fd)
1096 {
1097 {
1098 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1099 std::string communicatorLabel;
1100 if (communicatorProxy_ != nullptr) {
1101 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1102 }
1103 DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1104 if (communicatorProxy_ != nullptr) {
1105 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1106 communicatorProxy_->Dump(fd);
1107 }
1108 }
1109 DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1110 // dump context info
1111 std::lock_guard<std::mutex> autoLock(contextMapLock_);
1112 for (const auto &entry : syncTaskContextMap_) {
1113 if (entry.second != nullptr) {
1114 entry.second->Dump(fd);
1115 }
1116 }
1117 DBDumpHelper::Dump(fd, "\t]\n\n");
1118 }
1119
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1120 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1121 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1122 {
1123 RemoteExecutor *executor = GetAndIncRemoteExector();
1124 if (!isActive_ || executor == nullptr) {
1125 RefObject::DecObjRef(executor);
1126 return -E_BUSY; // db is closing just return
1127 }
1128 int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1129 RefObject::DecObjRef(executor);
1130 return errCode;
1131 }
1132
NotifyConnectionClosed(uint64_t connectionId)1133 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1134 {
1135 RemoteExecutor *executor = GetAndIncRemoteExector();
1136 if (!isActive_ || executor == nullptr) {
1137 RefObject::DecObjRef(executor);
1138 return; // db is closing just return
1139 }
1140 executor->NotifyConnectionClosed(connectionId);
1141 RefObject::DecObjRef(executor);
1142 }
1143
NotifyUserChange()1144 void SyncEngine::NotifyUserChange()
1145 {
1146 RemoteExecutor *executor = GetAndIncRemoteExector();
1147 if (!isActive_ || executor == nullptr) {
1148 RefObject::DecObjRef(executor);
1149 return; // db is closing just return
1150 }
1151 executor->NotifyUserChange();
1152 RefObject::DecObjRef(executor);
1153 }
1154
GetAndIncRemoteExector()1155 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1156 {
1157 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1158 RefObject::IncObjRef(remoteExecutor_);
1159 return remoteExecutor_;
1160 }
1161
SetRemoteExector(RemoteExecutor * executor)1162 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1163 {
1164 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1165 remoteExecutor_ = executor;
1166 }
1167
CheckDeviceIdValid(const std::string & checkDeviceId,const std::string & localDeviceId)1168 bool SyncEngine::CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId)
1169 {
1170 if (checkDeviceId.empty()) {
1171 return false;
1172 }
1173 if (checkDeviceId.length() > DBConstant::MAX_DEV_LENGTH) {
1174 LOGE("[SyncEngine] dev is too long len=%zu", checkDeviceId.length());
1175 return false;
1176 }
1177 return localDeviceId != checkDeviceId;
1178 }
1179
GetLocalDeviceId(std::string & deviceId)1180 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1181 {
1182 if (!isActive_ || communicator_ == nullptr) {
1183 // db is closing
1184 return -E_BUSY;
1185 }
1186 auto communicator = communicator_;
1187 RefObject::IncObjRef(communicator);
1188 int errCode = communicator->GetLocalIdentity(deviceId);
1189 RefObject::DecObjRef(communicator);
1190 return errCode;
1191 }
1192
AbortMachineIfNeed(uint32_t syncId)1193 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1194 {
1195 std::vector<ISyncTaskContext *> abortContexts;
1196 {
1197 std::lock_guard<std::mutex> lock(contextMapLock_);
1198 for (const auto &entry : syncTaskContextMap_) {
1199 auto context = entry.second;
1200 if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
1201 continue;
1202 }
1203 RefObject::IncObjRef(context);
1204 if (context->GetSyncId() == syncId) {
1205 RefObject::IncObjRef(context);
1206 abortContexts.push_back(context);
1207 }
1208 RefObject::DecObjRef(context);
1209 }
1210 }
1211 for (const auto &abortContext : abortContexts) {
1212 abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1213 RefObject::DecObjRef(abortContext);
1214 }
1215 }
1216
WaitingExecTaskExist()1217 void SyncEngine::WaitingExecTaskExist()
1218 {
1219 std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1220 bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1221 [this]() { return execTaskCount_ == 0; });
1222 if (!isTimeout) { // LCOV_EXCL_BR_LINE
1223 LOGD("SyncEngine Close with executing task!");
1224 }
1225 }
1226
HandleRemoteExecutorMsg(const std::string & targetDev,Message * inMsg)1227 int SyncEngine::HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg)
1228 {
1229 RemoteExecutor *executor = GetAndIncRemoteExector();
1230 int errCode = E_OK;
1231 if (executor != nullptr) {
1232 errCode = executor->ReceiveMessage(targetDev, inMsg);
1233 } else {
1234 errCode = -E_BUSY;
1235 }
1236 DecExecTaskCount();
1237 RefObject::DecObjRef(executor);
1238 return errCode;
1239 }
1240
AddSubscribe(SyncGenericInterface * storage,const std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery)1241 void SyncEngine::AddSubscribe(SyncGenericInterface *storage,
1242 const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery)
1243 {
1244 for (const auto &[device, queryList]: subscribeQuery) {
1245 for (const auto &query: queryList) {
1246 AddQuerySubscribe(storage, device, query);
1247 }
1248 }
1249 }
1250
AddQuerySubscribe(SyncGenericInterface * storage,const std::string & device,const QuerySyncObject & query)1251 void SyncEngine::AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device,
1252 const QuerySyncObject &query)
1253 {
1254 int errCode = storage->AddSubscribe(query.GetIdentify(), query, true);
1255 if (errCode != E_OK) {
1256 LOGW("[SyncEngine][AddSubscribe] Add trigger failed dev = %s queryId = %s",
1257 STR_MASK(device), STR_MASK(query.GetIdentify()));
1258 return;
1259 }
1260 errCode = subManager_->ReserveRemoteSubscribeQuery(device, query);
1261 if (errCode != E_OK) {
1262 if (!subManager_->IsQueryExistSubscribe(query.GetIdentify())) { // LCOV_EXCL_BR_LINE
1263 (void)storage->RemoveSubscribe(query.GetIdentify());
1264 }
1265 return;
1266 }
1267 subManager_->ActiveRemoteSubscribeQuery(device, query);
1268 }
1269
TimeChange()1270 void SyncEngine::TimeChange()
1271 {
1272 std::vector<ISyncTaskContext *> decContext;
1273 {
1274 // copy context
1275 std::lock_guard<std::mutex> lock(contextMapLock_);
1276 for (const auto &iter : syncTaskContextMap_) {
1277 RefObject::IncObjRef(iter.second);
1278 decContext.push_back(iter.second);
1279 }
1280 }
1281 for (auto &iter : decContext) {
1282 iter->TimeChange();
1283 RefObject::DecObjRef(iter);
1284 }
1285 }
1286
GetResponseTaskCount()1287 int32_t SyncEngine::GetResponseTaskCount()
1288 {
1289 std::vector<ISyncTaskContext *> decContext;
1290 {
1291 // copy context
1292 std::lock_guard<std::mutex> lock(contextMapLock_);
1293 for (const auto &iter : syncTaskContextMap_) {
1294 RefObject::IncObjRef(iter.second);
1295 decContext.push_back(iter.second);
1296 }
1297 }
1298 int32_t taskCount = 0;
1299 for (auto &iter : decContext) {
1300 taskCount += iter->GetResponseTaskCount();
1301 RefObject::DecObjRef(iter);
1302 }
1303 {
1304 std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1305 taskCount += static_cast<int32_t>(execTaskCount_);
1306 }
1307 return taskCount;
1308 }
1309
ClearSyncInterface()1310 void SyncEngine::ClearSyncInterface()
1311 {
1312 ISyncInterface *syncInterface = nullptr;
1313 {
1314 std::lock_guard<std::mutex> autoLock(storageMutex_);
1315 if (syncInterface_ == nullptr) {
1316 return;
1317 }
1318 syncInterface = syncInterface_;
1319 syncInterface_ = nullptr;
1320 }
1321 syncInterface->DecRefCount();
1322 }
1323
GetAndIncSyncInterface()1324 ISyncInterface *SyncEngine::GetAndIncSyncInterface()
1325 {
1326 std::lock_guard<std::mutex> autoLock(storageMutex_);
1327 if (syncInterface_ == nullptr) {
1328 return nullptr;
1329 }
1330 syncInterface_->IncRefCount();
1331 return syncInterface_;
1332 }
1333
SetSyncInterface(ISyncInterface * syncInterface)1334 void SyncEngine::SetSyncInterface(ISyncInterface *syncInterface)
1335 {
1336 std::lock_guard<std::mutex> autoLock(storageMutex_);
1337 syncInterface_ = syncInterface;
1338 }
1339
GetUserId(const ISyncInterface * syncInterface)1340 std::string SyncEngine::GetUserId(const ISyncInterface *syncInterface)
1341 {
1342 if (syncInterface == nullptr) {
1343 LOGW("[SyncEngine][GetUserId] sync interface has not initialized");
1344 return "";
1345 }
1346 std::string userId = syncInterface->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
1347 std::string subUserId = syncInterface->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
1348 if (!subUserId.empty()) {
1349 userId += "-" + subUserId;
1350 }
1351 return userId;
1352 }
1353
GetUserId()1354 std::string SyncEngine::GetUserId()
1355 {
1356 std::lock_guard<std::mutex> autoLock(storageMutex_);
1357 return GetUserId(syncInterface_);
1358 }
1359
GetTimeout(const std::string & dev)1360 uint32_t SyncEngine::GetTimeout(const std::string &dev)
1361 {
1362 ICommunicator *communicator = nullptr;
1363 {
1364 std::lock_guard<std::mutex> autoLock(communicatorProxyLock_);
1365 if (communicatorProxy_ == nullptr) {
1366 LOGW("[SyncEngine] Communicator is null when get %.3s timeout", dev.c_str());
1367 return DBConstant::MIN_TIMEOUT;
1368 }
1369 communicator = communicatorProxy_;
1370 RefObject::IncObjRef(communicator);
1371 }
1372 uint32_t timeout = communicator->GetTimeout(dev);
1373 RefObject::DecObjRef(communicator);
1374 return timeout;
1375 }
1376 } // namespace DistributedDB
1377