1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_engine.h"
17
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40
41 namespace DistributedDB {
42 int SyncEngine::queueCacheSize_ = 0;
43 int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
44 unsigned int SyncEngine::discardMsgNum_ = 0;
45 std::mutex SyncEngine::queueLock_;
46
SyncEngine()47 SyncEngine::SyncEngine()
48 : syncInterface_(nullptr),
49 communicator_(nullptr),
50 deviceManager_(nullptr),
51 metadata_(nullptr),
52 execTaskCount_(0),
53 isSyncRetry_(false),
54 communicatorProxy_(nullptr),
55 isActive_(false),
56 remoteExecutor_(nullptr)
57 {
58 }
59
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62 LOGD("[SyncEngine] ~SyncEngine!");
63 ClearInnerResource();
64 equalIdentifierMap_.clear();
65 subManager_ = nullptr;
66 LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68
Initialize(ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,const InitCallbackParam & callbackParam)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
70 const InitCallbackParam &callbackParam)
71 {
72 if ((syncInterface == nullptr) || (metadata == nullptr)) {
73 return -E_INVALID_ARGS;
74 }
75 syncInterface_ = syncInterface;
76 int errCode = StartAutoSubscribeTimer();
77 if (errCode != OK) {
78 syncInterface_ = nullptr;
79 return errCode;
80 }
81
82 errCode = InitComunicator(syncInterface);
83 if (errCode != E_OK) {
84 LOGE("[SyncEngine] Init Communicator failed");
85 // There need to set nullptr. other wise, syncInterface will be
86 // DecRef in th destroy-method.
87 StopAutoSubscribeTimer();
88 syncInterface_ = nullptr;
89 return errCode;
90 }
91 onRemoteDataChanged_ = callbackParam.onRemoteDataChanged;
92 offlineChanged_ = callbackParam.offlineChanged;
93 queryAutoSyncCallback_ = callbackParam.queryAutoSyncCallback;
94 errCode = InitInnerSource(callbackParam.onRemoteDataChanged, callbackParam.offlineChanged);
95 if (errCode != E_OK) {
96 // reset ptr if initialize device manager failed
97 syncInterface_ = nullptr;
98 StopAutoSubscribeTimer();
99 return errCode;
100 }
101 if (subManager_ == nullptr) {
102 subManager_ = std::make_shared<SubscribeManager>();
103 }
104 metadata_ = metadata;
105 isActive_ = true;
106 LOGI("[SyncEngine] Engine init ok");
107 return E_OK;
108 }
109
Close()110 int SyncEngine::Close()
111 {
112 LOGI("[SyncEngine] SyncEngine[%s] close enter!", label_.c_str());
113 isActive_ = false;
114 UnRegCommunicatorsCallback();
115 StopAutoSubscribeTimer();
116 std::vector<ISyncTaskContext *> decContext;
117 // Clear SyncContexts
118 {
119 std::unique_lock<std::mutex> lock(contextMapLock_);
120 for (auto &iter : syncTaskContextMap_) {
121 decContext.push_back(iter.second);
122 iter.second = nullptr;
123 }
124 syncTaskContextMap_.clear();
125 }
126 for (auto &iter : decContext) {
127 RefObject::KillAndDecObjRef(iter);
128 iter = nullptr;
129 }
130 WaitingExecTaskExist();
131 ReleaseCommunicators();
132 {
133 std::lock_guard<std::mutex> msgLock(queueLock_);
134 while (!msgQueue_.empty()) {
135 Message *inMsg = msgQueue_.front();
136 msgQueue_.pop_front();
137 if (inMsg != nullptr) {
138 queueCacheSize_ -= GetMsgSize(inMsg);
139 delete inMsg;
140 inMsg = nullptr;
141 }
142 }
143 }
144 // close db, rekey or import scene, need clear all remote query info
145 // local query info will destroy with syncEngine destruct
146 if (subManager_ != nullptr) {
147 subManager_->ClearAllRemoteQuery();
148 }
149
150 RemoteExecutor *executor = GetAndIncRemoteExector();
151 if (executor != nullptr) {
152 executor->Close();
153 RefObject::DecObjRef(executor);
154 }
155 ClearInnerResource();
156 LOGI("[SyncEngine] SyncEngine closed!");
157 return E_OK;
158 }
159
AddSyncOperation(SyncOperation * operation)160 int SyncEngine::AddSyncOperation(SyncOperation *operation)
161 {
162 if (operation == nullptr) {
163 LOGE("[SyncEngine] operation is nullptr");
164 return -E_INVALID_ARGS;
165 }
166
167 std::vector<std::string> devices = operation->GetDevices();
168 std::string localDeviceId;
169 int errCode = GetLocalDeviceId(localDeviceId);
170 for (const auto &deviceId : devices) {
171 if (errCode != E_OK) {
172 operation->SetStatus(deviceId, errCode == -E_BUSY ?
173 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
174 continue;
175 }
176 if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
177 operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
178 continue;
179 }
180 operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
181 if (AddSyncOperForContext(deviceId, operation) != E_OK) {
182 operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
183 }
184 }
185 return E_OK;
186 }
187
RemoveSyncOperation(int syncId)188 void SyncEngine::RemoveSyncOperation(int syncId)
189 {
190 std::lock_guard<std::mutex> lock(contextMapLock_);
191 for (auto &iter : syncTaskContextMap_) {
192 ISyncTaskContext *context = iter.second;
193 if (context != nullptr) {
194 context->RemoveSyncOperation(syncId);
195 }
196 }
197 }
198
199 #ifndef OMIT_MULTI_VER
BroadCastDataChanged() const200 void SyncEngine::BroadCastDataChanged() const
201 {
202 if (deviceManager_ != nullptr) {
203 (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
204 }
205 }
206 #endif // OMIT_MULTI_VER
207
StartCommunicator()208 void SyncEngine::StartCommunicator()
209 {
210 if (communicator_ == nullptr) {
211 LOGE("[SyncEngine][StartCommunicator] communicator is not set!");
212 return;
213 }
214 LOGD("[SyncEngine][StartCommunicator] RegOnConnectCallback");
215 int errCode = communicator_->RegOnConnectCallback(std::bind(&DeviceManager::OnDeviceConnectCallback,
216 deviceManager_, std::placeholders::_1, std::placeholders::_2), nullptr);
217 if (errCode != E_OK) {
218 LOGE("[SyncEngine][StartCommunicator] register failed, auto sync can not use! err %d", errCode);
219 return;
220 }
221 communicator_->Activate();
222 }
223
GetOnlineDevices(std::vector<std::string> & devices) const224 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
225 {
226 devices.clear();
227 if (deviceManager_ != nullptr) {
228 deviceManager_->GetOnlineDevices(devices);
229 }
230 }
231
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged)232 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
233 const std::function<void(std::string)> &offlineChanged)
234 {
235 deviceManager_ = new (std::nothrow) DeviceManager();
236 if (deviceManager_ == nullptr) {
237 LOGE("[SyncEngine] deviceManager alloc failed!");
238 return -E_OUT_OF_MEMORY;
239 }
240 auto executor = new (std::nothrow) RemoteExecutor();
241 if (executor == nullptr) {
242 LOGE("[SyncEngine] remoteExecutor alloc failed!");
243 delete deviceManager_;
244 deviceManager_ = nullptr;
245 return -E_OUT_OF_MEMORY;
246 }
247
248 int errCode = E_OK;
249 do {
250 CommunicatorProxy *comProxy = nullptr;
251 {
252 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
253 comProxy = communicatorProxy_;
254 RefObject::IncObjRef(comProxy);
255 }
256 errCode = deviceManager_->Initialize(comProxy, onRemoteDataChanged, offlineChanged);
257 RefObject::DecObjRef(comProxy);
258 if (errCode != E_OK) {
259 LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
260 break;
261 }
262 errCode = executor->Initialize(syncInterface_, communicator_);
263 } while (false);
264 if (errCode != E_OK) {
265 delete deviceManager_;
266 deviceManager_ = nullptr;
267 delete executor;
268 executor = nullptr;
269 } else {
270 SetRemoteExector(executor);
271 }
272 return errCode;
273 }
274
InitComunicator(const ISyncInterface * syncInterface)275 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
276 {
277 ICommunicatorAggregator *communicatorAggregator = nullptr;
278 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
279 if (communicatorAggregator == nullptr) {
280 LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
281 return errCode;
282 }
283 std::vector<uint8_t> label = syncInterface->GetIdentifier();
284 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
285 if (isSyncDualTupleMode) {
286 std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
287 LOGI("[SyncEngine] dual tuple mode, original identifier=%.3s, target identifier=%.3s", VEC_TO_STR(label),
288 VEC_TO_STR(dualTuplelabel));
289 communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode);
290 } else {
291 communicator_ = communicatorAggregator->AllocCommunicator(label, errCode);
292 }
293 if (communicator_ == nullptr) {
294 LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
295 return errCode;
296 }
297
298 errCode = communicator_->RegOnMessageCallback(
299 std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
300 []() {});
301 if (errCode != E_OK) {
302 LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
303 communicatorAggregator->ReleaseCommunicator(communicator_);
304 communicator_ = nullptr;
305 return errCode;
306 }
307 {
308 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
309 communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
310 if (communicatorProxy_ == nullptr) {
311 communicatorAggregator->ReleaseCommunicator(communicator_);
312 communicator_ = nullptr;
313 return -E_OUT_OF_MEMORY;
314 }
315 communicatorProxy_->SetMainCommunicator(communicator_);
316 }
317 label.resize(3); // only show 3 Bytes enough
318 label_ = DBCommon::VectorToHexString(label);
319 LOGD("[SyncEngine] RegOnConnectCallback");
320 return errCode;
321 }
322
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)323 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
324 {
325 int errCode = E_OK;
326 ISyncTaskContext *context = nullptr;
327 {
328 std::lock_guard<std::mutex> lock(contextMapLock_);
329 context = FindSyncTaskContext(deviceId);
330 if (context == nullptr) {
331 if (!IsKilled()) {
332 context = GetSyncTaskContext(deviceId, errCode);
333 }
334 if (context == nullptr) {
335 return errCode;
336 }
337 }
338 if (context->IsKilled()) {
339 return -E_OBJ_IS_KILLED;
340 }
341 // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
342 RefObject::IncObjRef(context);
343 }
344
345 errCode = context->AddSyncOperation(operation);
346 if (operation != nullptr) {
347 operation->SetSyncContext(context); // make the life cycle of context and operation are same
348 }
349 RefObject::DecObjRef(context);
350 return errCode;
351 }
352
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)353 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
354 Message *inMsg)
355 {
356 std::string deviceId = context->GetDeviceId();
357
358 if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
359 int errCode = context->ReceiveMessageCallback(inMsg);
360 if (errCode == -E_NOT_NEED_DELETE_MSG) {
361 goto MSG_CALLBACK_OUT_NOT_DEL;
362 }
363 // add auto sync here while recv subscribe request
364 QuerySyncObject syncObject;
365 if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
366 InternalSyncParma param;
367 GetQueryAutoSyncParam(deviceId, syncObject, param);
368 queryAutoSyncCallback_(param);
369 }
370 }
371
372 delete inMsg;
373 inMsg = nullptr;
374 MSG_CALLBACK_OUT_NOT_DEL:
375 ScheduleTaskOut(context, communicator);
376 }
377
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)378 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
379 {
380 std::string deviceId = context->GetDeviceId();
381 if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
382 onRemoteDataChanged_(deviceId);
383 } else {
384 LOGE("[SyncEngine] onRemoteDataChanged is null!");
385 }
386 delete inMsg;
387 inMsg = nullptr;
388 ScheduleTaskOut(context, communicator);
389 }
390
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)391 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
392 {
393 (void)DealMsgUtilQueueEmpty();
394 DecExecTaskCount();
395 RefObject::DecObjRef(communicator);
396 RefObject::DecObjRef(context);
397 }
398
DealMsgUtilQueueEmpty()399 int SyncEngine::DealMsgUtilQueueEmpty()
400 {
401 if (!isActive_) {
402 return -E_BUSY; // db is closing just return
403 }
404 int errCode = E_OK;
405 Message *inMsg = nullptr;
406 {
407 std::lock_guard<std::mutex> lock(queueLock_);
408 if (msgQueue_.empty()) {
409 return errCode;
410 }
411 inMsg = msgQueue_.front();
412 msgQueue_.pop_front();
413 queueCacheSize_ -= GetMsgSize(inMsg);
414 }
415
416 IncExecTaskCount();
417 // it will deal with the first message in queue, we should increase object reference counts and sure that resources
418 // could be prevented from destroying by other threads.
419 do {
420 ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
421 if (errCode != E_OK) {
422 break;
423 }
424 errCode = ScheduleDealMsg(nextContext, inMsg);
425 if (errCode != E_OK) {
426 RefObject::DecObjRef(nextContext);
427 }
428 } while (false);
429 if (errCode != E_OK) {
430 delete inMsg;
431 inMsg = nullptr;
432 DecExecTaskCount();
433 }
434 return errCode;
435 }
436
GetContextForMsg(const std::string & targetDev,int & errCode)437 ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
438 {
439 ISyncTaskContext *context = nullptr;
440 {
441 std::lock_guard<std::mutex> lock(contextMapLock_);
442 context = FindSyncTaskContext(targetDev);
443 if (context != nullptr) {
444 if (context->IsKilled()) {
445 errCode = -E_OBJ_IS_KILLED;
446 return nullptr;
447 }
448 } else {
449 if (IsKilled()) {
450 errCode = -E_OBJ_IS_KILLED;
451 return nullptr;
452 }
453 context = GetSyncTaskContext(targetDev, errCode);
454 if (context == nullptr) {
455 return nullptr;
456 }
457 }
458 // IncRef for context to make sure context is valid, when task run another thread
459 RefObject::IncObjRef(context);
460 }
461 return context;
462 }
463
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)464 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
465 {
466 if (inMsg == nullptr) {
467 LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
468 DecExecTaskCount();
469 return E_OK;
470 }
471 CommunicatorProxy *comProxy = nullptr;
472 {
473 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
474 comProxy = communicatorProxy_;
475 RefObject::IncObjRef(comProxy);
476 }
477 int errCode = E_OK;
478 // deal remote local data changed message
479 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
480 RemoteDataChangedTask(context, comProxy, inMsg);
481 } else {
482 errCode = RuntimeContext::GetInstance()->ScheduleTask(std::bind(&SyncEngine::MessageReciveCallbackTask,
483 this, context, comProxy, inMsg));
484 }
485
486 if (errCode != E_OK) {
487 LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
488 RefObject::DecObjRef(comProxy);
489 }
490 return errCode;
491 }
492
MessageReciveCallback(const std::string & targetDev,Message * inMsg)493 void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
494 {
495 IncExecTaskCount();
496 int errCode = MessageReciveCallbackInner(targetDev, inMsg);
497 if (errCode != E_OK) {
498 delete inMsg;
499 inMsg = nullptr;
500 DecExecTaskCount();
501 LOGE("[SyncEngine] MessageReciveCallback failed!");
502 }
503 }
504
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)505 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
506 {
507 if (targetDev.empty() || inMsg == nullptr) {
508 LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
509 return -E_INVALID_ARGS;
510 }
511 if (!isActive_) {
512 LOGE("[SyncEngine] engine is closing, ignore msg");
513 return -E_BUSY;
514 }
515 RemoteExecutor *executor = GetAndIncRemoteExector();
516 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor != nullptr) {
517 int errCode = executor->ReceiveMessage(targetDev, inMsg);
518 RefObject::DecObjRef(executor);
519 DecExecTaskCount();
520 return errCode;
521 } else if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
522 DecExecTaskCount();
523 return -E_BUSY;
524 }
525 RefObject::DecObjRef(executor);
526 int msgSize = 0;
527 if (!IsSkipCalculateLen(inMsg)) {
528 msgSize = GetMsgSize(inMsg);
529 if (msgSize <= 0) {
530 LOGE("[SyncEngine] GetMsgSize makes a mistake");
531 return -E_NOT_SUPPORT;
532 }
533 }
534
535 {
536 std::lock_guard<std::mutex> lock(queueLock_);
537 if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
538 LOGE("[SyncEngine] The size of message queue is beyond maximum");
539 discardMsgNum_++;
540 return -E_BUSY;
541 }
542
543 if (execTaskCount_ > MAX_EXEC_NUM) {
544 PutMsgIntoQueue(targetDev, inMsg, msgSize);
545 // task dont exec here
546 DecExecTaskCount();
547 return E_OK;
548 }
549 }
550
551 int errCode = E_OK;
552 ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
553 if (errCode != E_OK) {
554 return errCode;
555 }
556 LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
557 return ScheduleDealMsg(nextContext, inMsg);
558 }
559
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)560 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
561 {
562 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
563 auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
564 [&targetDev](const Message *msg) {
565 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
566 });
567 if (iter != msgQueue_.end()) {
568 delete inMsg;
569 inMsg = nullptr;
570 return;
571 }
572 }
573 inMsg->SetTarget(targetDev);
574 msgQueue_.push_back(inMsg);
575 queueCacheSize_ += msgSize;
576 LOGW("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
577 }
578
GetMsgSize(const Message * inMsg) const579 int SyncEngine::GetMsgSize(const Message *inMsg) const
580 {
581 switch (inMsg->GetMessageId()) {
582 case TIME_SYNC_MESSAGE:
583 return TimeSync::CalculateLen(inMsg);
584 case ABILITY_SYNC_MESSAGE:
585 return AbilitySync::CalculateLen(inMsg);
586 case DATA_SYNC_MESSAGE:
587 case QUERY_SYNC_MESSAGE:
588 case CONTROL_SYNC_MESSAGE:
589 return SingleVerSerializeManager::CalculateLen(inMsg);
590 #ifndef OMIT_MULTI_VER
591 case COMMIT_HISTORY_SYNC_MESSAGE:
592 return CommitHistorySync::CalculateLen(inMsg);
593 case MULTI_VER_DATA_SYNC_MESSAGE:
594 return MultiVerDataSync::CalculateLen(inMsg);
595 case VALUE_SLICE_SYNC_MESSAGE:
596 return ValueSliceSync::CalculateLen(inMsg);
597 #endif
598 case LOCAL_DATA_CHANGED:
599 return DeviceManager::CalculateLen();
600 default:
601 LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
602 return -E_NOT_SUPPORT;
603 }
604 }
605
FindSyncTaskContext(const std::string & deviceId)606 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
607 {
608 auto iter = syncTaskContextMap_.find(deviceId);
609 if (iter != syncTaskContextMap_.end()) {
610 ISyncTaskContext *context = iter->second;
611 return context;
612 }
613 return nullptr;
614 }
615
GetSyncTaskContextAndInc(const std::string & deviceId)616 ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
617 {
618 ISyncTaskContext *context = nullptr;
619 std::lock_guard<std::mutex> lock(contextMapLock_);
620 context = FindSyncTaskContext(deviceId);
621 if (context == nullptr) {
622 LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
623 return nullptr;
624 }
625 if (context->IsKilled()) {
626 LOGI("[SyncEngine] context is killing");
627 return nullptr;
628 }
629 RefObject::IncObjRef(context);
630 return context;
631 }
632
GetSyncTaskContext(const std::string & deviceId,int & errCode)633 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
634 {
635 ISyncTaskContext *context = CreateSyncTaskContext();
636 if (context == nullptr) {
637 errCode = -E_OUT_OF_MEMORY;
638 LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
639 return nullptr;
640 }
641 errCode = context->Initialize(deviceId, syncInterface_, metadata_, communicatorProxy_);
642 if (errCode != E_OK) {
643 LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
644 RefObject::DecObjRef(context);
645 context = nullptr;
646 return nullptr;
647 }
648 syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
649 // IncRef for SyncEngine to make sure SyncEngine is valid when context access
650 RefObject::IncObjRef(this);
651 auto storage = syncInterface_;
652 if (storage != nullptr) {
653 storage->IncRefCount();
654 }
655 context->OnLastRef([this, deviceId, storage]() {
656 LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
657 RefObject::DecObjRef(this);
658 if (storage != nullptr) {
659 storage->DecRefCount();
660 }
661 });
662 context->RegOnSyncTask(std::bind(&SyncEngine::ExecSyncTask, this, context));
663 return context;
664 }
665
ExecSyncTask(ISyncTaskContext * context)666 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
667 {
668 if (IsKilled()) {
669 return -E_OBJ_IS_KILLED;
670 }
671 AutoLock lockGuard(context);
672 int status = context->GetTaskExecStatus();
673 if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
674 return -E_NOT_SUPPORT;
675 }
676 context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
677 while (!context->IsTargetQueueEmpty()) {
678 int errCode = context->GetNextTarget();
679 if (errCode != E_OK) {
680 // current task execute failed, try next task
681 context->ClearSyncOperation();
682 continue;
683 }
684 if (context->IsCurrentSyncTaskCanBeSkipped()) {
685 context->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
686 context->ClearSyncOperation();
687 continue;
688 }
689 context->UnlockObj();
690 errCode = context->StartStateMachine();
691 context->LockObj();
692 if (errCode != E_OK) {
693 // machine start failed because timer start failed, try to execute next task
694 LOGW("[SyncEngine] machine StartSync failed");
695 context->SetOperationStatus(SyncOperation::OP_FAILED);
696 context->ClearSyncOperation();
697 continue;
698 }
699 // now task is running just return here
700 return errCode;
701 }
702 LOGD("[SyncEngine] ExecSyncTask finished");
703 context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
704 return E_OK;
705 }
706
GetQueueCacheSize() const707 int SyncEngine::GetQueueCacheSize() const
708 {
709 return queueCacheSize_;
710 }
711
GetDiscardMsgNum() const712 unsigned int SyncEngine::GetDiscardMsgNum() const
713 {
714 return discardMsgNum_;
715 }
716
GetMaxExecNum() const717 unsigned int SyncEngine::GetMaxExecNum() const
718 {
719 return MAX_EXEC_NUM;
720 }
721
SetMaxQueueCacheSize(int value)722 void SyncEngine::SetMaxQueueCacheSize(int value)
723 {
724 maxQueueCacheSize_ = value;
725 }
726
GetLabel() const727 std::string SyncEngine::GetLabel() const
728 {
729 return label_;
730 }
731
GetSyncRetry() const732 bool SyncEngine::GetSyncRetry() const
733 {
734 return isSyncRetry_;
735 }
736
SetSyncRetry(bool isRetry)737 void SyncEngine::SetSyncRetry(bool isRetry)
738 {
739 if (isSyncRetry_ == isRetry) {
740 LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
741 return;
742 }
743 isSyncRetry_ = isRetry;
744 LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
745 std::lock_guard<std::mutex> lock(contextMapLock_);
746 for (auto &iter : syncTaskContextMap_) {
747 ISyncTaskContext *context = iter.second;
748 if (context != nullptr) {
749 context->SetSyncRetry(isRetry);
750 }
751 }
752 }
753
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)754 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
755 {
756 if (!isActive_) {
757 LOGI("[SyncEngine] engine is closed, just put into map");
758 return E_OK;
759 }
760 ICommunicator *communicator = nullptr;
761 {
762 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
763 if (equalCommunicators_.count(identifier) != 0) {
764 communicator = equalCommunicators_[identifier];
765 } else {
766 int errCode = E_OK;
767 communicator = AllocCommunicator(identifier, errCode);
768 if (communicator == nullptr) {
769 return errCode;
770 }
771 equalCommunicators_[identifier] = communicator;
772 }
773 }
774 std::string targetDevices;
775 for (const auto &dev : targets) {
776 targetDevices += DBCommon::StringMasking(dev) + ",";
777 }
778 LOGI("[SyncEngine] set equal identifier=%.3s, original=%.3s, targetDevices=%s",
779 DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
780 targetDevices.substr(0, (targetDevices.size() > 0 ? targetDevices.size() - 1 : 0)).c_str());
781 {
782 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
783 if (communicatorProxy_ == nullptr) {
784 return -E_INTERNAL_ERROR;
785 }
786 communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
787 }
788 communicator->Activate();
789 return E_OK;
790 }
791
SetEqualIdentifier()792 void SyncEngine::SetEqualIdentifier()
793 {
794 std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
795 for (auto &item : equalIdentifierMap_) {
796 if (equalIdentifier.find(item.second) == equalIdentifier.end()) {
797 equalIdentifier[item.second] = {item.first};
798 } else {
799 equalIdentifier[item.second].push_back(item.first);
800 }
801 }
802 for (const auto &item : equalIdentifier) {
803 SetEqualIdentifier(item.first, item.second);
804 }
805 }
806
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)807 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
808 {
809 for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
810 if (identifier == iter->second) {
811 iter = equalIdentifierMap_.erase(iter);
812 continue;
813 }
814 iter++;
815 }
816 for (const auto &device : targets) {
817 equalIdentifierMap_[device] = identifier;
818 }
819 }
820
OfflineHandleByDevice(const std::string & deviceId)821 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId)
822 {
823 RemoteExecutor *executor = GetAndIncRemoteExector();
824 if (executor != nullptr) {
825 executor->NotifyDeviceOffline(deviceId);
826 RefObject::DecObjRef(executor);
827 }
828 // db closed or device is offline
829 // clear remote subscribe and trigger
830 std::vector<std::string> remoteQueryId;
831 subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
832 subManager_->ClearRemoteSubscribeQuery(deviceId);
833 for (const auto &queryId: remoteQueryId) {
834 if (!subManager_->IsQueryExistSubscribe(queryId)) {
835 static_cast<SingleVerKvDBSyncInterface *>(syncInterface_)->RemoveSubscribe(queryId);
836 }
837 }
838 // get context and Inc context if context is not nullprt
839 ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
840 if (context != nullptr) {
841 context->SetIsNeedResetAbilitySync(true);
842 }
843 {
844 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
845 if (communicatorProxy_ == nullptr) {
846 return;
847 }
848 if (communicatorProxy_->IsDeviceOnline(deviceId)) {
849 LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
850 RefObject::DecObjRef(context);
851 return;
852 }
853 }
854 // means device is offline, clear local subscribe
855 subManager_->ClearLocalSubscribeQuery(deviceId);
856 // clear sync task
857 if (context != nullptr) {
858 context->ClearAllSyncTask();
859 RefObject::DecObjRef(context);
860 }
861 }
862
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)863 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
864 {
865 subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
866 }
867
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)868 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
869 {
870 subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
871 }
872
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)873 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
874 {
875 subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
876 }
877
PutUnfinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)878 void SyncEngine::PutUnfinishedSubQueries(const std::string &device,
879 const std::vector<QuerySyncObject> &subscribeQueries)
880 {
881 subManager_->PutLocalUnFinishedSubQueries(device, subscribeQueries);
882 }
883
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)884 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
885 {
886 subManager_->GetAllUnFinishSubQueries(allSyncQueries);
887 }
888
AllocCommunicator(const std::string & identifier,int & errCode)889 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode)
890 {
891 ICommunicatorAggregator *communicatorAggregator = nullptr;
892 errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
893 if (communicatorAggregator == nullptr) {
894 LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
895 return nullptr;
896 }
897 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
898 auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode);
899 if (communicator == nullptr) {
900 LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
901 return communicator;
902 }
903
904 errCode = communicator->RegOnMessageCallback(
905 std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
906 []() {});
907 if (errCode != E_OK) {
908 LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
909 communicatorAggregator->ReleaseCommunicator(communicator);
910 return nullptr;
911 }
912
913 errCode = communicator->RegOnConnectCallback(
914 std::bind(&DeviceManager::OnDeviceConnectCallback, deviceManager_,
915 std::placeholders::_1, std::placeholders::_2), nullptr);
916 if (errCode != E_OK) {
917 LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
918 communicator->RegOnMessageCallback(nullptr, nullptr);
919 communicatorAggregator->ReleaseCommunicator(communicator);
920 return nullptr;
921 }
922
923 return communicator;
924 }
925
UnRegCommunicatorsCallback()926 void SyncEngine::UnRegCommunicatorsCallback()
927 {
928 if (communicator_ != nullptr) {
929 communicator_->RegOnMessageCallback(nullptr, nullptr);
930 communicator_->RegOnConnectCallback(nullptr, nullptr);
931 communicator_->RegOnSendableCallback(nullptr, nullptr);
932 }
933 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
934 for (const auto &iter : equalCommunicators_) {
935 iter.second->RegOnMessageCallback(nullptr, nullptr);
936 iter.second->RegOnConnectCallback(nullptr, nullptr);
937 iter.second->RegOnSendableCallback(nullptr, nullptr);
938 }
939 }
940
ReleaseCommunicators()941 void SyncEngine::ReleaseCommunicators()
942 {
943 {
944 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
945 RefObject::KillAndDecObjRef(communicatorProxy_);
946 communicatorProxy_ = nullptr;
947 }
948 ICommunicatorAggregator *communicatorAggregator = nullptr;
949 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
950 if (communicatorAggregator == nullptr) {
951 LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
952 return;
953 }
954
955 if (communicator_ != nullptr) {
956 communicatorAggregator->ReleaseCommunicator(communicator_);
957 communicator_ = nullptr;
958 }
959
960 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
961 for (auto &iter : equalCommunicators_) {
962 communicatorAggregator->ReleaseCommunicator(iter.second);
963 }
964 equalCommunicators_.clear();
965 }
966
IsSkipCalculateLen(const Message * inMsg)967 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
968 {
969 if (inMsg->IsFeedbackError()) {
970 LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
971 return true;
972 }
973 return false;
974 }
975
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)976 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
977 InternalSyncParma &outParam)
978 {
979 outParam.devices = { device };
980 outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
981 outParam.isQuerySync = true;
982 outParam.syncQuery = query;
983 }
984
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)985 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
986 InternalSyncParma &outParam)
987 {
988 outParam.devices = { device };
989 outParam.mode = SyncModeType::AUTO_PUSH;
990 outParam.isQuerySync = true;
991 outParam.syncQuery = query;
992 }
993
StartAutoSubscribeTimer()994 int SyncEngine::StartAutoSubscribeTimer()
995 {
996 return E_OK;
997 }
998
StopAutoSubscribeTimer()999 void SyncEngine::StopAutoSubscribeTimer()
1000 {
1001 }
1002
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const1003 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
1004 {
1005 return subManager_->LocalSubscribeLimitCheck(devices, query);
1006 }
1007
1008
ClearInnerResource()1009 void SyncEngine::ClearInnerResource()
1010 {
1011 if (syncInterface_ != nullptr) {
1012 syncInterface_->DecRefCount();
1013 syncInterface_ = nullptr;
1014 }
1015 if (deviceManager_ != nullptr) {
1016 delete deviceManager_;
1017 deviceManager_ = nullptr;
1018 }
1019 communicator_ = nullptr;
1020 metadata_ = nullptr;
1021 onRemoteDataChanged_ = nullptr;
1022 offlineChanged_ = nullptr;
1023 queryAutoSyncCallback_ = nullptr;
1024 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1025 if (remoteExecutor_ != nullptr) {
1026 RefObject::KillAndDecObjRef(remoteExecutor_);
1027 remoteExecutor_ = nullptr;
1028 }
1029 }
1030
IsEngineActive() const1031 bool SyncEngine::IsEngineActive() const
1032 {
1033 return isActive_;
1034 }
1035
SchemaChange()1036 void SyncEngine::SchemaChange()
1037 {
1038 std::vector<ISyncTaskContext *> tmpContextVec;
1039 {
1040 std::lock_guard<std::mutex> lock(contextMapLock_);
1041 for (const auto &entry : syncTaskContextMap_) {
1042 auto context = entry.second;
1043 if (context == nullptr || context->IsKilled()) {
1044 continue;
1045 }
1046 RefObject::IncObjRef(context);
1047 tmpContextVec.push_back(context);
1048 }
1049 }
1050 for (const auto &entryContext : tmpContextVec) {
1051 entryContext->SchemaChange();
1052 RefObject::DecObjRef(entryContext);
1053 }
1054 }
1055
IncExecTaskCount()1056 void SyncEngine::IncExecTaskCount()
1057 {
1058 std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1059 execTaskCount_++;
1060 }
1061
DecExecTaskCount()1062 void SyncEngine::DecExecTaskCount()
1063 {
1064 {
1065 std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1066 execTaskCount_--;
1067 }
1068 execTaskCv_.notify_all();
1069 }
1070
Dump(int fd)1071 void SyncEngine::Dump(int fd)
1072 {
1073 {
1074 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1075 std::string communicatorLabel;
1076 if (communicatorProxy_ != nullptr) {
1077 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1078 }
1079 DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1080 if (communicatorProxy_ != nullptr) {
1081 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1082 communicatorProxy_->Dump(fd);
1083 }
1084 }
1085 DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1086 // dump context info
1087 std::lock_guard<std::mutex> autoLock(contextMapLock_);
1088 for (const auto &entry : syncTaskContextMap_) {
1089 if (entry.second != nullptr) {
1090 entry.second->Dump(fd);
1091 }
1092 }
1093 DBDumpHelper::Dump(fd, "\t]\n\n");
1094 }
1095
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1096 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1097 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1098 {
1099 RemoteExecutor *executor = GetAndIncRemoteExector();
1100 if (!isActive_ || executor == nullptr) {
1101 RefObject::DecObjRef(executor);
1102 return -E_BUSY; // db is closing just return
1103 }
1104 int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1105 RefObject::DecObjRef(executor);
1106 return errCode;
1107 }
1108
NotifyConnectionClosed(uint64_t connectionId)1109 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1110 {
1111 RemoteExecutor *executor = GetAndIncRemoteExector();
1112 if (!isActive_ || executor == nullptr) {
1113 RefObject::DecObjRef(executor);
1114 return; // db is closing just return
1115 }
1116 executor->NotifyConnectionClosed(connectionId);
1117 RefObject::DecObjRef(executor);
1118 }
1119
NotifyUserChange()1120 void SyncEngine::NotifyUserChange()
1121 {
1122 RemoteExecutor *executor = GetAndIncRemoteExector();
1123 if (!isActive_ || executor == nullptr) {
1124 RefObject::DecObjRef(executor);
1125 return; // db is closing just return
1126 }
1127 executor->NotifyUserChange();
1128 RefObject::DecObjRef(executor);
1129 }
1130
GetAndIncRemoteExector()1131 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1132 {
1133 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1134 RefObject::IncObjRef(remoteExecutor_);
1135 return remoteExecutor_;
1136 }
1137
SetRemoteExector(RemoteExecutor * executor)1138 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1139 {
1140 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1141 remoteExecutor_ = executor;
1142 }
1143
CheckDeviceIdValid(const std::string & checkDeviceId,const std::string & localDeviceId)1144 bool SyncEngine::CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId)
1145 {
1146 if (checkDeviceId.empty()) {
1147 return false;
1148 }
1149 if (checkDeviceId.length() > DBConstant::MAX_DEV_LENGTH) {
1150 LOGE("[SyncEngine] dev is too long len=%zu", checkDeviceId.length());
1151 return false;
1152 }
1153 return localDeviceId != checkDeviceId;
1154 }
1155
GetLocalDeviceId(std::string & deviceId)1156 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1157 {
1158 if (!isActive_ || communicator_ == nullptr) {
1159 // db is closing
1160 return -E_BUSY;
1161 }
1162 auto communicator = communicator_;
1163 RefObject::IncObjRef(communicator);
1164 int errCode = communicator->GetLocalIdentity(deviceId);
1165 RefObject::DecObjRef(communicator);
1166 return errCode;
1167 }
1168
AbortMachineIfNeed(uint32_t syncId)1169 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1170 {
1171 std::vector<ISyncTaskContext *> abortContexts;
1172 {
1173 std::lock_guard<std::mutex> lock(contextMapLock_);
1174 for (const auto &entry : syncTaskContextMap_) {
1175 auto context = entry.second;
1176 if (context == nullptr || context->IsKilled()) {
1177 continue;
1178 }
1179 RefObject::IncObjRef(context);
1180 if (context->GetSyncId() == syncId) {
1181 RefObject::IncObjRef(context);
1182 abortContexts.push_back(context);
1183 }
1184 RefObject::DecObjRef(context);
1185 }
1186 }
1187 for (const auto &abortContext : abortContexts) {
1188 abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1189 RefObject::DecObjRef(abortContext);
1190 }
1191 }
1192
WaitingExecTaskExist()1193 void SyncEngine::WaitingExecTaskExist()
1194 {
1195 std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1196 bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1197 [this]() { return execTaskCount_ == 0; });
1198 if (!isTimeout) {
1199 LOGD("SyncEngine Close with executing task!");
1200 }
1201 }
1202 } // namespace DistributedDB
1203