• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_engine.h"
17 
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21 
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34 
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40 
41 namespace DistributedDB {
42 int SyncEngine::queueCacheSize_ = 0;
43 int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
44 unsigned int SyncEngine::discardMsgNum_ = 0;
45 std::mutex SyncEngine::queueLock_;
46 
SyncEngine()47 SyncEngine::SyncEngine()
48     : syncInterface_(nullptr),
49       communicator_(nullptr),
50       deviceManager_(nullptr),
51       metadata_(nullptr),
52       execTaskCount_(0),
53       isSyncRetry_(false),
54       communicatorProxy_(nullptr),
55       isActive_(false),
56       remoteExecutor_(nullptr)
57 {
58 }
59 
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62     LOGD("[SyncEngine] ~SyncEngine!");
63     ClearInnerResource();
64     equalIdentifierMap_.clear();
65     subManager_ = nullptr;
66     LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68 
Initialize(ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,const InitCallbackParam & callbackParam)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
70     const InitCallbackParam &callbackParam)
71 {
72     if ((syncInterface == nullptr) || (metadata == nullptr)) {
73         return -E_INVALID_ARGS;
74     }
75     syncInterface_ = syncInterface;
76     int errCode = StartAutoSubscribeTimer();
77     if (errCode != OK) {
78         syncInterface_ = nullptr;
79         return errCode;
80     }
81 
82     errCode = InitComunicator(syncInterface);
83     if (errCode != E_OK) {
84         LOGE("[SyncEngine] Init Communicator failed");
85         // There need to set nullptr. other wise, syncInterface will be
86         // DecRef in th destroy-method.
87         StopAutoSubscribeTimer();
88         syncInterface_ = nullptr;
89         return errCode;
90     }
91     onRemoteDataChanged_ = callbackParam.onRemoteDataChanged;
92     offlineChanged_ = callbackParam.offlineChanged;
93     queryAutoSyncCallback_ = callbackParam.queryAutoSyncCallback;
94     errCode = InitInnerSource(callbackParam.onRemoteDataChanged, callbackParam.offlineChanged);
95     if (errCode != E_OK) {
96         // reset ptr if initialize device manager failed
97         syncInterface_ = nullptr;
98         StopAutoSubscribeTimer();
99         return errCode;
100     }
101     if (subManager_ == nullptr) {
102         subManager_ = std::make_shared<SubscribeManager>();
103     }
104     metadata_ = metadata;
105     isActive_ = true;
106     LOGI("[SyncEngine] Engine init ok");
107     return E_OK;
108 }
109 
Close()110 int SyncEngine::Close()
111 {
112     LOGI("[SyncEngine] SyncEngine[%s] close enter!", label_.c_str());
113     isActive_ = false;
114     UnRegCommunicatorsCallback();
115     StopAutoSubscribeTimer();
116     std::vector<ISyncTaskContext *> decContext;
117     // Clear SyncContexts
118     {
119         std::unique_lock<std::mutex> lock(contextMapLock_);
120         for (auto &iter : syncTaskContextMap_) {
121             decContext.push_back(iter.second);
122             iter.second = nullptr;
123         }
124         syncTaskContextMap_.clear();
125     }
126     for (auto &iter : decContext) {
127         RefObject::KillAndDecObjRef(iter);
128         iter = nullptr;
129     }
130     WaitingExecTaskExist();
131     ReleaseCommunicators();
132     {
133         std::lock_guard<std::mutex> msgLock(queueLock_);
134         while (!msgQueue_.empty()) {
135             Message *inMsg = msgQueue_.front();
136             msgQueue_.pop_front();
137             if (inMsg != nullptr) {
138                 queueCacheSize_ -= GetMsgSize(inMsg);
139                 delete inMsg;
140                 inMsg = nullptr;
141             }
142         }
143     }
144     // close db, rekey or import scene, need clear all remote query info
145     // local query info will destroy with syncEngine destruct
146     if (subManager_ != nullptr) {
147         subManager_->ClearAllRemoteQuery();
148     }
149 
150     RemoteExecutor *executor = GetAndIncRemoteExector();
151     if (executor != nullptr) {
152         executor->Close();
153         RefObject::DecObjRef(executor);
154     }
155     ClearInnerResource();
156     LOGI("[SyncEngine] SyncEngine closed!");
157     return E_OK;
158 }
159 
AddSyncOperation(SyncOperation * operation)160 int SyncEngine::AddSyncOperation(SyncOperation *operation)
161 {
162     if (operation == nullptr) {
163         LOGE("[SyncEngine] operation is nullptr");
164         return -E_INVALID_ARGS;
165     }
166 
167     std::vector<std::string> devices = operation->GetDevices();
168     std::string localDeviceId;
169     int errCode = GetLocalDeviceId(localDeviceId);
170     for (const auto &deviceId : devices) {
171         if (errCode != E_OK) {
172             operation->SetStatus(deviceId, errCode == -E_BUSY ?
173                 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
174             continue;
175         }
176         if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
177             operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
178             continue;
179         }
180         operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
181         if (AddSyncOperForContext(deviceId, operation) != E_OK) {
182             operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
183         }
184     }
185     return E_OK;
186 }
187 
RemoveSyncOperation(int syncId)188 void SyncEngine::RemoveSyncOperation(int syncId)
189 {
190     std::lock_guard<std::mutex> lock(contextMapLock_);
191     for (auto &iter : syncTaskContextMap_) {
192         ISyncTaskContext *context = iter.second;
193         if (context != nullptr) {
194             context->RemoveSyncOperation(syncId);
195         }
196     }
197 }
198 
199 #ifndef OMIT_MULTI_VER
BroadCastDataChanged() const200 void SyncEngine::BroadCastDataChanged() const
201 {
202     if (deviceManager_ != nullptr) {
203         (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
204     }
205 }
206 #endif // OMIT_MULTI_VER
207 
StartCommunicator()208 void SyncEngine::StartCommunicator()
209 {
210     if (communicator_ == nullptr) {
211         LOGE("[SyncEngine][StartCommunicator] communicator is not set!");
212         return;
213     }
214     LOGD("[SyncEngine][StartCommunicator] RegOnConnectCallback");
215     int errCode = communicator_->RegOnConnectCallback(std::bind(&DeviceManager::OnDeviceConnectCallback,
216         deviceManager_, std::placeholders::_1, std::placeholders::_2), nullptr);
217     if (errCode != E_OK) {
218         LOGE("[SyncEngine][StartCommunicator] register failed, auto sync can not use! err %d", errCode);
219         return;
220     }
221     communicator_->Activate();
222 }
223 
GetOnlineDevices(std::vector<std::string> & devices) const224 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
225 {
226     devices.clear();
227     if (deviceManager_ != nullptr) {
228         deviceManager_->GetOnlineDevices(devices);
229     }
230 }
231 
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged)232 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
233     const std::function<void(std::string)> &offlineChanged)
234 {
235     deviceManager_ = new (std::nothrow) DeviceManager();
236     if (deviceManager_ == nullptr) {
237         LOGE("[SyncEngine] deviceManager alloc failed!");
238         return -E_OUT_OF_MEMORY;
239     }
240     auto executor = new (std::nothrow) RemoteExecutor();
241     if (executor == nullptr) {
242         LOGE("[SyncEngine] remoteExecutor alloc failed!");
243         delete deviceManager_;
244         deviceManager_ = nullptr;
245         return -E_OUT_OF_MEMORY;
246     }
247 
248     int errCode = E_OK;
249     do {
250         CommunicatorProxy *comProxy = nullptr;
251         {
252             std::lock_guard<std::mutex> lock(communicatorProxyLock_);
253             comProxy = communicatorProxy_;
254             RefObject::IncObjRef(comProxy);
255         }
256         errCode = deviceManager_->Initialize(comProxy, onRemoteDataChanged, offlineChanged);
257         RefObject::DecObjRef(comProxy);
258         if (errCode != E_OK) {
259             LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
260             break;
261         }
262         errCode = executor->Initialize(syncInterface_, communicator_);
263     } while (false);
264     if (errCode != E_OK) {
265         delete deviceManager_;
266         deviceManager_ = nullptr;
267         delete executor;
268         executor = nullptr;
269     } else {
270         SetRemoteExector(executor);
271     }
272     return errCode;
273 }
274 
InitComunicator(const ISyncInterface * syncInterface)275 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
276 {
277     ICommunicatorAggregator *communicatorAggregator = nullptr;
278     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
279     if (communicatorAggregator == nullptr) {
280         LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
281         return errCode;
282     }
283     std::vector<uint8_t> label = syncInterface->GetIdentifier();
284     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
285     if (isSyncDualTupleMode) {
286         std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
287         LOGI("[SyncEngine] dual tuple mode, original identifier=%.3s, target identifier=%.3s", VEC_TO_STR(label),
288             VEC_TO_STR(dualTuplelabel));
289         communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode);
290     } else {
291         communicator_ = communicatorAggregator->AllocCommunicator(label, errCode);
292     }
293     if (communicator_ == nullptr) {
294         LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
295         return errCode;
296     }
297 
298     errCode = communicator_->RegOnMessageCallback(
299         std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
300         []() {});
301     if (errCode != E_OK) {
302         LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
303         communicatorAggregator->ReleaseCommunicator(communicator_);
304         communicator_ = nullptr;
305         return errCode;
306     }
307     {
308         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
309         communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
310         if (communicatorProxy_ == nullptr) {
311             communicatorAggregator->ReleaseCommunicator(communicator_);
312             communicator_ = nullptr;
313             return -E_OUT_OF_MEMORY;
314         }
315         communicatorProxy_->SetMainCommunicator(communicator_);
316     }
317     label.resize(3); // only show 3 Bytes enough
318     label_ = DBCommon::VectorToHexString(label);
319     LOGD("[SyncEngine] RegOnConnectCallback");
320     return errCode;
321 }
322 
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)323 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
324 {
325     int errCode = E_OK;
326     ISyncTaskContext *context = nullptr;
327     {
328         std::lock_guard<std::mutex> lock(contextMapLock_);
329         context = FindSyncTaskContext(deviceId);
330         if (context == nullptr) {
331             if (!IsKilled()) {
332                 context = GetSyncTaskContext(deviceId, errCode);
333             }
334             if (context == nullptr) {
335                 return errCode;
336             }
337         }
338         if (context->IsKilled()) {
339             return -E_OBJ_IS_KILLED;
340         }
341         // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
342         RefObject::IncObjRef(context);
343     }
344 
345     errCode = context->AddSyncOperation(operation);
346     if (operation != nullptr) {
347         operation->SetSyncContext(context); // make the life cycle of context and operation are same
348     }
349     RefObject::DecObjRef(context);
350     return errCode;
351 }
352 
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)353 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
354     Message *inMsg)
355 {
356     std::string deviceId = context->GetDeviceId();
357 
358     if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
359         int errCode = context->ReceiveMessageCallback(inMsg);
360         if (errCode == -E_NOT_NEED_DELETE_MSG) {
361             goto MSG_CALLBACK_OUT_NOT_DEL;
362         }
363         // add auto sync here while recv subscribe request
364         QuerySyncObject syncObject;
365         if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
366             InternalSyncParma param;
367             GetQueryAutoSyncParam(deviceId, syncObject, param);
368             queryAutoSyncCallback_(param);
369         }
370     }
371 
372     delete inMsg;
373     inMsg = nullptr;
374 MSG_CALLBACK_OUT_NOT_DEL:
375     ScheduleTaskOut(context, communicator);
376 }
377 
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)378 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
379 {
380     std::string deviceId = context->GetDeviceId();
381     if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
382         onRemoteDataChanged_(deviceId);
383     } else {
384         LOGE("[SyncEngine] onRemoteDataChanged is null!");
385     }
386     delete inMsg;
387     inMsg = nullptr;
388     ScheduleTaskOut(context, communicator);
389 }
390 
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)391 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
392 {
393     (void)DealMsgUtilQueueEmpty();
394     DecExecTaskCount();
395     RefObject::DecObjRef(communicator);
396     RefObject::DecObjRef(context);
397 }
398 
DealMsgUtilQueueEmpty()399 int SyncEngine::DealMsgUtilQueueEmpty()
400 {
401     if (!isActive_) {
402         return -E_BUSY; // db is closing just return
403     }
404     int errCode = E_OK;
405     Message *inMsg = nullptr;
406     {
407         std::lock_guard<std::mutex> lock(queueLock_);
408         if (msgQueue_.empty()) {
409             return errCode;
410         }
411         inMsg = msgQueue_.front();
412         msgQueue_.pop_front();
413         queueCacheSize_ -= GetMsgSize(inMsg);
414     }
415 
416     IncExecTaskCount();
417     // it will deal with the first message in queue, we should increase object reference counts and sure that resources
418     // could be prevented from destroying by other threads.
419     do {
420         ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
421         if (errCode != E_OK) {
422             break;
423         }
424         errCode = ScheduleDealMsg(nextContext, inMsg);
425         if (errCode != E_OK) {
426             RefObject::DecObjRef(nextContext);
427         }
428     } while (false);
429     if (errCode != E_OK) {
430         delete inMsg;
431         inMsg = nullptr;
432         DecExecTaskCount();
433     }
434     return errCode;
435 }
436 
GetContextForMsg(const std::string & targetDev,int & errCode)437 ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
438 {
439     ISyncTaskContext *context = nullptr;
440     {
441         std::lock_guard<std::mutex> lock(contextMapLock_);
442         context = FindSyncTaskContext(targetDev);
443         if (context != nullptr) {
444             if (context->IsKilled()) {
445                 errCode = -E_OBJ_IS_KILLED;
446                 return nullptr;
447             }
448         } else {
449             if (IsKilled()) {
450                 errCode = -E_OBJ_IS_KILLED;
451                 return nullptr;
452             }
453             context = GetSyncTaskContext(targetDev, errCode);
454             if (context == nullptr) {
455                 return nullptr;
456             }
457         }
458         // IncRef for context to make sure context is valid, when task run another thread
459         RefObject::IncObjRef(context);
460     }
461     return context;
462 }
463 
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)464 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
465 {
466     if (inMsg == nullptr) {
467         LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
468         DecExecTaskCount();
469         return E_OK;
470     }
471     CommunicatorProxy *comProxy = nullptr;
472     {
473         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
474         comProxy = communicatorProxy_;
475         RefObject::IncObjRef(comProxy);
476     }
477     int errCode = E_OK;
478     // deal remote local data changed message
479     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
480         RemoteDataChangedTask(context, comProxy, inMsg);
481     } else {
482         errCode = RuntimeContext::GetInstance()->ScheduleTask(std::bind(&SyncEngine::MessageReciveCallbackTask,
483             this, context, comProxy, inMsg));
484     }
485 
486     if (errCode != E_OK) {
487         LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
488         RefObject::DecObjRef(comProxy);
489     }
490     return errCode;
491 }
492 
MessageReciveCallback(const std::string & targetDev,Message * inMsg)493 void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
494 {
495     IncExecTaskCount();
496     int errCode = MessageReciveCallbackInner(targetDev, inMsg);
497     if (errCode != E_OK) {
498         delete inMsg;
499         inMsg = nullptr;
500         DecExecTaskCount();
501         LOGE("[SyncEngine] MessageReciveCallback failed!");
502     }
503 }
504 
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)505 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
506 {
507     if (targetDev.empty() || inMsg == nullptr) {
508         LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
509         return -E_INVALID_ARGS;
510     }
511     if (!isActive_) {
512         LOGE("[SyncEngine] engine is closing, ignore msg");
513         return -E_BUSY;
514     }
515     RemoteExecutor *executor = GetAndIncRemoteExector();
516     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor != nullptr) {
517         int errCode = executor->ReceiveMessage(targetDev, inMsg);
518         RefObject::DecObjRef(executor);
519         DecExecTaskCount();
520         return errCode;
521     } else if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
522         DecExecTaskCount();
523         return -E_BUSY;
524     }
525     RefObject::DecObjRef(executor);
526     int msgSize = 0;
527     if (!IsSkipCalculateLen(inMsg)) {
528         msgSize = GetMsgSize(inMsg);
529         if (msgSize <= 0) {
530             LOGE("[SyncEngine] GetMsgSize makes a mistake");
531             return -E_NOT_SUPPORT;
532         }
533     }
534 
535     {
536         std::lock_guard<std::mutex> lock(queueLock_);
537         if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
538             LOGE("[SyncEngine] The size of message queue is beyond maximum");
539             discardMsgNum_++;
540             return -E_BUSY;
541         }
542 
543         if (execTaskCount_ > MAX_EXEC_NUM) {
544             PutMsgIntoQueue(targetDev, inMsg, msgSize);
545             // task dont exec here
546             DecExecTaskCount();
547             return E_OK;
548         }
549     }
550 
551     int errCode = E_OK;
552     ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
553     if (errCode != E_OK) {
554         return errCode;
555     }
556     LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
557     return ScheduleDealMsg(nextContext, inMsg);
558 }
559 
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)560 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
561 {
562     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
563         auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
564             [&targetDev](const Message *msg) {
565                 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
566             });
567         if (iter != msgQueue_.end()) {
568             delete inMsg;
569             inMsg = nullptr;
570             return;
571         }
572     }
573     inMsg->SetTarget(targetDev);
574     msgQueue_.push_back(inMsg);
575     queueCacheSize_ += msgSize;
576     LOGW("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
577 }
578 
GetMsgSize(const Message * inMsg) const579 int SyncEngine::GetMsgSize(const Message *inMsg) const
580 {
581     switch (inMsg->GetMessageId()) {
582         case TIME_SYNC_MESSAGE:
583             return TimeSync::CalculateLen(inMsg);
584         case ABILITY_SYNC_MESSAGE:
585             return AbilitySync::CalculateLen(inMsg);
586         case DATA_SYNC_MESSAGE:
587         case QUERY_SYNC_MESSAGE:
588         case CONTROL_SYNC_MESSAGE:
589             return SingleVerSerializeManager::CalculateLen(inMsg);
590 #ifndef OMIT_MULTI_VER
591         case COMMIT_HISTORY_SYNC_MESSAGE:
592             return CommitHistorySync::CalculateLen(inMsg);
593         case MULTI_VER_DATA_SYNC_MESSAGE:
594             return MultiVerDataSync::CalculateLen(inMsg);
595         case VALUE_SLICE_SYNC_MESSAGE:
596             return ValueSliceSync::CalculateLen(inMsg);
597 #endif
598         case LOCAL_DATA_CHANGED:
599             return DeviceManager::CalculateLen();
600         default:
601             LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
602             return -E_NOT_SUPPORT;
603     }
604 }
605 
FindSyncTaskContext(const std::string & deviceId)606 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
607 {
608     auto iter = syncTaskContextMap_.find(deviceId);
609     if (iter != syncTaskContextMap_.end()) {
610         ISyncTaskContext *context = iter->second;
611         return context;
612     }
613     return nullptr;
614 }
615 
GetSyncTaskContextAndInc(const std::string & deviceId)616 ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
617 {
618     ISyncTaskContext *context = nullptr;
619     std::lock_guard<std::mutex> lock(contextMapLock_);
620     context = FindSyncTaskContext(deviceId);
621     if (context == nullptr) {
622         LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
623         return nullptr;
624     }
625     if (context->IsKilled()) {
626         LOGI("[SyncEngine] context is killing");
627         return nullptr;
628     }
629     RefObject::IncObjRef(context);
630     return context;
631 }
632 
GetSyncTaskContext(const std::string & deviceId,int & errCode)633 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
634 {
635     ISyncTaskContext *context = CreateSyncTaskContext();
636     if (context == nullptr) {
637         errCode = -E_OUT_OF_MEMORY;
638         LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
639         return nullptr;
640     }
641     errCode = context->Initialize(deviceId, syncInterface_, metadata_, communicatorProxy_);
642     if (errCode != E_OK) {
643         LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
644         RefObject::DecObjRef(context);
645         context = nullptr;
646         return nullptr;
647     }
648     syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
649     // IncRef for SyncEngine to make sure SyncEngine is valid when context access
650     RefObject::IncObjRef(this);
651     auto storage = syncInterface_;
652     if (storage != nullptr) {
653         storage->IncRefCount();
654     }
655     context->OnLastRef([this, deviceId, storage]() {
656         LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
657         RefObject::DecObjRef(this);
658         if (storage != nullptr) {
659             storage->DecRefCount();
660         }
661     });
662     context->RegOnSyncTask(std::bind(&SyncEngine::ExecSyncTask, this, context));
663     return context;
664 }
665 
ExecSyncTask(ISyncTaskContext * context)666 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
667 {
668     if (IsKilled()) {
669         return -E_OBJ_IS_KILLED;
670     }
671     AutoLock lockGuard(context);
672     int status = context->GetTaskExecStatus();
673     if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
674         return -E_NOT_SUPPORT;
675     }
676     context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
677     while (!context->IsTargetQueueEmpty()) {
678         int errCode = context->GetNextTarget();
679         if (errCode != E_OK) {
680             // current task execute failed, try next task
681             context->ClearSyncOperation();
682             continue;
683         }
684         if (context->IsCurrentSyncTaskCanBeSkipped()) {
685             context->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
686             context->ClearSyncOperation();
687             continue;
688         }
689         context->UnlockObj();
690         errCode = context->StartStateMachine();
691         context->LockObj();
692         if (errCode != E_OK) {
693             // machine start failed because timer start failed, try to execute next task
694             LOGW("[SyncEngine] machine StartSync failed");
695             context->SetOperationStatus(SyncOperation::OP_FAILED);
696             context->ClearSyncOperation();
697             continue;
698         }
699         // now task is running just return here
700         return errCode;
701     }
702     LOGD("[SyncEngine] ExecSyncTask finished");
703     context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
704     return E_OK;
705 }
706 
GetQueueCacheSize() const707 int SyncEngine::GetQueueCacheSize() const
708 {
709     return queueCacheSize_;
710 }
711 
GetDiscardMsgNum() const712 unsigned int SyncEngine::GetDiscardMsgNum() const
713 {
714     return discardMsgNum_;
715 }
716 
GetMaxExecNum() const717 unsigned int SyncEngine::GetMaxExecNum() const
718 {
719     return MAX_EXEC_NUM;
720 }
721 
SetMaxQueueCacheSize(int value)722 void SyncEngine::SetMaxQueueCacheSize(int value)
723 {
724     maxQueueCacheSize_ = value;
725 }
726 
GetLabel() const727 std::string SyncEngine::GetLabel() const
728 {
729     return label_;
730 }
731 
GetSyncRetry() const732 bool SyncEngine::GetSyncRetry() const
733 {
734     return isSyncRetry_;
735 }
736 
SetSyncRetry(bool isRetry)737 void SyncEngine::SetSyncRetry(bool isRetry)
738 {
739     if (isSyncRetry_ == isRetry) {
740         LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
741         return;
742     }
743     isSyncRetry_ = isRetry;
744     LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
745     std::lock_guard<std::mutex> lock(contextMapLock_);
746     for (auto &iter : syncTaskContextMap_) {
747         ISyncTaskContext *context = iter.second;
748         if (context != nullptr) {
749             context->SetSyncRetry(isRetry);
750         }
751     }
752 }
753 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)754 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
755 {
756     if (!isActive_) {
757         LOGI("[SyncEngine] engine is closed, just put into map");
758         return E_OK;
759     }
760     ICommunicator *communicator = nullptr;
761     {
762         std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
763         if (equalCommunicators_.count(identifier) != 0) {
764             communicator = equalCommunicators_[identifier];
765         } else {
766             int errCode = E_OK;
767             communicator = AllocCommunicator(identifier, errCode);
768             if (communicator == nullptr) {
769                 return errCode;
770             }
771             equalCommunicators_[identifier] = communicator;
772         }
773     }
774     std::string targetDevices;
775     for (const auto &dev : targets) {
776         targetDevices += DBCommon::StringMasking(dev) + ",";
777     }
778     LOGI("[SyncEngine] set equal identifier=%.3s, original=%.3s, targetDevices=%s",
779         DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
780         targetDevices.substr(0, (targetDevices.size() > 0 ? targetDevices.size() - 1 : 0)).c_str());
781     {
782         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
783         if (communicatorProxy_ == nullptr) {
784             return -E_INTERNAL_ERROR;
785         }
786         communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
787     }
788     communicator->Activate();
789     return E_OK;
790 }
791 
SetEqualIdentifier()792 void SyncEngine::SetEqualIdentifier()
793 {
794     std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
795     for (auto &item : equalIdentifierMap_) {
796         if (equalIdentifier.find(item.second) == equalIdentifier.end()) {
797             equalIdentifier[item.second] = {item.first};
798         } else {
799             equalIdentifier[item.second].push_back(item.first);
800         }
801     }
802     for (const auto &item : equalIdentifier) {
803         SetEqualIdentifier(item.first, item.second);
804     }
805 }
806 
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)807 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
808 {
809     for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
810         if (identifier == iter->second) {
811             iter = equalIdentifierMap_.erase(iter);
812             continue;
813         }
814         iter++;
815     }
816     for (const auto &device : targets) {
817         equalIdentifierMap_[device] = identifier;
818     }
819 }
820 
OfflineHandleByDevice(const std::string & deviceId)821 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId)
822 {
823     RemoteExecutor *executor = GetAndIncRemoteExector();
824     if (executor != nullptr) {
825         executor->NotifyDeviceOffline(deviceId);
826         RefObject::DecObjRef(executor);
827     }
828     // db closed or device is offline
829     // clear remote subscribe and trigger
830     std::vector<std::string> remoteQueryId;
831     subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
832     subManager_->ClearRemoteSubscribeQuery(deviceId);
833     for (const auto &queryId: remoteQueryId) {
834         if (!subManager_->IsQueryExistSubscribe(queryId)) {
835             static_cast<SingleVerKvDBSyncInterface *>(syncInterface_)->RemoveSubscribe(queryId);
836         }
837     }
838     // get context and Inc context if context is not nullprt
839     ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
840     if (context != nullptr) {
841         context->SetIsNeedResetAbilitySync(true);
842     }
843     {
844         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
845         if (communicatorProxy_ == nullptr) {
846             return;
847         }
848         if (communicatorProxy_->IsDeviceOnline(deviceId)) {
849             LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
850             RefObject::DecObjRef(context);
851             return;
852         }
853     }
854     // means device is offline, clear local subscribe
855     subManager_->ClearLocalSubscribeQuery(deviceId);
856     // clear sync task
857     if (context != nullptr) {
858         context->ClearAllSyncTask();
859         RefObject::DecObjRef(context);
860     }
861 }
862 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)863 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
864 {
865     subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
866 }
867 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)868 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
869 {
870     subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
871 }
872 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)873 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
874 {
875     subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
876 }
877 
PutUnfinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)878 void SyncEngine::PutUnfinishedSubQueries(const std::string &device,
879     const std::vector<QuerySyncObject> &subscribeQueries)
880 {
881     subManager_->PutLocalUnFinishedSubQueries(device, subscribeQueries);
882 }
883 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)884 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
885 {
886     subManager_->GetAllUnFinishSubQueries(allSyncQueries);
887 }
888 
AllocCommunicator(const std::string & identifier,int & errCode)889 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode)
890 {
891     ICommunicatorAggregator *communicatorAggregator = nullptr;
892     errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
893     if (communicatorAggregator == nullptr) {
894         LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
895         return nullptr;
896     }
897     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
898     auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode);
899     if (communicator == nullptr) {
900         LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
901         return communicator;
902     }
903 
904     errCode = communicator->RegOnMessageCallback(
905         std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
906         []() {});
907     if (errCode != E_OK) {
908         LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
909         communicatorAggregator->ReleaseCommunicator(communicator);
910         return nullptr;
911     }
912 
913     errCode = communicator->RegOnConnectCallback(
914         std::bind(&DeviceManager::OnDeviceConnectCallback, deviceManager_,
915             std::placeholders::_1, std::placeholders::_2), nullptr);
916     if (errCode != E_OK) {
917         LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
918         communicator->RegOnMessageCallback(nullptr, nullptr);
919         communicatorAggregator->ReleaseCommunicator(communicator);
920         return nullptr;
921     }
922 
923     return communicator;
924 }
925 
UnRegCommunicatorsCallback()926 void SyncEngine::UnRegCommunicatorsCallback()
927 {
928     if (communicator_ != nullptr) {
929         communicator_->RegOnMessageCallback(nullptr, nullptr);
930         communicator_->RegOnConnectCallback(nullptr, nullptr);
931         communicator_->RegOnSendableCallback(nullptr, nullptr);
932     }
933     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
934     for (const auto &iter : equalCommunicators_) {
935         iter.second->RegOnMessageCallback(nullptr, nullptr);
936         iter.second->RegOnConnectCallback(nullptr, nullptr);
937         iter.second->RegOnSendableCallback(nullptr, nullptr);
938     }
939 }
940 
ReleaseCommunicators()941 void SyncEngine::ReleaseCommunicators()
942 {
943     {
944         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
945         RefObject::KillAndDecObjRef(communicatorProxy_);
946         communicatorProxy_ = nullptr;
947     }
948     ICommunicatorAggregator *communicatorAggregator = nullptr;
949     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
950     if (communicatorAggregator == nullptr) {
951         LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
952         return;
953     }
954 
955     if (communicator_ != nullptr) {
956         communicatorAggregator->ReleaseCommunicator(communicator_);
957         communicator_ = nullptr;
958     }
959 
960     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
961     for (auto &iter : equalCommunicators_) {
962         communicatorAggregator->ReleaseCommunicator(iter.second);
963     }
964     equalCommunicators_.clear();
965 }
966 
IsSkipCalculateLen(const Message * inMsg)967 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
968 {
969     if (inMsg->IsFeedbackError()) {
970         LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
971         return true;
972     }
973     return false;
974 }
975 
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)976 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
977     InternalSyncParma &outParam)
978 {
979     outParam.devices = { device };
980     outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
981     outParam.isQuerySync = true;
982     outParam.syncQuery = query;
983 }
984 
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)985 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
986     InternalSyncParma &outParam)
987 {
988     outParam.devices = { device };
989     outParam.mode = SyncModeType::AUTO_PUSH;
990     outParam.isQuerySync = true;
991     outParam.syncQuery = query;
992 }
993 
StartAutoSubscribeTimer()994 int SyncEngine::StartAutoSubscribeTimer()
995 {
996     return E_OK;
997 }
998 
StopAutoSubscribeTimer()999 void SyncEngine::StopAutoSubscribeTimer()
1000 {
1001 }
1002 
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const1003 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
1004 {
1005     return subManager_->LocalSubscribeLimitCheck(devices, query);
1006 }
1007 
1008 
ClearInnerResource()1009 void SyncEngine::ClearInnerResource()
1010 {
1011     if (syncInterface_ != nullptr) {
1012         syncInterface_->DecRefCount();
1013         syncInterface_ = nullptr;
1014     }
1015     if (deviceManager_ != nullptr) {
1016         delete deviceManager_;
1017         deviceManager_ = nullptr;
1018     }
1019     communicator_ = nullptr;
1020     metadata_ = nullptr;
1021     onRemoteDataChanged_ = nullptr;
1022     offlineChanged_ = nullptr;
1023     queryAutoSyncCallback_ = nullptr;
1024     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1025     if (remoteExecutor_ != nullptr) {
1026         RefObject::KillAndDecObjRef(remoteExecutor_);
1027         remoteExecutor_ = nullptr;
1028     }
1029 }
1030 
IsEngineActive() const1031 bool SyncEngine::IsEngineActive() const
1032 {
1033     return isActive_;
1034 }
1035 
SchemaChange()1036 void SyncEngine::SchemaChange()
1037 {
1038     std::vector<ISyncTaskContext *> tmpContextVec;
1039     {
1040         std::lock_guard<std::mutex> lock(contextMapLock_);
1041         for (const auto &entry : syncTaskContextMap_) {
1042             auto context = entry.second;
1043             if (context == nullptr || context->IsKilled()) {
1044                 continue;
1045             }
1046             RefObject::IncObjRef(context);
1047             tmpContextVec.push_back(context);
1048         }
1049     }
1050     for (const auto &entryContext : tmpContextVec) {
1051         entryContext->SchemaChange();
1052         RefObject::DecObjRef(entryContext);
1053     }
1054 }
1055 
IncExecTaskCount()1056 void SyncEngine::IncExecTaskCount()
1057 {
1058     std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1059     execTaskCount_++;
1060 }
1061 
DecExecTaskCount()1062 void SyncEngine::DecExecTaskCount()
1063 {
1064     {
1065         std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1066         execTaskCount_--;
1067     }
1068     execTaskCv_.notify_all();
1069 }
1070 
Dump(int fd)1071 void SyncEngine::Dump(int fd)
1072 {
1073     {
1074         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1075         std::string communicatorLabel;
1076         if (communicatorProxy_ != nullptr) {
1077             communicatorProxy_->GetLocalIdentity(communicatorLabel);
1078         }
1079         DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1080         if (communicatorProxy_ != nullptr) {
1081             communicatorProxy_->GetLocalIdentity(communicatorLabel);
1082             communicatorProxy_->Dump(fd);
1083         }
1084     }
1085     DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1086     // dump context info
1087     std::lock_guard<std::mutex> autoLock(contextMapLock_);
1088     for (const auto &entry : syncTaskContextMap_) {
1089         if (entry.second != nullptr) {
1090             entry.second->Dump(fd);
1091         }
1092     }
1093     DBDumpHelper::Dump(fd, "\t]\n\n");
1094 }
1095 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1096 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1097     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1098 {
1099     RemoteExecutor *executor = GetAndIncRemoteExector();
1100     if (!isActive_ || executor == nullptr) {
1101         RefObject::DecObjRef(executor);
1102         return -E_BUSY; // db is closing just return
1103     }
1104     int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1105     RefObject::DecObjRef(executor);
1106     return errCode;
1107 }
1108 
NotifyConnectionClosed(uint64_t connectionId)1109 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1110 {
1111     RemoteExecutor *executor = GetAndIncRemoteExector();
1112     if (!isActive_ || executor == nullptr) {
1113         RefObject::DecObjRef(executor);
1114         return; // db is closing just return
1115     }
1116     executor->NotifyConnectionClosed(connectionId);
1117     RefObject::DecObjRef(executor);
1118 }
1119 
NotifyUserChange()1120 void SyncEngine::NotifyUserChange()
1121 {
1122     RemoteExecutor *executor = GetAndIncRemoteExector();
1123     if (!isActive_ || executor == nullptr) {
1124         RefObject::DecObjRef(executor);
1125         return; // db is closing just return
1126     }
1127     executor->NotifyUserChange();
1128     RefObject::DecObjRef(executor);
1129 }
1130 
GetAndIncRemoteExector()1131 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1132 {
1133     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1134     RefObject::IncObjRef(remoteExecutor_);
1135     return remoteExecutor_;
1136 }
1137 
SetRemoteExector(RemoteExecutor * executor)1138 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1139 {
1140     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1141     remoteExecutor_ = executor;
1142 }
1143 
CheckDeviceIdValid(const std::string & checkDeviceId,const std::string & localDeviceId)1144 bool SyncEngine::CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId)
1145 {
1146     if (checkDeviceId.empty()) {
1147         return false;
1148     }
1149     if (checkDeviceId.length() > DBConstant::MAX_DEV_LENGTH) {
1150         LOGE("[SyncEngine] dev is too long len=%zu", checkDeviceId.length());
1151         return false;
1152     }
1153     return localDeviceId != checkDeviceId;
1154 }
1155 
GetLocalDeviceId(std::string & deviceId)1156 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1157 {
1158     if (!isActive_ || communicator_ == nullptr) {
1159         // db is closing
1160         return -E_BUSY;
1161     }
1162     auto communicator = communicator_;
1163     RefObject::IncObjRef(communicator);
1164     int errCode = communicator->GetLocalIdentity(deviceId);
1165     RefObject::DecObjRef(communicator);
1166     return errCode;
1167 }
1168 
AbortMachineIfNeed(uint32_t syncId)1169 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1170 {
1171     std::vector<ISyncTaskContext *> abortContexts;
1172     {
1173         std::lock_guard<std::mutex> lock(contextMapLock_);
1174         for (const auto &entry : syncTaskContextMap_) {
1175             auto context = entry.second;
1176             if (context == nullptr || context->IsKilled()) {
1177                 continue;
1178             }
1179             RefObject::IncObjRef(context);
1180             if (context->GetSyncId() == syncId) {
1181                 RefObject::IncObjRef(context);
1182                 abortContexts.push_back(context);
1183             }
1184             RefObject::DecObjRef(context);
1185         }
1186     }
1187     for (const auto &abortContext : abortContexts) {
1188         abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1189         RefObject::DecObjRef(abortContext);
1190     }
1191 }
1192 
WaitingExecTaskExist()1193 void SyncEngine::WaitingExecTaskExist()
1194 {
1195     std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1196     bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1197         [this]() { return execTaskCount_ == 0; });
1198     if (!isTimeout) {
1199         LOGD("SyncEngine Close with executing task!");
1200     }
1201 }
1202 } // namespace DistributedDB
1203