• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_engine.h"
17 
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21 
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34 
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40 
41 namespace DistributedDB {
42 int SyncEngine::queueCacheSize_ = 0;
43 int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
44 unsigned int SyncEngine::discardMsgNum_ = 0;
45 std::mutex SyncEngine::queueLock_;
46 
SyncEngine()47 SyncEngine::SyncEngine()
48     : syncInterface_(nullptr),
49       communicator_(nullptr),
50       deviceManager_(nullptr),
51       metadata_(nullptr),
52       execTaskCount_(0),
53       isSyncRetry_(false),
54       communicatorProxy_(nullptr),
55       isActive_(false),
56       remoteExecutor_(nullptr)
57 {
58 }
59 
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62     LOGD("[SyncEngine] ~SyncEngine!");
63     ClearInnerResource();
64     equalIdentifierMap_.clear();
65     subManager_ = nullptr;
66     LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68 
Initialize(ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,const InitCallbackParam & callbackParam)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
70     const InitCallbackParam &callbackParam)
71 {
72     if ((syncInterface == nullptr) || (metadata == nullptr)) {
73         LOGE("[SyncEngine] [Initialize] syncInterface or metadata is nullptr.");
74         return -E_INVALID_ARGS;
75     }
76     int errCode = StartAutoSubscribeTimer(*syncInterface);
77     if (errCode != E_OK) {
78         return errCode;
79     }
80 
81     errCode = InitComunicator(syncInterface);
82     if (errCode != E_OK) {
83         LOGE("[SyncEngine] Init Communicator failed");
84         // There need to set nullptr. other wise, syncInterface will be
85         // DecRef in th destroy-method.
86         StopAutoSubscribeTimer();
87         return errCode;
88     }
89     onRemoteDataChanged_ = callbackParam.onRemoteDataChanged;
90     offlineChanged_ = callbackParam.offlineChanged;
91     queryAutoSyncCallback_ = callbackParam.queryAutoSyncCallback;
92     errCode = InitInnerSource(callbackParam.onRemoteDataChanged, callbackParam.offlineChanged, syncInterface);
93     if (errCode != E_OK) {
94         // reset ptr if initialize device manager failed
95         StopAutoSubscribeTimer();
96         return errCode;
97     }
98     SetSyncInterface(syncInterface);
99     if (subManager_ == nullptr) {
100         subManager_ = std::make_shared<SubscribeManager>();
101     }
102     metadata_ = metadata;
103     isActive_ = true;
104     LOGI("[SyncEngine] Engine [%.3s] init ok", label_.c_str());
105     return E_OK;
106 }
107 
Close()108 int SyncEngine::Close()
109 {
110     LOGI("[SyncEngine] [%.3s] close enter!", label_.c_str());
111     isActive_ = false;
112     UnRegCommunicatorsCallback();
113     StopAutoSubscribeTimer();
114     std::vector<ISyncTaskContext *> decContext;
115     // Clear SyncContexts
116     {
117         std::unique_lock<std::mutex> lock(contextMapLock_);
118         for (auto &iter : syncTaskContextMap_) {
119             decContext.push_back(iter.second);
120             iter.second = nullptr;
121         }
122         syncTaskContextMap_.clear();
123     }
124     for (auto &iter : decContext) {
125         RefObject::KillAndDecObjRef(iter);
126         iter = nullptr;
127     }
128     WaitingExecTaskExist();
129     ReleaseCommunicators();
130     {
131         std::lock_guard<std::mutex> msgLock(queueLock_);
132         while (!msgQueue_.empty()) {
133             Message *inMsg = msgQueue_.front();
134             msgQueue_.pop_front();
135             if (inMsg != nullptr) { // LCOV_EXCL_BR_LINE
136                 queueCacheSize_ -= GetMsgSize(inMsg);
137                 delete inMsg;
138                 inMsg = nullptr;
139             }
140         }
141     }
142     // close db, rekey or import scene, need clear all remote query info
143     // local query info will destroy with syncEngine destruct
144     if (subManager_ != nullptr) {
145         subManager_->ClearAllRemoteQuery();
146     }
147 
148     RemoteExecutor *executor = GetAndIncRemoteExector();
149     if (executor != nullptr) {
150         executor->Close();
151         RefObject::DecObjRef(executor);
152         executor = nullptr;
153     }
154     ClearInnerResource();
155     LOGI("[SyncEngine] [%.3s] closed!", label_.c_str());
156     return E_OK;
157 }
158 
AddSyncOperation(SyncOperation * operation)159 int SyncEngine::AddSyncOperation(SyncOperation *operation)
160 {
161     if (operation == nullptr) {
162         LOGE("[SyncEngine] operation is nullptr");
163         return -E_INVALID_ARGS;
164     }
165 
166     std::vector<std::string> devices = operation->GetDevices();
167     std::string localDeviceId;
168     int errCode = GetLocalDeviceId(localDeviceId);
169     for (const auto &deviceId : devices) {
170         if (errCode != E_OK) {
171             operation->SetStatus(deviceId, errCode == -E_BUSY ?
172                 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
173             continue;
174         }
175         if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
176             operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
177             continue;
178         }
179         operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
180         if (AddSyncOperForContext(deviceId, operation) != E_OK) {
181             operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
182         }
183     }
184     return E_OK;
185 }
186 
RemoveSyncOperation(int syncId)187 void SyncEngine::RemoveSyncOperation(int syncId)
188 {
189     std::lock_guard<std::mutex> lock(contextMapLock_);
190     for (auto &iter : syncTaskContextMap_) {
191         ISyncTaskContext *context = iter.second;
192         if (context != nullptr) {
193             context->RemoveSyncOperation(syncId);
194         }
195     }
196 }
197 
198 #ifndef OMIT_MULTI_VER
BroadCastDataChanged() const199 void SyncEngine::BroadCastDataChanged() const
200 {
201     if (deviceManager_ != nullptr) {
202         (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
203     }
204 }
205 #endif // OMIT_MULTI_VER
206 
StartCommunicator()207 void SyncEngine::StartCommunicator()
208 {
209     if (communicator_ == nullptr) {
210         LOGE("[SyncEngine][StartCommunicator] communicator is not set!");
211         return;
212     }
213     LOGD("[SyncEngine][StartCommunicator] RegOnConnectCallback");
214     int errCode = communicator_->RegOnConnectCallback(
215         [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
216             deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
217         }, nullptr);
218     if (errCode != E_OK) {
219         LOGE("[SyncEngine][StartCommunicator] register failed, auto sync can not use! err %d", errCode);
220         return;
221     }
222     communicator_->Activate(GetUserId());
223 }
224 
GetOnlineDevices(std::vector<std::string> & devices) const225 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
226 {
227     devices.clear();
228     if (deviceManager_ != nullptr) {
229         deviceManager_->GetOnlineDevices(devices);
230     }
231 }
232 
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged,ISyncInterface * syncInterface)233 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
234     const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface)
235 {
236     deviceManager_ = new (std::nothrow) DeviceManager();
237     if (deviceManager_ == nullptr) {
238         LOGE("[SyncEngine] deviceManager alloc failed!");
239         return -E_OUT_OF_MEMORY;
240     }
241     auto executor = new (std::nothrow) RemoteExecutor();
242     if (executor == nullptr) {
243         LOGE("[SyncEngine] remoteExecutor alloc failed!");
244         delete deviceManager_;
245         deviceManager_ = nullptr;
246         return -E_OUT_OF_MEMORY;
247     }
248 
249     int errCode = E_OK;
250     do {
251         CommunicatorProxy *comProxy = nullptr;
252         {
253             std::lock_guard<std::mutex> lock(communicatorProxyLock_);
254             comProxy = communicatorProxy_;
255             RefObject::IncObjRef(comProxy);
256         }
257         errCode = deviceManager_->Initialize(comProxy, onRemoteDataChanged, offlineChanged);
258         RefObject::DecObjRef(comProxy);
259         if (errCode != E_OK) {
260             LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
261             break;
262         }
263         errCode = executor->Initialize(syncInterface, communicator_);
264     } while (false);
265     if (errCode != E_OK) {
266         delete deviceManager_;
267         deviceManager_ = nullptr;
268         delete executor;
269         executor = nullptr;
270     } else {
271         SetRemoteExector(executor);
272     }
273     return errCode;
274 }
275 
InitComunicator(const ISyncInterface * syncInterface)276 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
277 {
278     ICommunicatorAggregator *communicatorAggregator = nullptr;
279     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
280     if (communicatorAggregator == nullptr) {
281         LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
282         return errCode;
283     }
284     std::vector<uint8_t> label = syncInterface->GetIdentifier();
285     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
286     if (isSyncDualTupleMode) {
287         std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
288         LOGI("[SyncEngine] dual tuple mode, original identifier=%.3s, target identifier=%.3s", VEC_TO_STR(label),
289             VEC_TO_STR(dualTuplelabel));
290         communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode, GetUserId(syncInterface));
291     } else {
292         communicator_ = communicatorAggregator->AllocCommunicator(label, errCode, GetUserId(syncInterface));
293     }
294     if (communicator_ == nullptr) {
295         LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
296         return errCode;
297     }
298 
299     errCode = RegCallbackOnInitComunicator(communicatorAggregator, syncInterface);
300     if (errCode != E_OK) {
301         return errCode;
302     }
303     {
304         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
305         communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
306         if (communicatorProxy_ == nullptr) {
307             communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
308             communicator_ = nullptr;
309             return -E_OUT_OF_MEMORY;
310         }
311         communicatorProxy_->SetMainCommunicator(communicator_);
312     }
313     label.resize(3); // only show 3 Bytes enough
314     label_ = DBCommon::VectorToHexString(label);
315     LOGD("[SyncEngine] RegOnConnectCallback");
316     return errCode;
317 }
318 
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)319 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
320 {
321     if (syncInterface_ == nullptr) {
322         LOGE("[SyncEngine][AddSyncOperForContext] sync interface has not initialized");
323         return -E_INVALID_DB;
324     }
325     bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
326     std::string targetUserId = DBConstant::DEFAULT_USER;
327     if (isSyncDualTupleMode) {
328         targetUserId = GetTargetUserId(deviceId);
329     }
330     int errCode = E_OK;
331     ISyncTaskContext *context = nullptr;
332     {
333         std::lock_guard<std::mutex> lock(contextMapLock_);
334         context = FindSyncTaskContext({deviceId, targetUserId}, false);
335         if (context == nullptr) {
336             if (!IsKilled()) {
337                 context = GetSyncTaskContext({deviceId, targetUserId}, errCode);
338             }
339             if (context == nullptr) {
340                 return errCode;
341             }
342         }
343         if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
344             return -E_OBJ_IS_KILLED;
345         }
346         // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
347         RefObject::IncObjRef(context);
348     }
349 
350     errCode = context->AddSyncOperation(operation);
351     if (operation != nullptr) {
352         operation->SetSyncContext(context); // make the life cycle of context and operation are same
353     }
354     RefObject::DecObjRef(context);
355     return errCode;
356 }
357 
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)358 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
359     Message *inMsg)
360 {
361     std::string deviceId = context->GetDeviceId();
362 
363     if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
364         int errCode = context->ReceiveMessageCallback(inMsg);
365         if (errCode == -E_NOT_NEED_DELETE_MSG) {
366             goto MSG_CALLBACK_OUT_NOT_DEL;
367         }
368         // add auto sync here while recv subscribe request
369         QuerySyncObject syncObject;
370         if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
371             InternalSyncParma param;
372             GetQueryAutoSyncParam(deviceId, syncObject, param);
373             queryAutoSyncCallback_(param);
374         }
375     }
376 
377     delete inMsg;
378     inMsg = nullptr;
379 MSG_CALLBACK_OUT_NOT_DEL:
380     ScheduleTaskOut(context, communicator);
381 }
382 
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)383 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
384 {
385     std::string deviceId = context->GetDeviceId();
386     if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
387         onRemoteDataChanged_(deviceId);
388     } else {
389         LOGE("[SyncEngine] onRemoteDataChanged is null!");
390     }
391     delete inMsg;
392     inMsg = nullptr;
393     ScheduleTaskOut(context, communicator);
394 }
395 
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)396 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
397 {
398     (void)DealMsgUtilQueueEmpty();
399     DecExecTaskCount();
400     RefObject::DecObjRef(communicator);
401     RefObject::DecObjRef(context);
402 }
403 
DealMsgUtilQueueEmpty()404 int SyncEngine::DealMsgUtilQueueEmpty()
405 {
406     if (!isActive_) {
407         return -E_BUSY; // db is closing just return
408     }
409     int errCode = E_OK;
410     Message *inMsg = nullptr;
411     {
412         std::lock_guard<std::mutex> lock(queueLock_);
413         if (msgQueue_.empty()) {
414             return errCode;
415         }
416         inMsg = msgQueue_.front();
417         msgQueue_.pop_front();
418         queueCacheSize_ -= GetMsgSize(inMsg);
419     }
420 
421     IncExecTaskCount();
422     // it will deal with the first message in queue, we should increase object reference counts and sure that resources
423     // could be prevented from destroying by other threads.
424     do {
425         ISyncTaskContext *nextContext = GetContextForMsg({inMsg->GetTarget(), inMsg->GetSenderUserId()}, errCode,
426             inMsg->GetErrorNo() == E_NEED_CORRECT_TARGET_USER);
427         if (errCode != E_OK) {
428             break;
429         }
430         errCode = ScheduleDealMsg(nextContext, inMsg);
431         if (errCode != E_OK) {
432             RefObject::DecObjRef(nextContext);
433         }
434     } while (false);
435     if (errCode != E_OK) {
436         delete inMsg;
437         inMsg = nullptr;
438         DecExecTaskCount();
439     }
440     return errCode;
441 }
442 
GetContextForMsg(const DeviceSyncTarget & target,int & errCode,bool isNeedCorrectUserId)443 ISyncTaskContext *SyncEngine::GetContextForMsg(const DeviceSyncTarget &target, int &errCode, bool isNeedCorrectUserId)
444 {
445     ISyncTaskContext *context = nullptr;
446     {
447         std::lock_guard<std::mutex> lock(contextMapLock_);
448         context = FindSyncTaskContext(target, isNeedCorrectUserId);
449         if (context != nullptr) { // LCOV_EXCL_BR_LINE
450             if (context->IsKilled()) {
451                 errCode = -E_OBJ_IS_KILLED;
452                 return nullptr;
453             }
454         } else {
455             if (IsKilled()) {
456                 errCode = -E_OBJ_IS_KILLED;
457                 return nullptr;
458             }
459             context = GetSyncTaskContext(target, errCode);
460             if (context == nullptr) {
461                 return nullptr;
462             }
463         }
464         // IncRef for context to make sure context is valid, when task run another thread
465         RefObject::IncObjRef(context);
466     }
467     return context;
468 }
469 
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)470 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
471 {
472     if (inMsg == nullptr) {
473         LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
474         DecExecTaskCount();
475         return E_OK;
476     }
477     CommunicatorProxy *comProxy = nullptr;
478     {
479         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
480         comProxy = communicatorProxy_;
481         RefObject::IncObjRef(comProxy);
482     }
483     int errCode = E_OK;
484     // deal remote local data changed message
485     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
486         RemoteDataChangedTask(context, comProxy, inMsg);
487     } else {
488         errCode = RuntimeContext::GetInstance()->ScheduleTask(
489             [this, context, comProxy, inMsg] { MessageReciveCallbackTask(context, comProxy, inMsg); });
490     }
491 
492     if (errCode != E_OK) {
493         LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
494         RefObject::DecObjRef(comProxy);
495     }
496     return errCode;
497 }
498 
MessageReciveCallback(const std::string & targetDev,Message * inMsg)499 int SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
500 {
501     IncExecTaskCount();
502     int errCode = MessageReciveCallbackInner(targetDev, inMsg);
503     if (errCode != E_OK) {
504         if (inMsg != nullptr) {
505             delete inMsg;
506             inMsg = nullptr;
507         }
508         DecExecTaskCount();
509         LOGE("[SyncEngine] MessageReciveCallback failed!");
510     }
511     return errCode;
512 }
513 
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)514 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
515 {
516     if (targetDev.empty() || inMsg == nullptr) {
517         LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
518         return -E_INVALID_ARGS;
519     }
520     if (!isActive_) {
521         LOGE("[SyncEngine] engine is closing, ignore msg");
522         return -E_BUSY;
523     }
524     if (inMsg->IsSupportFeedDbClosing() && ExchangeClosePending(false)) {
525         return -E_FEEDBACK_DB_CLOSING;
526     }
527     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
528         return HandleRemoteExecutorMsg(targetDev, inMsg);
529     }
530 
531     int msgSize = 0;
532     if (!IsSkipCalculateLen(inMsg)) {
533         msgSize = GetMsgSize(inMsg);
534         if (msgSize <= 0) {
535             LOGE("[SyncEngine] GetMsgSize makes a mistake");
536             return -E_NOT_SUPPORT;
537         }
538     }
539 
540     {
541         std::lock_guard<std::mutex> lock(queueLock_);
542         if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
543             LOGE("[SyncEngine] The size of message queue is beyond maximum");
544             discardMsgNum_++;
545             return -E_BUSY;
546         }
547 
548         if (GetExecTaskCount() > MAX_EXEC_NUM) {
549             PutMsgIntoQueue(targetDev, inMsg, msgSize);
550             // task dont exec here
551             DecExecTaskCount();
552             return E_OK;
553         }
554     }
555 
556     int errCode = E_OK;
557     ISyncTaskContext *nextContext = GetContextForMsg({targetDev, inMsg->GetSenderUserId()}, errCode,
558         inMsg->GetErrorNo() == E_NEED_CORRECT_TARGET_USER);
559     if (errCode != E_OK) {
560         return errCode;
561     }
562     LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
563     return ScheduleDealMsg(nextContext, inMsg);
564 }
565 
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)566 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
567 {
568     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
569         auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
570             [&targetDev](const Message *msg) {
571                 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
572             });
573         if (iter != msgQueue_.end()) { // LCOV_EXCL_BR_LINE
574             delete inMsg;
575             inMsg = nullptr;
576             return;
577         }
578     }
579     inMsg->SetTarget(targetDev);
580     msgQueue_.push_back(inMsg);
581     queueCacheSize_ += msgSize;
582     LOGW("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
583 }
584 
GetMsgSize(const Message * inMsg) const585 int SyncEngine::GetMsgSize(const Message *inMsg) const
586 {
587     switch (inMsg->GetMessageId()) {
588         case TIME_SYNC_MESSAGE:
589             return TimeSync::CalculateLen(inMsg);
590         case ABILITY_SYNC_MESSAGE:
591             return AbilitySync::CalculateLen(inMsg);
592         case DATA_SYNC_MESSAGE:
593         case QUERY_SYNC_MESSAGE:
594         case CONTROL_SYNC_MESSAGE:
595             return SingleVerSerializeManager::CalculateLen(inMsg);
596 #ifndef OMIT_MULTI_VER
597         case COMMIT_HISTORY_SYNC_MESSAGE:
598             return CommitHistorySync::CalculateLen(inMsg);
599         case MULTI_VER_DATA_SYNC_MESSAGE:
600             return MultiVerDataSync::CalculateLen(inMsg);
601         case VALUE_SLICE_SYNC_MESSAGE:
602             return ValueSliceSync::CalculateLen(inMsg);
603 #endif
604         case LOCAL_DATA_CHANGED:
605             return DeviceManager::CalculateLen();
606         default:
607             LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
608             return -E_NOT_SUPPORT;
609     }
610 }
611 
FindSyncTaskContext(const DeviceSyncTarget & target,bool isNeedCorrectUserId)612 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const DeviceSyncTarget &target, bool isNeedCorrectUserId)
613 {
614     if (target.userId == DBConstant::DEFAULT_USER) {
615         for (auto it = syncTaskContextMap_.begin(); it != syncTaskContextMap_.end(); it++) {
616             if (it->first.device == target.device) {
617                 ISyncTaskContext *context = it->second;
618                 CorrectTargetUserId(it, isNeedCorrectUserId);
619                 return context;
620             }
621         }
622     }
623     auto iter = syncTaskContextMap_.find(target);
624     if (iter != syncTaskContextMap_.end()) {
625         ISyncTaskContext *context = iter->second;
626         CorrectTargetUserId(iter, isNeedCorrectUserId);
627         return context;
628     }
629     return nullptr;
630 }
631 
GetSyncTaskContextAndInc(const std::string & deviceId)632 std::vector<ISyncTaskContext *> SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
633 {
634     std::vector<ISyncTaskContext *> contexts;
635     std::lock_guard<std::mutex> lock(contextMapLock_);
636     for (const auto &iter : syncTaskContextMap_) {
637         if (iter.first.device != deviceId) {
638             continue;
639         }
640         if (iter.second == nullptr) {
641             LOGI("[SyncEngine] dev=%s, user=%s, context is null, no need to clear sync operation", STR_MASK(deviceId),
642                 iter.first.userId.c_str());
643             continue;
644         }
645         if (iter.second->IsKilled()) { // LCOV_EXCL_BR_LINE
646             LOGI("[SyncEngine] context is killing");
647             continue;
648         }
649         RefObject::IncObjRef(iter.second);
650         contexts.push_back(iter.second);
651     }
652     return contexts;
653 }
654 
GetSyncTaskContext(const DeviceSyncTarget & target,int & errCode)655 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode)
656 {
657     auto storage = GetAndIncSyncInterface();
658     if (storage == nullptr) {
659         errCode = -E_INVALID_DB;
660         LOGE("[SyncEngine] SyncTaskContext alloc failed with null db");
661         return nullptr;
662     }
663     ISyncTaskContext *context = CreateSyncTaskContext(*storage);
664     if (context == nullptr) {
665         errCode = -E_OUT_OF_MEMORY;
666         LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
667         return nullptr;
668     }
669     errCode = context->Initialize(target, storage, metadata_, communicatorProxy_);
670     if (errCode != E_OK) {
671         LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(target.device));
672         RefObject::DecObjRef(context);
673         storage->DecRefCount();
674         context = nullptr;
675         return nullptr;
676     }
677     syncTaskContextMap_.insert(std::pair<DeviceSyncTarget, ISyncTaskContext *>(target, context));
678     // IncRef for SyncEngine to make sure SyncEngine is valid when context access
679     RefObject::IncObjRef(this);
680     context->OnLastRef([this, target, storage]() {
681         LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(target.device));
682         RefObject::DecObjRef(this);
683         storage->DecRefCount();
684     });
685     context->RegOnSyncTask([this, context] { return ExecSyncTask(context); });
686     return context;
687 }
688 
ExecSyncTask(ISyncTaskContext * context)689 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
690 {
691     if (IsKilled()) {
692         return -E_OBJ_IS_KILLED;
693     }
694     auto timeout = GetTimeout(context->GetDeviceId());
695     AutoLock lockGuard(context);
696     int status = context->GetTaskExecStatus();
697     if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
698         return -E_NOT_SUPPORT;
699     }
700     context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
701     while (!context->IsTargetQueueEmpty()) {
702         int errCode = context->GetNextTarget(timeout);
703         if (errCode != E_OK) {
704             // current task execute failed, try next task
705             context->ClearSyncOperation();
706             continue;
707         }
708         if (context->IsCurrentSyncTaskCanBeSkipped()) { // LCOV_EXCL_BR_LINE
709             context->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
710             context->ClearSyncOperation();
711             continue;
712         }
713         context->UnlockObj();
714         errCode = context->StartStateMachine();
715         context->LockObj();
716         if (errCode != E_OK) {
717             // machine start failed because timer start failed, try to execute next task
718             LOGW("[SyncEngine] machine StartSync failed");
719             context->SetOperationStatus(SyncOperation::OP_FAILED);
720             context->ClearSyncOperation();
721             continue;
722         }
723         // now task is running just return here
724         return errCode;
725     }
726     LOGD("[SyncEngine] ExecSyncTask finished");
727     context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
728     return E_OK;
729 }
730 
GetQueueCacheSize() const731 int SyncEngine::GetQueueCacheSize() const
732 {
733     std::lock_guard<std::mutex> lock(queueLock_);
734     return queueCacheSize_;
735 }
736 
SetQueueCacheSize(int size)737 void SyncEngine::SetQueueCacheSize(int size)
738 {
739     std::lock_guard<std::mutex> lock(queueLock_);
740     queueCacheSize_ = size;
741 }
742 
GetDiscardMsgNum() const743 unsigned int SyncEngine::GetDiscardMsgNum() const
744 {
745     std::lock_guard<std::mutex> lock(queueLock_);
746     return discardMsgNum_;
747 }
748 
SetDiscardMsgNum(unsigned int num)749 void SyncEngine::SetDiscardMsgNum(unsigned int num)
750 {
751     std::lock_guard<std::mutex> lock(queueLock_);
752     discardMsgNum_ = num;
753 }
754 
GetMaxExecNum() const755 unsigned int SyncEngine::GetMaxExecNum() const
756 {
757     return MAX_EXEC_NUM;
758 }
759 
GetMaxQueueCacheSize() const760 int SyncEngine::GetMaxQueueCacheSize() const
761 {
762     return maxQueueCacheSize_;
763 }
764 
SetMaxQueueCacheSize(int value)765 void SyncEngine::SetMaxQueueCacheSize(int value)
766 {
767     maxQueueCacheSize_ = value;
768 }
769 
GetLabel() const770 std::string SyncEngine::GetLabel() const
771 {
772     return label_;
773 }
774 
GetSyncRetry() const775 bool SyncEngine::GetSyncRetry() const
776 {
777     return isSyncRetry_;
778 }
779 
SetSyncRetry(bool isRetry)780 void SyncEngine::SetSyncRetry(bool isRetry)
781 {
782     if (isSyncRetry_ == isRetry) {
783         LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
784         return;
785     }
786     isSyncRetry_ = isRetry;
787     LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
788     std::lock_guard<std::mutex> lock(contextMapLock_);
789     for (auto &iter : syncTaskContextMap_) {
790         ISyncTaskContext *context = iter.second;
791         if (context != nullptr) { // LCOV_EXCL_BR_LINE
792             context->SetSyncRetry(isRetry);
793         }
794     }
795 }
796 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)797 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
798 {
799     if (!isActive_) {
800         LOGI("[SyncEngine] engine is closed, just put into map");
801         return E_OK;
802     }
803     ICommunicator *communicator = nullptr;
804     {
805         std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
806         if (equalCommunicators_.count(identifier) != 0) {
807             communicator = equalCommunicators_[identifier];
808         } else {
809             int errCode = E_OK;
810             communicator = AllocCommunicator(identifier, errCode, GetUserId());
811             if (communicator == nullptr) {
812                 return errCode;
813             }
814             equalCommunicators_[identifier] = communicator;
815         }
816     }
817     std::string targetDevices;
818     for (const auto &dev : targets) {
819         targetDevices += DBCommon::StringMasking(dev) + ",";
820     }
821     LOGI("[SyncEngine] set equal identifier=%.3s, original=%.3s, targetDevices=%s",
822         DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
823         targetDevices.substr(0, (targetDevices.size() > 0 ? targetDevices.size() - 1 : 0)).c_str());
824     {
825         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
826         if (communicatorProxy_ == nullptr) {
827             return -E_INTERNAL_ERROR;
828         }
829         communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
830     }
831     communicator->Activate(GetUserId());
832     return E_OK;
833 }
834 
SetEqualIdentifier()835 void SyncEngine::SetEqualIdentifier()
836 {
837     std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
838     for (auto &item : equalIdentifierMap_) {
839         if (equalIdentifier.find(item.second) == equalIdentifier.end()) { // LCOV_EXCL_BR_LINE
840             equalIdentifier[item.second] = {item.first};
841         } else {
842             equalIdentifier[item.second].push_back(item.first);
843         }
844     }
845     for (const auto &item : equalIdentifier) {
846         SetEqualIdentifier(item.first, item.second);
847     }
848 }
849 
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)850 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
851 {
852     for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
853         if (identifier == iter->second) {
854             iter = equalIdentifierMap_.erase(iter);
855             continue;
856         }
857         iter++;
858     }
859     for (const auto &device : targets) {
860         equalIdentifierMap_[device] = identifier;
861     }
862 }
863 
OfflineHandleByDevice(const std::string & deviceId,ISyncInterface * storage)864 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage)
865 {
866     if (!isActive_) {
867         LOGD("[SyncEngine][OfflineHandleByDevice] ignore offline because not init");
868         return;
869     }
870     RemoteExecutor *executor = GetAndIncRemoteExector();
871     if (executor != nullptr) {
872         executor->NotifyDeviceOffline(deviceId);
873         RefObject::DecObjRef(executor);
874         executor = nullptr;
875     }
876     // db closed or device is offline
877     // clear remote subscribe and trigger
878     std::vector<std::string> remoteQueryId;
879     subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
880     subManager_->ClearRemoteSubscribeQuery(deviceId);
881     for (const auto &queryId: remoteQueryId) {
882         if (!subManager_->IsQueryExistSubscribe(queryId)) {
883             static_cast<SingleVerKvDBSyncInterface *>(storage)->RemoveSubscribe(queryId);
884         }
885     }
886     DBInfo dbInfo;
887     static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
888     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
889     {
890         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
891         if (communicatorProxy_ == nullptr) {
892             return;
893         }
894         if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
895             LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
896             return;
897         }
898     }
899     // means device is offline, clear local subscribe
900     // get context and Inc context if context is not nullptr
901     std::vector<ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
902     for (const auto &context : contexts) {
903         subManager_->ClearLocalSubscribeQuery(deviceId);
904         // clear sync task
905         if (context != nullptr) {
906             context->ClearAllSyncTask();
907             RefObject::DecObjRef(context);
908         }
909     }
910 }
911 
ClearAllSyncTaskByDevice(const std::string & deviceId)912 void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
913 {
914     std::vector<ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
915     for (const auto &context : contexts) {
916         if (context != nullptr) {
917             context->ClearAllSyncTask();
918             RefObject::DecObjRef(context);
919         }
920     }
921 }
922 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)923 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
924 {
925     subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
926 }
927 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)928 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
929 {
930     subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
931 }
932 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)933 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
934 {
935     subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
936 }
937 
PutUnfinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)938 void SyncEngine::PutUnfinishedSubQueries(const std::string &device,
939     const std::vector<QuerySyncObject> &subscribeQueries)
940 {
941     subManager_->PutLocalUnFinishedSubQueries(device, subscribeQueries);
942 }
943 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)944 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
945 {
946     subManager_->GetAllUnFinishSubQueries(allSyncQueries);
947 }
948 
AllocCommunicator(const std::string & identifier,int & errCode,std::string userId)949 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode, std::string userId)
950 {
951     ICommunicatorAggregator *communicatorAggregator = nullptr;
952     errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
953     if (communicatorAggregator == nullptr) {
954         LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
955         return nullptr;
956     }
957     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
958     auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode, userId);
959     if (communicator == nullptr) {
960         LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
961         return communicator;
962     }
963 
964     errCode = communicator->RegOnMessageCallback(
965         [this](const std::string &targetDev, Message *inMsg) {
966             return MessageReciveCallback(targetDev, inMsg);
967         }, []() {});
968     if (errCode != E_OK) {
969         LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
970         communicatorAggregator->ReleaseCommunicator(communicator, userId);
971         return nullptr;
972     }
973 
974     errCode = communicator->RegOnConnectCallback(
975         [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
976             deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
977         }, nullptr);
978     if (errCode != E_OK) {
979         LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
980         communicator->RegOnMessageCallback(nullptr, nullptr);
981         communicatorAggregator->ReleaseCommunicator(communicator, userId);
982         return nullptr;
983     }
984 
985     return communicator;
986 }
987 
UnRegCommunicatorsCallback()988 void SyncEngine::UnRegCommunicatorsCallback()
989 {
990     if (communicator_ != nullptr) {
991         communicator_->RegOnMessageCallback(nullptr, nullptr);
992         communicator_->RegOnConnectCallback(nullptr, nullptr);
993         communicator_->RegOnSendableCallback(nullptr, nullptr);
994     }
995     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
996     for (const auto &iter : equalCommunicators_) {
997         iter.second->RegOnMessageCallback(nullptr, nullptr);
998         iter.second->RegOnConnectCallback(nullptr, nullptr);
999         iter.second->RegOnSendableCallback(nullptr, nullptr);
1000     }
1001 }
1002 
ReleaseCommunicators()1003 void SyncEngine::ReleaseCommunicators()
1004 {
1005     {
1006         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1007         RefObject::KillAndDecObjRef(communicatorProxy_);
1008         communicatorProxy_ = nullptr;
1009     }
1010     ICommunicatorAggregator *communicatorAggregator = nullptr;
1011     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
1012     if (communicatorAggregator == nullptr) {
1013         LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
1014         return;
1015     }
1016 
1017     if (communicator_ != nullptr) {
1018         communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId());
1019         communicator_ = nullptr;
1020     }
1021 
1022     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
1023     for (auto &iter : equalCommunicators_) {
1024         communicatorAggregator->ReleaseCommunicator(iter.second, GetUserId());
1025     }
1026     equalCommunicators_.clear();
1027 }
1028 
IsSkipCalculateLen(const Message * inMsg)1029 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
1030 {
1031     if (inMsg->IsFeedbackError()) {
1032         LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
1033         return true;
1034     }
1035     return false;
1036 }
1037 
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1038 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
1039     InternalSyncParma &outParam)
1040 {
1041     outParam.devices = { device };
1042     outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
1043     outParam.isQuerySync = true;
1044     outParam.syncQuery = query;
1045 }
1046 
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1047 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
1048     InternalSyncParma &outParam)
1049 {
1050     outParam.devices = { device };
1051     outParam.mode = SyncModeType::AUTO_PUSH;
1052     outParam.isQuerySync = true;
1053     outParam.syncQuery = query;
1054 }
1055 
StartAutoSubscribeTimer(const ISyncInterface & syncInterface)1056 int SyncEngine::StartAutoSubscribeTimer([[gnu::unused]] const ISyncInterface &syncInterface)
1057 {
1058     return E_OK;
1059 }
1060 
StopAutoSubscribeTimer()1061 void SyncEngine::StopAutoSubscribeTimer()
1062 {
1063 }
1064 
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const1065 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
1066 {
1067     return subManager_->LocalSubscribeLimitCheck(devices, query);
1068 }
1069 
1070 
ClearInnerResource()1071 void SyncEngine::ClearInnerResource()
1072 {
1073     ClearSyncInterface();
1074     if (deviceManager_ != nullptr) {
1075         delete deviceManager_;
1076         deviceManager_ = nullptr;
1077     }
1078     communicator_ = nullptr;
1079     metadata_ = nullptr;
1080     onRemoteDataChanged_ = nullptr;
1081     offlineChanged_ = nullptr;
1082     queryAutoSyncCallback_ = nullptr;
1083     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1084     if (remoteExecutor_ != nullptr) {
1085         RefObject::KillAndDecObjRef(remoteExecutor_);
1086         remoteExecutor_ = nullptr;
1087     }
1088 }
1089 
IsEngineActive() const1090 bool SyncEngine::IsEngineActive() const
1091 {
1092     return isActive_;
1093 }
1094 
SchemaChange()1095 void SyncEngine::SchemaChange()
1096 {
1097     std::vector<ISyncTaskContext *> tmpContextVec;
1098     {
1099         std::lock_guard<std::mutex> lock(contextMapLock_);
1100         for (const auto &entry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
1101             auto context = entry.second;
1102             if (context == nullptr || context->IsKilled()) {
1103                 continue;
1104             }
1105             RefObject::IncObjRef(context);
1106             tmpContextVec.push_back(context);
1107         }
1108     }
1109     for (const auto &entryContext : tmpContextVec) {
1110         entryContext->SchemaChange();
1111         RefObject::DecObjRef(entryContext);
1112     }
1113 }
1114 
IncExecTaskCount()1115 void SyncEngine::IncExecTaskCount()
1116 {
1117     std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1118     execTaskCount_++;
1119 }
1120 
DecExecTaskCount()1121 void SyncEngine::DecExecTaskCount()
1122 {
1123     {
1124         std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1125         execTaskCount_--;
1126     }
1127     execTaskCv_.notify_all();
1128 }
1129 
GetExecTaskCount()1130 uint32_t SyncEngine::GetExecTaskCount()
1131 {
1132     std::lock_guard<std::mutex> autoLock(execTaskCountLock_);
1133     return execTaskCount_;
1134 }
1135 
Dump(int fd)1136 void SyncEngine::Dump(int fd)
1137 {
1138     {
1139         std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1140         std::string communicatorLabel;
1141         if (communicatorProxy_ != nullptr) {
1142             communicatorProxy_->GetLocalIdentity(communicatorLabel);
1143         }
1144         DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1145         if (communicatorProxy_ != nullptr) {
1146             communicatorProxy_->GetLocalIdentity(communicatorLabel);
1147             communicatorProxy_->Dump(fd);
1148         }
1149     }
1150     DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1151     // dump context info
1152     std::lock_guard<std::mutex> autoLock(contextMapLock_);
1153     for (const auto &entry : syncTaskContextMap_) {
1154         if (entry.second != nullptr) {
1155             entry.second->Dump(fd);
1156         }
1157     }
1158     DBDumpHelper::Dump(fd, "\t]\n\n");
1159 }
1160 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1161 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1162     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1163 {
1164     RemoteExecutor *executor = GetAndIncRemoteExector();
1165     if (!isActive_ || executor == nullptr) {
1166         RefObject::DecObjRef(executor);
1167         return -E_BUSY; // db is closing just return
1168     }
1169     int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1170     RefObject::DecObjRef(executor);
1171     return errCode;
1172 }
1173 
NotifyConnectionClosed(uint64_t connectionId)1174 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1175 {
1176     RemoteExecutor *executor = GetAndIncRemoteExector();
1177     if (!isActive_ || executor == nullptr) {
1178         RefObject::DecObjRef(executor);
1179         return; // db is closing just return
1180     }
1181     executor->NotifyConnectionClosed(connectionId);
1182     RefObject::DecObjRef(executor);
1183 }
1184 
NotifyUserChange()1185 void SyncEngine::NotifyUserChange()
1186 {
1187     RemoteExecutor *executor = GetAndIncRemoteExector();
1188     if (!isActive_ || executor == nullptr) {
1189         RefObject::DecObjRef(executor);
1190         return; // db is closing just return
1191     }
1192     executor->NotifyUserChange();
1193     RefObject::DecObjRef(executor);
1194 }
1195 
GetAndIncRemoteExector()1196 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1197 {
1198     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1199     RefObject::IncObjRef(remoteExecutor_);
1200     return remoteExecutor_;
1201 }
1202 
SetRemoteExector(RemoteExecutor * executor)1203 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1204 {
1205     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1206     remoteExecutor_ = executor;
1207 }
1208 
CheckDeviceIdValid(const std::string & checkDeviceId,const std::string & localDeviceId)1209 bool SyncEngine::CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId)
1210 {
1211     if (checkDeviceId.empty()) {
1212         return false;
1213     }
1214     if (checkDeviceId.length() > DBConstant::MAX_DEV_LENGTH) {
1215         LOGE("[SyncEngine] dev is too long len=%zu", checkDeviceId.length());
1216         return false;
1217     }
1218     return localDeviceId != checkDeviceId;
1219 }
1220 
GetLocalDeviceId(std::string & deviceId)1221 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1222 {
1223     if (!isActive_ || communicator_ == nullptr) {
1224         // db is closing
1225         return -E_BUSY;
1226     }
1227     auto communicator = communicator_;
1228     RefObject::IncObjRef(communicator);
1229     int errCode = communicator->GetLocalIdentity(deviceId);
1230     RefObject::DecObjRef(communicator);
1231     return errCode;
1232 }
1233 
AbortMachineIfNeed(uint32_t syncId)1234 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1235 {
1236     std::vector<ISyncTaskContext *> abortContexts;
1237     {
1238         std::lock_guard<std::mutex> lock(contextMapLock_);
1239         for (const auto &entry : syncTaskContextMap_) {
1240             auto context = entry.second;
1241             if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
1242                 continue;
1243             }
1244             RefObject::IncObjRef(context);
1245             if (context->GetSyncId() == syncId) {
1246                 RefObject::IncObjRef(context);
1247                 abortContexts.push_back(context);
1248             }
1249             RefObject::DecObjRef(context);
1250         }
1251     }
1252     for (const auto &abortContext : abortContexts) {
1253         abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1254         RefObject::DecObjRef(abortContext);
1255     }
1256 }
1257 
WaitingExecTaskExist()1258 void SyncEngine::WaitingExecTaskExist()
1259 {
1260     std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1261     bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1262         [this]() { return execTaskCount_ == 0; });
1263     if (!isTimeout) { // LCOV_EXCL_BR_LINE
1264         LOGD("SyncEngine Close with executing task!");
1265     }
1266 }
1267 
HandleRemoteExecutorMsg(const std::string & targetDev,Message * inMsg)1268 int SyncEngine::HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg)
1269 {
1270     RemoteExecutor *executor = GetAndIncRemoteExector();
1271     int errCode = E_OK;
1272     if (executor != nullptr) {
1273         errCode = executor->ReceiveMessage(targetDev, inMsg);
1274     } else {
1275         errCode = -E_BUSY;
1276     }
1277     DecExecTaskCount();
1278     RefObject::DecObjRef(executor);
1279     return errCode;
1280 }
1281 
AddSubscribe(SyncGenericInterface * storage,const std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery)1282 void SyncEngine::AddSubscribe(SyncGenericInterface *storage,
1283     const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery)
1284 {
1285     for (const auto &[device, queryList]: subscribeQuery) {
1286         for (const auto &query: queryList) {
1287             AddQuerySubscribe(storage, device, query);
1288         }
1289     }
1290 }
1291 
AddQuerySubscribe(SyncGenericInterface * storage,const std::string & device,const QuerySyncObject & query)1292 void SyncEngine::AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device,
1293     const QuerySyncObject &query)
1294 {
1295     int errCode = storage->AddSubscribe(query.GetIdentify(), query, true);
1296     if (errCode != E_OK) {
1297         LOGW("[SyncEngine][AddSubscribe] Add trigger failed dev = %s queryId = %s",
1298             STR_MASK(device), STR_MASK(query.GetIdentify()));
1299         return;
1300     }
1301     errCode = subManager_->ReserveRemoteSubscribeQuery(device, query);
1302     if (errCode != E_OK) {
1303         if (!subManager_->IsQueryExistSubscribe(query.GetIdentify())) { // LCOV_EXCL_BR_LINE
1304             (void)storage->RemoveSubscribe(query.GetIdentify());
1305         }
1306         return;
1307     }
1308     subManager_->ActiveRemoteSubscribeQuery(device, query);
1309 }
1310 
TimeChange()1311 void SyncEngine::TimeChange()
1312 {
1313     std::vector<ISyncTaskContext *> decContext;
1314     {
1315         // copy context
1316         std::lock_guard<std::mutex> lock(contextMapLock_);
1317         for (const auto &iter : syncTaskContextMap_) {
1318             RefObject::IncObjRef(iter.second);
1319             decContext.push_back(iter.second);
1320         }
1321     }
1322     for (auto &iter : decContext) {
1323         iter->TimeChange();
1324         RefObject::DecObjRef(iter);
1325     }
1326 }
1327 
GetResponseTaskCount()1328 int32_t SyncEngine::GetResponseTaskCount()
1329 {
1330     std::vector<ISyncTaskContext *> decContext;
1331     {
1332         // copy context
1333         std::lock_guard<std::mutex> lock(contextMapLock_);
1334         for (const auto &iter : syncTaskContextMap_) {
1335             RefObject::IncObjRef(iter.second);
1336             decContext.push_back(iter.second);
1337         }
1338     }
1339     int32_t taskCount = 0;
1340     for (auto &iter : decContext) {
1341         taskCount += iter->GetResponseTaskCount();
1342         if (iter->IsSavingTask(GetTimeout(iter->GetDeviceId()))) {
1343             taskCount++;
1344         }
1345         RefObject::DecObjRef(iter);
1346     }
1347     {
1348         std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1349         taskCount += static_cast<int32_t>(execTaskCount_);
1350     }
1351     return taskCount;
1352 }
1353 
ClearSyncInterface()1354 void SyncEngine::ClearSyncInterface()
1355 {
1356     ISyncInterface *syncInterface = nullptr;
1357     {
1358         std::lock_guard<std::mutex> autoLock(storageMutex_);
1359         if (syncInterface_ == nullptr) {
1360             return;
1361         }
1362         syncInterface = syncInterface_;
1363         syncInterface_ = nullptr;
1364     }
1365     syncInterface->DecRefCount();
1366 }
1367 
GetAndIncSyncInterface()1368 ISyncInterface *SyncEngine::GetAndIncSyncInterface()
1369 {
1370     std::lock_guard<std::mutex> autoLock(storageMutex_);
1371     if (syncInterface_ == nullptr) {
1372         return nullptr;
1373     }
1374     syncInterface_->IncRefCount();
1375     return syncInterface_;
1376 }
1377 
SetSyncInterface(ISyncInterface * syncInterface)1378 void SyncEngine::SetSyncInterface(ISyncInterface *syncInterface)
1379 {
1380     std::lock_guard<std::mutex> autoLock(storageMutex_);
1381     syncInterface_ = syncInterface;
1382 }
1383 
GetUserId(const ISyncInterface * syncInterface)1384 std::string SyncEngine::GetUserId(const ISyncInterface *syncInterface)
1385 {
1386     if (syncInterface == nullptr) {
1387         LOGW("[SyncEngine][GetUserId] sync interface has not initialized");
1388         return "";
1389     }
1390     std::string userId = syncInterface->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
1391     std::string subUserId = syncInterface->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
1392     if (!subUserId.empty()) {
1393         userId += "-" + subUserId;
1394     }
1395     return userId;
1396 }
1397 
GetUserId()1398 std::string SyncEngine::GetUserId()
1399 {
1400     std::lock_guard<std::mutex> autoLock(storageMutex_);
1401     return GetUserId(syncInterface_);
1402 }
1403 
GetTimeout(const std::string & dev)1404 uint32_t SyncEngine::GetTimeout(const std::string &dev)
1405 {
1406     ICommunicator *communicator = nullptr;
1407     {
1408         std::lock_guard<std::mutex> autoLock(communicatorProxyLock_);
1409         if (communicatorProxy_ == nullptr) {
1410             LOGW("[SyncEngine] Communicator is null when get %.3s timeout", dev.c_str());
1411             return DBConstant::MIN_TIMEOUT;
1412         }
1413         communicator = communicatorProxy_;
1414         RefObject::IncObjRef(communicator);
1415     }
1416     uint32_t timeout = communicator->GetTimeout(dev);
1417     RefObject::DecObjRef(communicator);
1418     return timeout;
1419 }
1420 
GetTargetUserId(const std::string & dev)1421 std::string SyncEngine::GetTargetUserId(const std::string &dev)
1422 {
1423     std::string targetUserId;
1424     ICommunicator *communicator = nullptr;
1425     {
1426         std::lock_guard<std::mutex> autoLock(communicatorProxyLock_);
1427         if (communicatorProxy_ == nullptr) {
1428             LOGW("[SyncEngine] Communicator is null when get target user");
1429             return targetUserId;
1430         }
1431         communicator = communicatorProxy_;
1432         RefObject::IncObjRef(communicator);
1433     }
1434     DBProperties properties = syncInterface_->GetDbProperties();
1435     ExtendInfo extendInfo;
1436     extendInfo.appId = properties.GetStringProp(DBProperties::APP_ID, "");
1437     extendInfo.userId = properties.GetStringProp(DBProperties::USER_ID, "");
1438     extendInfo.storeId = properties.GetStringProp(DBProperties::STORE_ID, "");
1439     extendInfo.dstTarget = dev;
1440     extendInfo.subUserId = properties.GetStringProp(DBProperties::SUB_USER, "");
1441     targetUserId = communicator->GetTargetUserId(extendInfo);
1442     RefObject::DecObjRef(communicator);
1443     return targetUserId;
1444 }
1445 
RegCallbackOnInitComunicator(ICommunicatorAggregator * communicatorAggregator,const ISyncInterface * syncInterface)1446 int SyncEngine::RegCallbackOnInitComunicator(ICommunicatorAggregator *communicatorAggregator,
1447     const ISyncInterface *syncInterface)
1448 {
1449     int errCode = communicator_->RegOnMessageCallback(
1450         [this](const std::string &targetDev, Message *inMsg) {
1451             return MessageReciveCallback(targetDev, inMsg);
1452         }, []() {});
1453     if (errCode != E_OK) {
1454         LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
1455         communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
1456         communicator_ = nullptr;
1457         return errCode;
1458     }
1459     return E_OK;
1460 }
1461 
GetRemoteQueryTaskCount()1462 int32_t SyncEngine::GetRemoteQueryTaskCount()
1463 {
1464     auto executor = GetAndIncRemoteExector();
1465     if (executor == nullptr) {
1466         LOGW("[SyncEngine] RemoteExecutor is null when get remote query task count");
1467         RefObject::DecObjRef(executor);
1468         return 0;
1469     }
1470     auto count = executor->GetTaskCount();
1471     RefObject::DecObjRef(executor);
1472     return count;
1473 }
1474 
ExchangeClosePending(bool expected)1475 bool SyncEngine::ExchangeClosePending(bool expected)
1476 {
1477     if (communicator_ == nullptr) {
1478         return false;
1479     }
1480     auto communicator = communicator_;
1481     RefObject::IncObjRef(communicator);
1482     int res = communicator->ExchangeClosePending(expected);
1483     RefObject::DecObjRef(communicator);
1484     return res;
1485 }
1486 
CorrectTargetUserId(std::map<DeviceSyncTarget,ISyncTaskContext * >::iterator & it,bool isNeedCorrectUserId)1487 void SyncEngine::CorrectTargetUserId(std::map<DeviceSyncTarget, ISyncTaskContext *>::iterator &it,
1488     bool isNeedCorrectUserId)
1489 {
1490     if (!isNeedCorrectUserId) {
1491         return;
1492     }
1493     ISyncTaskContext *context = it->second;
1494     std::string targetDev = it->first.device;
1495     std::string newTargetUserId = GetTargetUserId(targetDev);
1496     it = syncTaskContextMap_.erase(it);
1497     context->SetTargetUserId(newTargetUserId);
1498     syncTaskContextMap_[{targetDev, newTargetUserId}] = context;
1499 }
1500 } // namespace DistributedDB
1501