• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_engine.h"
17 
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21 
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34 
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40 
41 namespace DistributedDB {
// Running total of the sizes of the messages parked in msgQueue_; adjusted under queueLock_.
int SyncEngine::queueCacheSize_ = 0;
// Limit for queueCacheSize_: an incoming message that would push the total past it is rejected.
int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
// Count of incoming messages rejected because the queue cache limit was reached.
unsigned int SyncEngine::discardMsgNum_ = 0;
// Protects msgQueue_ and the cache-size bookkeeping above.
std::mutex SyncEngine::queueLock_;
46 
SyncEngine()47 SyncEngine::SyncEngine()
48     : syncInterface_(nullptr),
49       communicator_(nullptr),
50       deviceManager_(nullptr),
51       metadata_(nullptr),
52       execTaskCount_(0),
53       isSyncRetry_(false),
54       communicatorProxy_(nullptr),
55       isActive_(false),
56       remoteExecutor_(nullptr)
57 {
58 }
59 
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62     LOGD("[SyncEngine] ~SyncEngine!");
63     ClearInnerResource();
64     equalIdentifierMap_.clear();
65     subManager_ = nullptr;
66     LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68 
Initialize(ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,const InitCallbackParam & callbackParam)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
70     const InitCallbackParam &callbackParam)
71 {
72     if ((syncInterface == nullptr) || (metadata == nullptr)) {
73         return -E_INVALID_ARGS;
74     }
75     int errCode = StartAutoSubscribeTimer(*syncInterface);
76     if (errCode != E_OK) {
77         return errCode;
78     }
79 
80     errCode = InitComunicator(syncInterface);
81     if (errCode != E_OK) {
82         LOGE("[SyncEngine] Init Communicator failed");
83         // There need to set nullptr. other wise, syncInterface will be
84         // DecRef in th destroy-method.
85         StopAutoSubscribeTimer();
86         return errCode;
87     }
88     onRemoteDataChanged_ = callbackParam.onRemoteDataChanged;
89     offlineChanged_ = callbackParam.offlineChanged;
90     queryAutoSyncCallback_ = callbackParam.queryAutoSyncCallback;
91     errCode = InitInnerSource(callbackParam.onRemoteDataChanged, callbackParam.offlineChanged, syncInterface);
92     if (errCode != E_OK) {
93         // reset ptr if initialize device manager failed
94         StopAutoSubscribeTimer();
95         return errCode;
96     }
97     SetSyncInterface(syncInterface);
98     if (subManager_ == nullptr) {
99         subManager_ = std::make_shared<SubscribeManager>();
100     }
101     metadata_ = metadata;
102     isActive_ = true;
103     LOGI("[SyncEngine] Engine [%s] init ok", label_.c_str());
104     return E_OK;
105 }
106 
Close()107 int SyncEngine::Close()
108 {
109     LOGI("[SyncEngine] SyncEngine [%s] close enter!", label_.c_str());
110     isActive_ = false;
111     UnRegCommunicatorsCallback();
112     StopAutoSubscribeTimer();
113     std::vector<ISyncTaskContext *> decContext;
114     // Clear SyncContexts
115     {
116         std::unique_lock<std::mutex> lock(contextMapLock_);
117         for (auto &iter : syncTaskContextMap_) {
118             decContext.push_back(iter.second);
119             iter.second = nullptr;
120         }
121         syncTaskContextMap_.clear();
122     }
123     for (auto &iter : decContext) {
124         RefObject::KillAndDecObjRef(iter);
125         iter = nullptr;
126     }
127     WaitingExecTaskExist();
128     ReleaseCommunicators();
129     {
130         std::lock_guard<std::mutex> msgLock(queueLock_);
131         while (!msgQueue_.empty()) {
132             Message *inMsg = msgQueue_.front();
133             msgQueue_.pop_front();
134             if (inMsg != nullptr) { // LCOV_EXCL_BR_LINE
135                 queueCacheSize_ -= GetMsgSize(inMsg);
136                 delete inMsg;
137                 inMsg = nullptr;
138             }
139         }
140     }
141     // close db, rekey or import scene, need clear all remote query info
142     // local query info will destroy with syncEngine destruct
143     if (subManager_ != nullptr) {
144         subManager_->ClearAllRemoteQuery();
145     }
146 
147     RemoteExecutor *executor = GetAndIncRemoteExector();
148     if (executor != nullptr) {
149         executor->Close();
150         RefObject::DecObjRef(executor);
151         executor = nullptr;
152     }
153     ClearInnerResource();
154     LOGI("[SyncEngine] SyncEngine [%s] closed!", label_.c_str());
155     return E_OK;
156 }
157 
AddSyncOperation(SyncOperation * operation)158 int SyncEngine::AddSyncOperation(SyncOperation *operation)
159 {
160     if (operation == nullptr) {
161         LOGE("[SyncEngine] operation is nullptr");
162         return -E_INVALID_ARGS;
163     }
164 
165     std::vector<std::string> devices = operation->GetDevices();
166     std::string localDeviceId;
167     int errCode = GetLocalDeviceId(localDeviceId);
168     for (const auto &deviceId : devices) {
169         if (errCode != E_OK) {
170             operation->SetStatus(deviceId, errCode == -E_BUSY ?
171                 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
172             continue;
173         }
174         if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
175             operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
176             continue;
177         }
178         operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
179         if (AddSyncOperForContext(deviceId, operation) != E_OK) {
180             operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
181         }
182     }
183     return E_OK;
184 }
185 
RemoveSyncOperation(int syncId)186 void SyncEngine::RemoveSyncOperation(int syncId)
187 {
188     std::lock_guard<std::mutex> lock(contextMapLock_);
189     for (auto &iter : syncTaskContextMap_) {
190         ISyncTaskContext *context = iter.second;
191         if (context != nullptr) {
192             context->RemoveSyncOperation(syncId);
193         }
194     }
195 }
196 
197 #ifndef OMIT_MULTI_VER
BroadCastDataChanged() const198 void SyncEngine::BroadCastDataChanged() const
199 {
200     if (deviceManager_ != nullptr) {
201         (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
202     }
203 }
204 #endif // OMIT_MULTI_VER
205 
StartCommunicator()206 void SyncEngine::StartCommunicator()
207 {
208     if (communicator_ == nullptr) {
209         LOGE("[SyncEngine][StartCommunicator] communicator is not set!");
210         return;
211     }
212     LOGD("[SyncEngine][StartCommunicator] RegOnConnectCallback");
213     int errCode = communicator_->RegOnConnectCallback(
214         [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
215             deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
216         }, nullptr);
217     if (errCode != E_OK) {
218         LOGE("[SyncEngine][StartCommunicator] register failed, auto sync can not use! err %d", errCode);
219         return;
220     }
221     communicator_->Activate(GetUserId());
222 }
223 
GetOnlineDevices(std::vector<std::string> & devices) const224 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
225 {
226     devices.clear();
227     if (deviceManager_ != nullptr) {
228         deviceManager_->GetOnlineDevices(devices);
229     }
230 }
231 
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged,ISyncInterface * syncInterface)232 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
233     const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface)
234 {
235     deviceManager_ = new (std::nothrow) DeviceManager();
236     if (deviceManager_ == nullptr) {
237         LOGE("[SyncEngine] deviceManager alloc failed!");
238         return -E_OUT_OF_MEMORY;
239     }
240     auto executor = new (std::nothrow) RemoteExecutor();
241     if (executor == nullptr) {
242         LOGE("[SyncEngine] remoteExecutor alloc failed!");
243         delete deviceManager_;
244         deviceManager_ = nullptr;
245         return -E_OUT_OF_MEMORY;
246     }
247 
248     int errCode = E_OK;
249     do {
250         CommunicatorProxy *comProxy = nullptr;
251         {
252             std::lock_guard<std::mutex> lock(communicatorProxyLock_);
253             comProxy = communicatorProxy_;
254             RefObject::IncObjRef(comProxy);
255         }
256         errCode = deviceManager_->Initialize(comProxy, onRemoteDataChanged, offlineChanged);
257         RefObject::DecObjRef(comProxy);
258         if (errCode != E_OK) {
259             LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
260             break;
261         }
262         errCode = executor->Initialize(syncInterface, communicator_);
263     } while (false);
264     if (errCode != E_OK) {
265         delete deviceManager_;
266         deviceManager_ = nullptr;
267         delete executor;
268         executor = nullptr;
269     } else {
270         SetRemoteExector(executor);
271     }
272     return errCode;
273 }
274 
InitComunicator(const ISyncInterface * syncInterface)275 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
276 {
277     ICommunicatorAggregator *communicatorAggregator = nullptr;
278     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
279     if (communicatorAggregator == nullptr) {
280         LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
281         return errCode;
282     }
283     std::vector<uint8_t> label = syncInterface->GetIdentifier();
284     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
285     if (isSyncDualTupleMode) {
286         std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
287         LOGI("[SyncEngine] dual tuple mode, original identifier=%.3s, target identifier=%.3s", VEC_TO_STR(label),
288             VEC_TO_STR(dualTuplelabel));
289         communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode, GetUserId(syncInterface));
290     } else {
291         communicator_ = communicatorAggregator->AllocCommunicator(label, errCode, GetUserId(syncInterface));
292     }
293     if (communicator_ == nullptr) {
294         LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
295         return errCode;
296     }
297 
298     errCode = communicator_->RegOnMessageCallback(
299         [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
300     if (errCode != E_OK) {
301         LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
302         communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
303         communicator_ = nullptr;
304         return errCode;
305     }
306     {
307         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
308         communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
309         if (communicatorProxy_ == nullptr) {
310             communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
311             communicator_ = nullptr;
312             return -E_OUT_OF_MEMORY;
313         }
314         communicatorProxy_->SetMainCommunicator(communicator_);
315     }
316     label.resize(3); // only show 3 Bytes enough
317     label_ = DBCommon::VectorToHexString(label);
318     LOGD("[SyncEngine] RegOnConnectCallback");
319     return errCode;
320 }
321 
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)322 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
323 {
324     int errCode = E_OK;
325     ISyncTaskContext *context = nullptr;
326     {
327         std::lock_guard<std::mutex> lock(contextMapLock_);
328         context = FindSyncTaskContext(deviceId);
329         if (context == nullptr) {
330             if (!IsKilled()) {
331                 context = GetSyncTaskContext(deviceId, errCode);
332             }
333             if (context == nullptr) {
334                 return errCode;
335             }
336         }
337         if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
338             return -E_OBJ_IS_KILLED;
339         }
340         // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
341         RefObject::IncObjRef(context);
342     }
343 
344     errCode = context->AddSyncOperation(operation);
345     if (operation != nullptr) {
346         operation->SetSyncContext(context); // make the life cycle of context and operation are same
347     }
348     RefObject::DecObjRef(context);
349     return errCode;
350 }
351 
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)352 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
353     Message *inMsg)
354 {
355     std::string deviceId = context->GetDeviceId();
356 
357     if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
358         int errCode = context->ReceiveMessageCallback(inMsg);
359         if (errCode == -E_NOT_NEED_DELETE_MSG) {
360             goto MSG_CALLBACK_OUT_NOT_DEL;
361         }
362         // add auto sync here while recv subscribe request
363         QuerySyncObject syncObject;
364         if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
365             InternalSyncParma param;
366             GetQueryAutoSyncParam(deviceId, syncObject, param);
367             queryAutoSyncCallback_(param);
368         }
369     }
370 
371     delete inMsg;
372     inMsg = nullptr;
373 MSG_CALLBACK_OUT_NOT_DEL:
374     ScheduleTaskOut(context, communicator);
375 }
376 
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)377 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
378 {
379     std::string deviceId = context->GetDeviceId();
380     if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
381         onRemoteDataChanged_(deviceId);
382     } else {
383         LOGE("[SyncEngine] onRemoteDataChanged is null!");
384     }
385     delete inMsg;
386     inMsg = nullptr;
387     ScheduleTaskOut(context, communicator);
388 }
389 
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)390 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
391 {
392     (void)DealMsgUtilQueueEmpty();
393     DecExecTaskCount();
394     RefObject::DecObjRef(communicator);
395     RefObject::DecObjRef(context);
396 }
397 
DealMsgUtilQueueEmpty()398 int SyncEngine::DealMsgUtilQueueEmpty()
399 {
400     if (!isActive_) {
401         return -E_BUSY; // db is closing just return
402     }
403     int errCode = E_OK;
404     Message *inMsg = nullptr;
405     {
406         std::lock_guard<std::mutex> lock(queueLock_);
407         if (msgQueue_.empty()) {
408             return errCode;
409         }
410         inMsg = msgQueue_.front();
411         msgQueue_.pop_front();
412         queueCacheSize_ -= GetMsgSize(inMsg);
413     }
414 
415     IncExecTaskCount();
416     // it will deal with the first message in queue, we should increase object reference counts and sure that resources
417     // could be prevented from destroying by other threads.
418     do {
419         ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
420         if (errCode != E_OK) {
421             break;
422         }
423         errCode = ScheduleDealMsg(nextContext, inMsg);
424         if (errCode != E_OK) {
425             RefObject::DecObjRef(nextContext);
426         }
427     } while (false);
428     if (errCode != E_OK) {
429         delete inMsg;
430         inMsg = nullptr;
431         DecExecTaskCount();
432     }
433     return errCode;
434 }
435 
GetContextForMsg(const std::string & targetDev,int & errCode)436 ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
437 {
438     ISyncTaskContext *context = nullptr;
439     {
440         std::lock_guard<std::mutex> lock(contextMapLock_);
441         context = FindSyncTaskContext(targetDev);
442         if (context != nullptr) { // LCOV_EXCL_BR_LINE
443             if (context->IsKilled()) {
444                 errCode = -E_OBJ_IS_KILLED;
445                 return nullptr;
446             }
447         } else {
448             if (IsKilled()) {
449                 errCode = -E_OBJ_IS_KILLED;
450                 return nullptr;
451             }
452             context = GetSyncTaskContext(targetDev, errCode);
453             if (context == nullptr) {
454                 return nullptr;
455             }
456         }
457         // IncRef for context to make sure context is valid, when task run another thread
458         RefObject::IncObjRef(context);
459     }
460     return context;
461 }
462 
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)463 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
464 {
465     if (inMsg == nullptr) {
466         LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
467         DecExecTaskCount();
468         return E_OK;
469     }
470     CommunicatorProxy *comProxy = nullptr;
471     {
472         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
473         comProxy = communicatorProxy_;
474         RefObject::IncObjRef(comProxy);
475     }
476     int errCode = E_OK;
477     // deal remote local data changed message
478     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
479         RemoteDataChangedTask(context, comProxy, inMsg);
480     } else {
481         errCode = RuntimeContext::GetInstance()->ScheduleTask(
482             [this, context, comProxy, inMsg] { MessageReciveCallbackTask(context, comProxy, inMsg); });
483     }
484 
485     if (errCode != E_OK) {
486         LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
487         RefObject::DecObjRef(comProxy);
488     }
489     return errCode;
490 }
491 
MessageReciveCallback(const std::string & targetDev,Message * inMsg)492 void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
493 {
494     IncExecTaskCount();
495     int errCode = MessageReciveCallbackInner(targetDev, inMsg);
496     if (errCode != E_OK) {
497         if (inMsg != nullptr) {
498             delete inMsg;
499             inMsg = nullptr;
500         }
501         DecExecTaskCount();
502         LOGE("[SyncEngine] MessageReciveCallback failed!");
503     }
504 }
505 
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)506 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
507 {
508     if (targetDev.empty() || inMsg == nullptr) {
509         LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
510         return -E_INVALID_ARGS;
511     }
512     if (!isActive_) {
513         LOGE("[SyncEngine] engine is closing, ignore msg");
514         return -E_BUSY;
515     }
516     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
517         return HandleRemoteExecutorMsg(targetDev, inMsg);
518     }
519 
520     int msgSize = 0;
521     if (!IsSkipCalculateLen(inMsg)) {
522         msgSize = GetMsgSize(inMsg);
523         if (msgSize <= 0) {
524             LOGE("[SyncEngine] GetMsgSize makes a mistake");
525             return -E_NOT_SUPPORT;
526         }
527     }
528 
529     {
530         std::lock_guard<std::mutex> lock(queueLock_);
531         if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
532             LOGE("[SyncEngine] The size of message queue is beyond maximum");
533             discardMsgNum_++;
534             return -E_BUSY;
535         }
536 
537         if (execTaskCount_ > MAX_EXEC_NUM) {
538             PutMsgIntoQueue(targetDev, inMsg, msgSize);
539             // task dont exec here
540             DecExecTaskCount();
541             return E_OK;
542         }
543     }
544 
545     int errCode = E_OK;
546     ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
547     if (errCode != E_OK) {
548         return errCode;
549     }
550     LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
551     return ScheduleDealMsg(nextContext, inMsg);
552 }
553 
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)554 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
555 {
556     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
557         auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
558             [&targetDev](const Message *msg) {
559                 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
560             });
561         if (iter != msgQueue_.end()) { // LCOV_EXCL_BR_LINE
562             delete inMsg;
563             inMsg = nullptr;
564             return;
565         }
566     }
567     inMsg->SetTarget(targetDev);
568     msgQueue_.push_back(inMsg);
569     queueCacheSize_ += msgSize;
570     LOGW("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
571 }
572 
GetMsgSize(const Message * inMsg) const573 int SyncEngine::GetMsgSize(const Message *inMsg) const
574 {
575     switch (inMsg->GetMessageId()) {
576         case TIME_SYNC_MESSAGE:
577             return TimeSync::CalculateLen(inMsg);
578         case ABILITY_SYNC_MESSAGE:
579             return AbilitySync::CalculateLen(inMsg);
580         case DATA_SYNC_MESSAGE:
581         case QUERY_SYNC_MESSAGE:
582         case CONTROL_SYNC_MESSAGE:
583             return SingleVerSerializeManager::CalculateLen(inMsg);
584 #ifndef OMIT_MULTI_VER
585         case COMMIT_HISTORY_SYNC_MESSAGE:
586             return CommitHistorySync::CalculateLen(inMsg);
587         case MULTI_VER_DATA_SYNC_MESSAGE:
588             return MultiVerDataSync::CalculateLen(inMsg);
589         case VALUE_SLICE_SYNC_MESSAGE:
590             return ValueSliceSync::CalculateLen(inMsg);
591 #endif
592         case LOCAL_DATA_CHANGED:
593             return DeviceManager::CalculateLen();
594         default:
595             LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
596             return -E_NOT_SUPPORT;
597     }
598 }
599 
FindSyncTaskContext(const std::string & deviceId)600 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
601 {
602     auto iter = syncTaskContextMap_.find(deviceId);
603     if (iter != syncTaskContextMap_.end()) {
604         ISyncTaskContext *context = iter->second;
605         return context;
606     }
607     return nullptr;
608 }
609 
GetSyncTaskContextAndInc(const std::string & deviceId)610 ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
611 {
612     ISyncTaskContext *context = nullptr;
613     std::lock_guard<std::mutex> lock(contextMapLock_);
614     context = FindSyncTaskContext(deviceId);
615     if (context == nullptr) {
616         LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
617         return nullptr;
618     }
619     if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
620         LOGI("[SyncEngine] context is killing");
621         return nullptr;
622     }
623     RefObject::IncObjRef(context);
624     return context;
625 }
626 
GetSyncTaskContext(const std::string & deviceId,int & errCode)627 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
628 {
629     auto storage = GetAndIncSyncInterface();
630     if (storage == nullptr) {
631         errCode = -E_INVALID_DB;
632         LOGE("[SyncEngine] SyncTaskContext alloc failed with null db");
633         return nullptr;
634     }
635     ISyncTaskContext *context = CreateSyncTaskContext(*storage);
636     if (context == nullptr) {
637         errCode = -E_OUT_OF_MEMORY;
638         LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
639         return nullptr;
640     }
641     errCode = context->Initialize(deviceId, storage, metadata_, communicatorProxy_);
642     if (errCode != E_OK) {
643         LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
644         RefObject::DecObjRef(context);
645         storage->DecRefCount();
646         context = nullptr;
647         return nullptr;
648     }
649     syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
650     // IncRef for SyncEngine to make sure SyncEngine is valid when context access
651     RefObject::IncObjRef(this);
652     context->OnLastRef([this, deviceId, storage]() {
653         LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
654         RefObject::DecObjRef(this);
655         storage->DecRefCount();
656     });
657     context->RegOnSyncTask([this, context] { return ExecSyncTask(context); });
658     return context;
659 }
660 
ExecSyncTask(ISyncTaskContext * context)661 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
662 {
663     if (IsKilled()) {
664         return -E_OBJ_IS_KILLED;
665     }
666     auto timeout = GetTimeout(context->GetDeviceId());
667     AutoLock lockGuard(context);
668     int status = context->GetTaskExecStatus();
669     if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
670         return -E_NOT_SUPPORT;
671     }
672     context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
673     while (!context->IsTargetQueueEmpty()) {
674         int errCode = context->GetNextTarget(timeout);
675         if (errCode != E_OK) {
676             // current task execute failed, try next task
677             context->ClearSyncOperation();
678             continue;
679         }
680         if (context->IsCurrentSyncTaskCanBeSkipped()) { // LCOV_EXCL_BR_LINE
681             context->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
682             context->ClearSyncOperation();
683             continue;
684         }
685         context->UnlockObj();
686         errCode = context->StartStateMachine();
687         context->LockObj();
688         if (errCode != E_OK) {
689             // machine start failed because timer start failed, try to execute next task
690             LOGW("[SyncEngine] machine StartSync failed");
691             context->SetOperationStatus(SyncOperation::OP_FAILED);
692             context->ClearSyncOperation();
693             continue;
694         }
695         // now task is running just return here
696         return errCode;
697     }
698     LOGD("[SyncEngine] ExecSyncTask finished");
699     context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
700     return E_OK;
701 }
702 
GetQueueCacheSize() const703 int SyncEngine::GetQueueCacheSize() const
704 {
705     return queueCacheSize_;
706 }
707 
SetQueueCacheSize(int size)708 void SyncEngine::SetQueueCacheSize(int size)
709 {
710     std::lock_guard<std::mutex> lock(queueLock_);
711     queueCacheSize_ = size;
712 }
713 
GetDiscardMsgNum() const714 unsigned int SyncEngine::GetDiscardMsgNum() const
715 {
716     return discardMsgNum_;
717 }
718 
SetDiscardMsgNum(unsigned int num)719 void SyncEngine::SetDiscardMsgNum(unsigned int num)
720 {
721     std::lock_guard<std::mutex> lock(queueLock_);
722     discardMsgNum_ = num;
723 }
724 
GetMaxExecNum() const725 unsigned int SyncEngine::GetMaxExecNum() const
726 {
727     return MAX_EXEC_NUM;
728 }
729 
GetMaxQueueCacheSize() const730 int SyncEngine::GetMaxQueueCacheSize() const
731 {
732     return maxQueueCacheSize_;
733 }
734 
SetMaxQueueCacheSize(int value)735 void SyncEngine::SetMaxQueueCacheSize(int value)
736 {
737     maxQueueCacheSize_ = value;
738 }
739 
GetLabel() const740 std::string SyncEngine::GetLabel() const
741 {
742     return label_;
743 }
744 
GetSyncRetry() const745 bool SyncEngine::GetSyncRetry() const
746 {
747     return isSyncRetry_;
748 }
749 
SetSyncRetry(bool isRetry)750 void SyncEngine::SetSyncRetry(bool isRetry)
751 {
752     if (isSyncRetry_ == isRetry) {
753         LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
754         return;
755     }
756     isSyncRetry_ = isRetry;
757     LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
758     std::lock_guard<std::mutex> lock(contextMapLock_);
759     for (auto &iter : syncTaskContextMap_) {
760         ISyncTaskContext *context = iter.second;
761         if (context != nullptr) { // LCOV_EXCL_BR_LINE
762             context->SetSyncRetry(isRetry);
763         }
764     }
765 }
766 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)767 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
768 {
769     if (!isActive_) {
770         LOGI("[SyncEngine] engine is closed, just put into map");
771         return E_OK;
772     }
773     ICommunicator *communicator = nullptr;
774     {
775         std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
776         if (equalCommunicators_.count(identifier) != 0) {
777             communicator = equalCommunicators_[identifier];
778         } else {
779             int errCode = E_OK;
780             communicator = AllocCommunicator(identifier, errCode, GetUserId());
781             if (communicator == nullptr) {
782                 return errCode;
783             }
784             equalCommunicators_[identifier] = communicator;
785         }
786     }
787     std::string targetDevices;
788     for (const auto &dev : targets) {
789         targetDevices += DBCommon::StringMasking(dev) + ",";
790     }
791     LOGI("[SyncEngine] set equal identifier=%.3s, original=%.3s, targetDevices=%s",
792         DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
793         targetDevices.substr(0, (targetDevices.size() > 0 ? targetDevices.size() - 1 : 0)).c_str());
794     {
795         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
796         if (communicatorProxy_ == nullptr) {
797             return -E_INTERNAL_ERROR;
798         }
799         communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
800     }
801     communicator->Activate(GetUserId());
802     return E_OK;
803 }
804 
SetEqualIdentifier()805 void SyncEngine::SetEqualIdentifier()
806 {
807     std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
808     for (auto &item : equalIdentifierMap_) {
809         if (equalIdentifier.find(item.second) == equalIdentifier.end()) { // LCOV_EXCL_BR_LINE
810             equalIdentifier[item.second] = {item.first};
811         } else {
812             equalIdentifier[item.second].push_back(item.first);
813         }
814     }
815     for (const auto &item : equalIdentifier) {
816         SetEqualIdentifier(item.first, item.second);
817     }
818 }
819 
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)820 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
821 {
822     for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
823         if (identifier == iter->second) {
824             iter = equalIdentifierMap_.erase(iter);
825             continue;
826         }
827         iter++;
828     }
829     for (const auto &device : targets) {
830         equalIdentifierMap_[device] = identifier;
831     }
832 }
833 
OfflineHandleByDevice(const std::string & deviceId,ISyncInterface * storage)834 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage)
835 {
836     if (!isActive_) {
837         LOGD("[SyncEngine][OfflineHandleByDevice] ignore offline because not init");
838         return;
839     }
840     RemoteExecutor *executor = GetAndIncRemoteExector();
841     if (executor != nullptr) {
842         executor->NotifyDeviceOffline(deviceId);
843         RefObject::DecObjRef(executor);
844         executor = nullptr;
845     }
846     // db closed or device is offline
847     // clear remote subscribe and trigger
848     std::vector<std::string> remoteQueryId;
849     subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
850     subManager_->ClearRemoteSubscribeQuery(deviceId);
851     for (const auto &queryId: remoteQueryId) {
852         if (!subManager_->IsQueryExistSubscribe(queryId)) {
853             static_cast<SingleVerKvDBSyncInterface *>(storage)->RemoveSubscribe(queryId);
854         }
855     }
856     DBInfo dbInfo;
857     static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
858     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
859     // get context and Inc context if context is not nullptr
860     ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
861     {
862         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
863         if (communicatorProxy_ == nullptr) {
864             return;
865         }
866         if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
867             LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
868             RefObject::DecObjRef(context);
869             return;
870         }
871     }
872     // means device is offline, clear local subscribe
873     subManager_->ClearLocalSubscribeQuery(deviceId);
874     // clear sync task
875     if (context != nullptr) {
876         context->ClearAllSyncTask();
877         RefObject::DecObjRef(context);
878     }
879 }
880 
ClearAllSyncTaskByDevice(const std::string & deviceId)881 void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
882 {
883     ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
884     if (context != nullptr) {
885         context->ClearAllSyncTask();
886         RefObject::DecObjRef(context);
887     }
888 }
889 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)890 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
891 {
892     subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
893 }
894 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)895 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
896 {
897     subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
898 }
899 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)900 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
901 {
902     subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
903 }
904 
PutUnfinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)905 void SyncEngine::PutUnfinishedSubQueries(const std::string &device,
906     const std::vector<QuerySyncObject> &subscribeQueries)
907 {
908     subManager_->PutLocalUnFinishedSubQueries(device, subscribeQueries);
909 }
910 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)911 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
912 {
913     subManager_->GetAllUnFinishSubQueries(allSyncQueries);
914 }
915 
AllocCommunicator(const std::string & identifier,int & errCode,std::string userId)916 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode, std::string userId)
917 {
918     ICommunicatorAggregator *communicatorAggregator = nullptr;
919     errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
920     if (communicatorAggregator == nullptr) {
921         LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
922         return nullptr;
923     }
924     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
925     auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode, userId);
926     if (communicator == nullptr) {
927         LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
928         return communicator;
929     }
930 
931     errCode = communicator->RegOnMessageCallback(
932         [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
933     if (errCode != E_OK) {
934         LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
935         communicatorAggregator->ReleaseCommunicator(communicator, userId);
936         return nullptr;
937     }
938 
939     errCode = communicator->RegOnConnectCallback(
940         [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
941             deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
942         }, nullptr);
943     if (errCode != E_OK) {
944         LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
945         communicator->RegOnMessageCallback(nullptr, nullptr);
946         communicatorAggregator->ReleaseCommunicator(communicator, userId);
947         return nullptr;
948     }
949 
950     return communicator;
951 }
952 
UnRegCommunicatorsCallback()953 void SyncEngine::UnRegCommunicatorsCallback()
954 {
955     if (communicator_ != nullptr) {
956         communicator_->RegOnMessageCallback(nullptr, nullptr);
957         communicator_->RegOnConnectCallback(nullptr, nullptr);
958         communicator_->RegOnSendableCallback(nullptr, nullptr);
959     }
960     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
961     for (const auto &iter : equalCommunicators_) {
962         iter.second->RegOnMessageCallback(nullptr, nullptr);
963         iter.second->RegOnConnectCallback(nullptr, nullptr);
964         iter.second->RegOnSendableCallback(nullptr, nullptr);
965     }
966 }
967 
ReleaseCommunicators()968 void SyncEngine::ReleaseCommunicators()
969 {
970     {
971         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
972         RefObject::KillAndDecObjRef(communicatorProxy_);
973         communicatorProxy_ = nullptr;
974     }
975     ICommunicatorAggregator *communicatorAggregator = nullptr;
976     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
977     if (communicatorAggregator == nullptr) {
978         LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
979         return;
980     }
981 
982     if (communicator_ != nullptr) {
983         communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId());
984         communicator_ = nullptr;
985     }
986 
987     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
988     for (auto &iter : equalCommunicators_) {
989         communicatorAggregator->ReleaseCommunicator(iter.second, GetUserId());
990     }
991     equalCommunicators_.clear();
992 }
993 
IsSkipCalculateLen(const Message * inMsg)994 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
995 {
996     if (inMsg->IsFeedbackError()) {
997         LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
998         return true;
999     }
1000     return false;
1001 }
1002 
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1003 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
1004     InternalSyncParma &outParam)
1005 {
1006     outParam.devices = { device };
1007     outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
1008     outParam.isQuerySync = true;
1009     outParam.syncQuery = query;
1010 }
1011 
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1012 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
1013     InternalSyncParma &outParam)
1014 {
1015     outParam.devices = { device };
1016     outParam.mode = SyncModeType::AUTO_PUSH;
1017     outParam.isQuerySync = true;
1018     outParam.syncQuery = query;
1019 }
1020 
StartAutoSubscribeTimer(const ISyncInterface & syncInterface)1021 int SyncEngine::StartAutoSubscribeTimer([[gnu::unused]] const ISyncInterface &syncInterface)
1022 {
1023     return E_OK;
1024 }
1025 
StopAutoSubscribeTimer()1026 void SyncEngine::StopAutoSubscribeTimer()
1027 {
1028 }
1029 
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const1030 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
1031 {
1032     return subManager_->LocalSubscribeLimitCheck(devices, query);
1033 }
1034 
1035 
ClearInnerResource()1036 void SyncEngine::ClearInnerResource()
1037 {
1038     ClearSyncInterface();
1039     if (deviceManager_ != nullptr) {
1040         delete deviceManager_;
1041         deviceManager_ = nullptr;
1042     }
1043     communicator_ = nullptr;
1044     metadata_ = nullptr;
1045     onRemoteDataChanged_ = nullptr;
1046     offlineChanged_ = nullptr;
1047     queryAutoSyncCallback_ = nullptr;
1048     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1049     if (remoteExecutor_ != nullptr) {
1050         RefObject::KillAndDecObjRef(remoteExecutor_);
1051         remoteExecutor_ = nullptr;
1052     }
1053 }
1054 
IsEngineActive() const1055 bool SyncEngine::IsEngineActive() const
1056 {
1057     return isActive_;
1058 }
1059 
SchemaChange()1060 void SyncEngine::SchemaChange()
1061 {
1062     std::vector<ISyncTaskContext *> tmpContextVec;
1063     {
1064         std::lock_guard<std::mutex> lock(contextMapLock_);
1065         for (const auto &entry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
1066             auto context = entry.second;
1067             if (context == nullptr || context->IsKilled()) {
1068                 continue;
1069             }
1070             RefObject::IncObjRef(context);
1071             tmpContextVec.push_back(context);
1072         }
1073     }
1074     for (const auto &entryContext : tmpContextVec) {
1075         entryContext->SchemaChange();
1076         RefObject::DecObjRef(entryContext);
1077     }
1078 }
1079 
IncExecTaskCount()1080 void SyncEngine::IncExecTaskCount()
1081 {
1082     std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1083     execTaskCount_++;
1084 }
1085 
DecExecTaskCount()1086 void SyncEngine::DecExecTaskCount()
1087 {
1088     {
1089         std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1090         execTaskCount_--;
1091     }
1092     execTaskCv_.notify_all();
1093 }
1094 
Dump(int fd)1095 void SyncEngine::Dump(int fd)
1096 {
1097     {
1098         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1099         std::string communicatorLabel;
1100         if (communicatorProxy_ != nullptr) {
1101             communicatorProxy_->GetLocalIdentity(communicatorLabel);
1102         }
1103         DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1104         if (communicatorProxy_ != nullptr) {
1105             communicatorProxy_->GetLocalIdentity(communicatorLabel);
1106             communicatorProxy_->Dump(fd);
1107         }
1108     }
1109     DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1110     // dump context info
1111     std::lock_guard<std::mutex> autoLock(contextMapLock_);
1112     for (const auto &entry : syncTaskContextMap_) {
1113         if (entry.second != nullptr) {
1114             entry.second->Dump(fd);
1115         }
1116     }
1117     DBDumpHelper::Dump(fd, "\t]\n\n");
1118 }
1119 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1120 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1121     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1122 {
1123     RemoteExecutor *executor = GetAndIncRemoteExector();
1124     if (!isActive_ || executor == nullptr) {
1125         RefObject::DecObjRef(executor);
1126         return -E_BUSY; // db is closing just return
1127     }
1128     int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1129     RefObject::DecObjRef(executor);
1130     return errCode;
1131 }
1132 
NotifyConnectionClosed(uint64_t connectionId)1133 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1134 {
1135     RemoteExecutor *executor = GetAndIncRemoteExector();
1136     if (!isActive_ || executor == nullptr) {
1137         RefObject::DecObjRef(executor);
1138         return; // db is closing just return
1139     }
1140     executor->NotifyConnectionClosed(connectionId);
1141     RefObject::DecObjRef(executor);
1142 }
1143 
NotifyUserChange()1144 void SyncEngine::NotifyUserChange()
1145 {
1146     RemoteExecutor *executor = GetAndIncRemoteExector();
1147     if (!isActive_ || executor == nullptr) {
1148         RefObject::DecObjRef(executor);
1149         return; // db is closing just return
1150     }
1151     executor->NotifyUserChange();
1152     RefObject::DecObjRef(executor);
1153 }
1154 
GetAndIncRemoteExector()1155 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1156 {
1157     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1158     RefObject::IncObjRef(remoteExecutor_);
1159     return remoteExecutor_;
1160 }
1161 
SetRemoteExector(RemoteExecutor * executor)1162 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1163 {
1164     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1165     remoteExecutor_ = executor;
1166 }
1167 
CheckDeviceIdValid(const std::string & checkDeviceId,const std::string & localDeviceId)1168 bool SyncEngine::CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId)
1169 {
1170     if (checkDeviceId.empty()) {
1171         return false;
1172     }
1173     if (checkDeviceId.length() > DBConstant::MAX_DEV_LENGTH) {
1174         LOGE("[SyncEngine] dev is too long len=%zu", checkDeviceId.length());
1175         return false;
1176     }
1177     return localDeviceId != checkDeviceId;
1178 }
1179 
GetLocalDeviceId(std::string & deviceId)1180 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1181 {
1182     if (!isActive_ || communicator_ == nullptr) {
1183         // db is closing
1184         return -E_BUSY;
1185     }
1186     auto communicator = communicator_;
1187     RefObject::IncObjRef(communicator);
1188     int errCode = communicator->GetLocalIdentity(deviceId);
1189     RefObject::DecObjRef(communicator);
1190     return errCode;
1191 }
1192 
AbortMachineIfNeed(uint32_t syncId)1193 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1194 {
1195     std::vector<ISyncTaskContext *> abortContexts;
1196     {
1197         std::lock_guard<std::mutex> lock(contextMapLock_);
1198         for (const auto &entry : syncTaskContextMap_) {
1199             auto context = entry.second;
1200             if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
1201                 continue;
1202             }
1203             RefObject::IncObjRef(context);
1204             if (context->GetSyncId() == syncId) {
1205                 RefObject::IncObjRef(context);
1206                 abortContexts.push_back(context);
1207             }
1208             RefObject::DecObjRef(context);
1209         }
1210     }
1211     for (const auto &abortContext : abortContexts) {
1212         abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1213         RefObject::DecObjRef(abortContext);
1214     }
1215 }
1216 
WaitingExecTaskExist()1217 void SyncEngine::WaitingExecTaskExist()
1218 {
1219     std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1220     bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1221         [this]() { return execTaskCount_ == 0; });
1222     if (!isTimeout) { // LCOV_EXCL_BR_LINE
1223         LOGD("SyncEngine Close with executing task!");
1224     }
1225 }
1226 
HandleRemoteExecutorMsg(const std::string & targetDev,Message * inMsg)1227 int SyncEngine::HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg)
1228 {
1229     RemoteExecutor *executor = GetAndIncRemoteExector();
1230     int errCode = E_OK;
1231     if (executor != nullptr) {
1232         errCode = executor->ReceiveMessage(targetDev, inMsg);
1233     } else {
1234         errCode = -E_BUSY;
1235     }
1236     DecExecTaskCount();
1237     RefObject::DecObjRef(executor);
1238     return errCode;
1239 }
1240 
AddSubscribe(SyncGenericInterface * storage,const std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery)1241 void SyncEngine::AddSubscribe(SyncGenericInterface *storage,
1242     const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery)
1243 {
1244     for (const auto &[device, queryList]: subscribeQuery) {
1245         for (const auto &query: queryList) {
1246             AddQuerySubscribe(storage, device, query);
1247         }
1248     }
1249 }
1250 
AddQuerySubscribe(SyncGenericInterface * storage,const std::string & device,const QuerySyncObject & query)1251 void SyncEngine::AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device,
1252     const QuerySyncObject &query)
1253 {
1254     int errCode = storage->AddSubscribe(query.GetIdentify(), query, true);
1255     if (errCode != E_OK) {
1256         LOGW("[SyncEngine][AddSubscribe] Add trigger failed dev = %s queryId = %s",
1257             STR_MASK(device), STR_MASK(query.GetIdentify()));
1258         return;
1259     }
1260     errCode = subManager_->ReserveRemoteSubscribeQuery(device, query);
1261     if (errCode != E_OK) {
1262         if (!subManager_->IsQueryExistSubscribe(query.GetIdentify())) { // LCOV_EXCL_BR_LINE
1263             (void)storage->RemoveSubscribe(query.GetIdentify());
1264         }
1265         return;
1266     }
1267     subManager_->ActiveRemoteSubscribeQuery(device, query);
1268 }
1269 
TimeChange()1270 void SyncEngine::TimeChange()
1271 {
1272     std::vector<ISyncTaskContext *> decContext;
1273     {
1274         // copy context
1275         std::lock_guard<std::mutex> lock(contextMapLock_);
1276         for (const auto &iter : syncTaskContextMap_) {
1277             RefObject::IncObjRef(iter.second);
1278             decContext.push_back(iter.second);
1279         }
1280     }
1281     for (auto &iter : decContext) {
1282         iter->TimeChange();
1283         RefObject::DecObjRef(iter);
1284     }
1285 }
1286 
GetResponseTaskCount()1287 int32_t SyncEngine::GetResponseTaskCount()
1288 {
1289     std::vector<ISyncTaskContext *> decContext;
1290     {
1291         // copy context
1292         std::lock_guard<std::mutex> lock(contextMapLock_);
1293         for (const auto &iter : syncTaskContextMap_) {
1294             RefObject::IncObjRef(iter.second);
1295             decContext.push_back(iter.second);
1296         }
1297     }
1298     int32_t taskCount = 0;
1299     for (auto &iter : decContext) {
1300         taskCount += iter->GetResponseTaskCount();
1301         RefObject::DecObjRef(iter);
1302     }
1303     {
1304         std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1305         taskCount += static_cast<int32_t>(execTaskCount_);
1306     }
1307     return taskCount;
1308 }
1309 
ClearSyncInterface()1310 void SyncEngine::ClearSyncInterface()
1311 {
1312     ISyncInterface *syncInterface = nullptr;
1313     {
1314         std::lock_guard<std::mutex> autoLock(storageMutex_);
1315         if (syncInterface_ == nullptr) {
1316             return;
1317         }
1318         syncInterface = syncInterface_;
1319         syncInterface_ = nullptr;
1320     }
1321     syncInterface->DecRefCount();
1322 }
1323 
GetAndIncSyncInterface()1324 ISyncInterface *SyncEngine::GetAndIncSyncInterface()
1325 {
1326     std::lock_guard<std::mutex> autoLock(storageMutex_);
1327     if (syncInterface_ == nullptr) {
1328         return nullptr;
1329     }
1330     syncInterface_->IncRefCount();
1331     return syncInterface_;
1332 }
1333 
SetSyncInterface(ISyncInterface * syncInterface)1334 void SyncEngine::SetSyncInterface(ISyncInterface *syncInterface)
1335 {
1336     std::lock_guard<std::mutex> autoLock(storageMutex_);
1337     syncInterface_ = syncInterface;
1338 }
1339 
GetUserId(const ISyncInterface * syncInterface)1340 std::string SyncEngine::GetUserId(const ISyncInterface *syncInterface)
1341 {
1342     if (syncInterface == nullptr) {
1343         LOGW("[SyncEngine][GetUserId] sync interface has not initialized");
1344         return "";
1345     }
1346     std::string userId = syncInterface->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
1347     std::string subUserId = syncInterface->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
1348     if (!subUserId.empty()) {
1349         userId += "-" + subUserId;
1350     }
1351     return userId;
1352 }
1353 
GetUserId()1354 std::string SyncEngine::GetUserId()
1355 {
1356     std::lock_guard<std::mutex> autoLock(storageMutex_);
1357     return GetUserId(syncInterface_);
1358 }
1359 
GetTimeout(const std::string & dev)1360 uint32_t SyncEngine::GetTimeout(const std::string &dev)
1361 {
1362     ICommunicator *communicator = nullptr;
1363     {
1364         std::lock_guard<std::mutex> autoLock(communicatorProxyLock_);
1365         if (communicatorProxy_ == nullptr) {
1366             LOGW("[SyncEngine] Communicator is null when get %.3s timeout", dev.c_str());
1367             return DBConstant::MIN_TIMEOUT;
1368         }
1369         communicator = communicatorProxy_;
1370         RefObject::IncObjRef(communicator);
1371     }
1372     uint32_t timeout = communicator->GetTimeout(dev);
1373     RefObject::DecObjRef(communicator);
1374     return timeout;
1375 }
1376 } // namespace DistributedDB
1377