• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_engine.h"
17 
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21 
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34 
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40 
41 namespace DistributedDB {
42 int SyncEngine::queueCacheSize_ = 0;
43 int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
44 unsigned int SyncEngine::discardMsgNum_ = 0;
45 std::mutex SyncEngine::queueLock_;
46 
SyncEngine()47 SyncEngine::SyncEngine()
48     : syncInterface_(nullptr),
49       communicator_(nullptr),
50       deviceManager_(nullptr),
51       metadata_(nullptr),
52       execTaskCount_(0),
53       isSyncRetry_(false),
54       communicatorProxy_(nullptr),
55       isActive_(false),
56       remoteExecutor_(nullptr)
57 {
58 }
59 
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62     LOGD("[SyncEngine] ~SyncEngine!");
63     ClearInnerResource();
64     equalIdentifierMap_.clear();
65     subManager_ = nullptr;
66     LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68 
Initialize(ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged,const std::function<void (const InternalSyncParma & param)> & queryAutoSyncCallback)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata,
70     const std::function<void(std::string)> &onRemoteDataChanged, const std::function<void(std::string)> &offlineChanged,
71     const std::function<void(const InternalSyncParma &param)> &queryAutoSyncCallback)
72 {
73     if ((syncInterface == nullptr) || (metadata == nullptr)) {
74         return -E_INVALID_ARGS;
75     }
76     int errCode = StartAutoSubscribeTimer();
77     if (errCode != OK) {
78         return errCode;
79     }
80     syncInterface_ = syncInterface;
81     errCode = InitComunicator(syncInterface);
82     if (errCode != E_OK) {
83         LOGE("[SyncEngine] Init Communicator failed");
84         // There need to set nullptr. other wise, syncInterface will be
85         // DecRef in th destroy-method.
86         StopAutoSubscribeTimer();
87         syncInterface_ = nullptr;
88         return errCode;
89     }
90     onRemoteDataChanged_ = onRemoteDataChanged;
91     offlineChanged_ = offlineChanged;
92     queryAutoSyncCallback_ = queryAutoSyncCallback;
93     errCode = InitInnerSource(onRemoteDataChanged, offlineChanged);
94     if (errCode != E_OK) {
95         // reset ptr if initialize device manager failed
96         syncInterface_ = nullptr;
97         StopAutoSubscribeTimer();
98         return errCode;
99     }
100     if (subManager_ == nullptr) {
101         subManager_ = std::make_shared<SubscribeManager>();
102     }
103     metadata_ = metadata;
104     isActive_ = true;
105     LOGI("[SyncEngine] Engine init ok");
106     return E_OK;
107 }
108 
Close()109 int SyncEngine::Close()
110 {
111     LOGI("[SyncEngine] SyncEngine[%s] close enter!", label_.c_str());
112     isActive_ = false;
113     UnRegCommunicatorsCallback();
114     StopAutoSubscribeTimer();
115 
116     // Clear SyncContexts
117     {
118         std::unique_lock<std::mutex> lock(contextMapLock_);
119         for (auto &iter : syncTaskContextMap_) {
120             ISyncTaskContext *tempContext = iter.second;
121             lock.unlock();
122             RefObject::KillAndDecObjRef(tempContext);
123             tempContext = nullptr;
124             lock.lock();
125             iter.second = nullptr;
126         }
127         syncTaskContextMap_.clear();
128     }
129 
130     WaitingExecTaskExist();
131     ReleaseCommunicators();
132     std::lock_guard<std::mutex> msgLock(queueLock_);
133     while (!msgQueue_.empty()) {
134         Message *inMsg = msgQueue_.front();
135         msgQueue_.pop_front();
136         if (inMsg != nullptr) {
137             queueCacheSize_ -= GetMsgSize(inMsg);
138             delete inMsg;
139             inMsg = nullptr;
140         }
141     }
142     // close db, rekey or import scene, need clear all remote query info
143     // local query info will destroy with syncEngine destruct
144     if (subManager_ != nullptr) {
145         subManager_->ClearAllRemoteQuery();
146     }
147 
148     RemoteExecutor *executor = GetAndIncRemoteExector();
149     if (executor != nullptr) {
150         executor->Close();
151         RefObject::DecObjRef(executor);
152     }
153     ClearInnerResource();
154     LOGI("[SyncEngine] SyncEngine closed!");
155     return E_OK;
156 }
157 
AddSyncOperation(SyncOperation * operation)158 int SyncEngine::AddSyncOperation(SyncOperation *operation)
159 {
160     if (operation == nullptr) {
161         LOGE("[SyncEngine] operation is nullptr");
162         return -E_INVALID_ARGS;
163     }
164 
165     std::vector<std::string> devices = operation->GetDevices();
166     std::string localDeviceId;
167     int errCode = GetLocalDeviceId(localDeviceId);
168     for (const auto &deviceId : devices) {
169         if (errCode != E_OK) {
170             operation->SetStatus(deviceId, errCode == -E_BUSY ?
171                 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
172             continue;
173         }
174         if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
175             operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
176             continue;
177         }
178         operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
179         if (AddSyncOperForContext(deviceId, operation) != E_OK) {
180             operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
181         }
182     }
183     return E_OK;
184 }
185 
RemoveSyncOperation(int syncId)186 void SyncEngine::RemoveSyncOperation(int syncId)
187 {
188     std::lock_guard<std::mutex> lock(contextMapLock_);
189     for (auto &iter : syncTaskContextMap_) {
190         ISyncTaskContext *context = iter.second;
191         if (context != nullptr) {
192             context->RemoveSyncOperation(syncId);
193         }
194     }
195 }
196 
BroadCastDataChanged() const197 void SyncEngine::BroadCastDataChanged() const
198 {
199     if (deviceManager_ != nullptr) {
200         (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
201     }
202 }
203 
RegConnectCallback()204 void SyncEngine::RegConnectCallback()
205 {
206     if (communicator_ == nullptr) {
207         LOGE("[SyncEngine][RegConnCB] communicator is not set!");
208         return;
209     }
210     LOGD("[SyncEngine] RegOnConnectCallback");
211     int errCode = communicator_->RegOnConnectCallback(
212         std::bind(&DeviceManager::OnDeviceConnectCallback, deviceManager_,
213             std::placeholders::_1, std::placeholders::_2), nullptr);
214     if (errCode != E_OK) {
215         LOGE("[SyncEngine][RegConnCB] register failed, auto sync can not use! err %d", errCode);
216         return;
217     }
218     communicator_->Activate();
219 }
220 
GetOnlineDevices(std::vector<std::string> & devices) const221 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
222 {
223     devices.clear();
224     if (deviceManager_ != nullptr) {
225         deviceManager_->GetOnlineDevices(devices);
226     }
227 }
228 
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged)229 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
230     const std::function<void(std::string)> &offlineChanged)
231 {
232     deviceManager_ = new (std::nothrow) DeviceManager();
233     if (deviceManager_ == nullptr) {
234         LOGE("[SyncEngine] deviceManager alloc failed!");
235         return -E_OUT_OF_MEMORY;
236     }
237     auto executor = new (std::nothrow) RemoteExecutor();
238     if (executor == nullptr) {
239         LOGE("[SyncEngine] remoteExecutor alloc failed!");
240         delete deviceManager_;
241         deviceManager_ = nullptr;
242         return -E_OUT_OF_MEMORY;
243     }
244 
245     int errCode = E_OK;
246     do {
247         errCode = deviceManager_->Initialize(communicatorProxy_, onRemoteDataChanged, offlineChanged);
248         if (errCode != E_OK) {
249             LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
250             break;
251         }
252         errCode = executor->Initialize(syncInterface_, communicator_);
253         SetRemoteExector(executor);
254     } while (false);
255     if (errCode != E_OK) {
256         delete deviceManager_;
257         deviceManager_ = nullptr;
258         delete executor;
259         executor = nullptr;
260     }
261     return errCode;
262 }
263 
InitComunicator(const ISyncInterface * syncInterface)264 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
265 {
266     ICommunicatorAggregator *communicatorAggregator = nullptr;
267     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
268     if (communicatorAggregator == nullptr) {
269         LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
270         return errCode;
271     }
272     std::vector<uint8_t> label = syncInterface->GetIdentifier();
273     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
274     if (isSyncDualTupleMode) {
275         std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
276         LOGI("[SyncEngine] dual tuple mode, original identifier=%.6s, target identifier=%.6s", VEC_TO_STR(label),
277             VEC_TO_STR(dualTuplelabel));
278         communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode);
279     } else {
280         communicator_ = communicatorAggregator->AllocCommunicator(label, errCode);
281     }
282     if (communicator_ == nullptr) {
283         LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
284         return errCode;
285     }
286 
287     errCode = communicator_->RegOnMessageCallback(
288         std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
289         []() {});
290     if (errCode != E_OK) {
291         LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
292         communicatorAggregator->ReleaseCommunicator(communicator_);
293         communicator_ = nullptr;
294         return errCode;
295     }
296 
297     communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
298     if (communicatorProxy_ == nullptr) {
299         communicatorAggregator->ReleaseCommunicator(communicator_);
300         communicator_ = nullptr;
301         return -E_OUT_OF_MEMORY;
302     }
303 
304     communicatorProxy_->SetMainCommunicator(communicator_);
305     label.resize(3); // only show 3 Bytes enough
306     label_ = DBCommon::VectorToHexString(label);
307     LOGD("[SyncEngine] RegOnConnectCallback");
308     return errCode;
309 }
310 
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)311 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
312 {
313     int errCode = E_OK;
314     ISyncTaskContext *context = nullptr;
315     {
316         std::lock_guard<std::mutex> lock(contextMapLock_);
317         context = FindSyncTaskContext(deviceId);
318         if (context == nullptr) {
319             if (!IsKilled()) {
320                 context = GetSyncTaskContext(deviceId, errCode);
321             }
322             if (context == nullptr) {
323                 return errCode;
324             }
325         }
326         if (context->IsKilled()) {
327             return -E_OBJ_IS_KILLED;
328         }
329         // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
330         RefObject::IncObjRef(context);
331     }
332 
333     errCode = context->AddSyncOperation(operation);
334     RefObject::DecObjRef(context);
335     return errCode;
336 }
337 
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)338 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
339     Message *inMsg)
340 {
341     std::string deviceId = context->GetDeviceId();
342 
343     if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
344         int errCode = context->ReceiveMessageCallback(inMsg);
345         if (errCode == -E_NOT_NEED_DELETE_MSG) {
346             goto MSG_CALLBACK_OUT_NOT_DEL;
347         }
348         // add auto sync here while recv subscribe request
349         QuerySyncObject syncObject;
350         if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
351             InternalSyncParma param;
352             GetQueryAutoSyncParam(deviceId, syncObject, param);
353             queryAutoSyncCallback_(param);
354         }
355     }
356 
357     delete inMsg;
358     inMsg = nullptr;
359 MSG_CALLBACK_OUT_NOT_DEL:
360     ScheduleTaskOut(context, communicator);
361 }
362 
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)363 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
364 {
365     do {
366         std::string deviceId = context->GetDeviceId();
367         if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
368             onRemoteDataChanged_(deviceId);
369         } else {
370             LOGE("[SyncEngine] onRemoteDataChanged is null!");
371         }
372     } while (false);
373     delete inMsg;
374     inMsg = nullptr;
375     ScheduleTaskOut(context, communicator);
376 }
377 
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)378 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
379 {
380     (void)DealMsgUtilQueueEmpty();
381     DecExecTaskCount();
382     RefObject::DecObjRef(communicator);
383     RefObject::DecObjRef(context);
384 }
385 
DealMsgUtilQueueEmpty()386 int SyncEngine::DealMsgUtilQueueEmpty()
387 {
388     if (!isActive_) {
389         return -E_BUSY; // db is closing just return
390     }
391     int errCode = E_OK;
392     Message *inMsg = nullptr;
393     {
394         std::lock_guard<std::mutex> lock(queueLock_);
395         if (msgQueue_.empty()) {
396             return errCode;
397         }
398         inMsg = msgQueue_.front();
399         msgQueue_.pop_front();
400         queueCacheSize_ -= GetMsgSize(inMsg);
401     }
402 
403     IncExecTaskCount();
404     // it will deal with the first message in queue, we should increase object reference counts and sure that resources
405     // could be prevented from destroying by other threads.
406     do {
407         ISyncTaskContext *nextContext = GetConextForMsg(inMsg->GetTarget(), errCode);
408         if (errCode != E_OK) {
409             break;
410         }
411         errCode = ScheduleDealMsg(nextContext, inMsg);
412         if (errCode != E_OK) {
413             RefObject::DecObjRef(nextContext);
414         }
415     } while (false);
416     if (errCode != E_OK) {
417         delete inMsg;
418         inMsg = nullptr;
419         DecExecTaskCount();
420     }
421     return errCode;
422 }
423 
GetConextForMsg(const std::string & targetDev,int & errCode)424 ISyncTaskContext *SyncEngine::GetConextForMsg(const std::string &targetDev, int &errCode)
425 {
426     ISyncTaskContext *context = nullptr;
427     {
428         std::lock_guard<std::mutex> lock(contextMapLock_);
429         context = FindSyncTaskContext(targetDev);
430         if (context != nullptr) {
431             if (context->IsKilled()) {
432                 errCode = -E_OBJ_IS_KILLED;
433                 return nullptr;
434             }
435         } else {
436             if (IsKilled()) {
437                 errCode = -E_OBJ_IS_KILLED;
438                 return nullptr;
439             }
440             context = GetSyncTaskContext(targetDev, errCode);
441             if (context == nullptr) {
442                 return nullptr;
443             }
444         }
445         // IncRef for context to make sure context is valid, when task run another thread
446         RefObject::IncObjRef(context);
447     }
448     return context;
449 }
450 
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)451 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
452 {
453     if (inMsg == nullptr) {
454         LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
455         DecExecTaskCount();
456         return E_OK;
457     }
458     RefObject::IncObjRef(communicatorProxy_);
459     int errCode = E_OK;
460     // deal remote local data changed message
461     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
462         RemoteDataChangedTask(context, communicatorProxy_, inMsg);
463     } else {
464         errCode = RuntimeContext::GetInstance()->ScheduleTask(std::bind(&SyncEngine::MessageReciveCallbackTask,
465             this, context, communicatorProxy_, inMsg));
466     }
467 
468     if (errCode != E_OK) {
469         LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
470         RefObject::DecObjRef(communicatorProxy_);
471     }
472     return errCode;
473 }
474 
MessageReciveCallback(const std::string & targetDev,Message * inMsg)475 void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
476 {
477     IncExecTaskCount();
478     int errCode = MessageReciveCallbackInner(targetDev, inMsg);
479     if (errCode != E_OK) {
480         delete inMsg;
481         inMsg = nullptr;
482         DecExecTaskCount();
483         LOGE("[SyncEngine] MessageReciveCallback failed!");
484     }
485 }
486 
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)487 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
488 {
489     if (targetDev.empty() || inMsg == nullptr) {
490         LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
491         return -E_INVALID_ARGS;
492     }
493     if (!isActive_) {
494         LOGE("[SyncEngine] engine is closing, ignore msg");
495         return -E_BUSY;
496     }
497     RemoteExecutor *executor = GetAndIncRemoteExector();
498     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor != nullptr) {
499         int errCode = executor->ReceiveMessage(targetDev, inMsg);
500         RefObject::DecObjRef(executor);
501         DecExecTaskCount();
502         return errCode;
503     } else if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
504         DecExecTaskCount();
505         return -E_BUSY;
506     }
507     int msgSize = 0;
508     if (!IsSkipCalculateLen(inMsg)) {
509         msgSize = GetMsgSize(inMsg);
510         if (msgSize <= 0) {
511             LOGE("[SyncEngine] GetMsgSize makes a mistake");
512             return -E_NOT_SUPPORT;
513         }
514     }
515 
516     {
517         std::lock_guard<std::mutex> lock(queueLock_);
518         if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
519             LOGE("[SyncEngine] The size of message queue is beyond maximum");
520             discardMsgNum_++;
521             return -E_BUSY;
522         }
523 
524         if (execTaskCount_ > MAX_EXEC_NUM) {
525             PutMsgIntoQueue(targetDev, inMsg, msgSize);
526             // task dont exec here
527             DecExecTaskCount();
528             return E_OK;
529         }
530     }
531 
532     int errCode = E_OK;
533     ISyncTaskContext *nextContext = GetConextForMsg(targetDev, errCode);
534     if (errCode != E_OK) {
535         return errCode;
536     }
537     LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
538     return ScheduleDealMsg(nextContext, inMsg);
539 }
540 
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)541 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
542 {
543     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
544         auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
545             [&targetDev](const Message *msg) {
546                 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
547             });
548         if (iter != msgQueue_.end()) {
549             delete inMsg;
550             inMsg = nullptr;
551             return;
552         }
553     }
554     inMsg->SetTarget(targetDev);
555     msgQueue_.push_back(inMsg);
556     queueCacheSize_ += msgSize;
557     LOGE("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
558 }
559 
GetMsgSize(const Message * inMsg) const560 int SyncEngine::GetMsgSize(const Message *inMsg) const
561 {
562     switch (inMsg->GetMessageId()) {
563         case TIME_SYNC_MESSAGE:
564             return TimeSync::CalculateLen(inMsg);
565         case ABILITY_SYNC_MESSAGE:
566             return AbilitySync::CalculateLen(inMsg);
567         case DATA_SYNC_MESSAGE:
568         case QUERY_SYNC_MESSAGE:
569         case CONTROL_SYNC_MESSAGE:
570             return SingleVerSerializeManager::CalculateLen(inMsg);
571 #ifndef OMIT_MULTI_VER
572         case COMMIT_HISTORY_SYNC_MESSAGE:
573             return CommitHistorySync::CalculateLen(inMsg);
574         case MULTI_VER_DATA_SYNC_MESSAGE:
575             return MultiVerDataSync::CalculateLen(inMsg);
576         case VALUE_SLICE_SYNC_MESSAGE:
577             return ValueSliceSync::CalculateLen(inMsg);
578 #endif
579         case LOCAL_DATA_CHANGED:
580             return DeviceManager::CalculateLen();
581         default:
582             LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
583             return -E_NOT_SUPPORT;
584     }
585 }
586 
FindSyncTaskContext(const std::string & deviceId)587 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
588 {
589     auto iter = syncTaskContextMap_.find(deviceId);
590     if (iter != syncTaskContextMap_.end()) {
591         ISyncTaskContext *context = iter->second;
592         return context;
593     }
594     return nullptr;
595 }
596 
GetSyncTaskContextAndInc(const std::string & deviceId)597 ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
598 {
599     ISyncTaskContext *context = nullptr;
600     std::lock_guard<std::mutex> lock(contextMapLock_);
601     context = FindSyncTaskContext(deviceId);
602     if (context == nullptr) {
603         LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
604         return nullptr;
605     }
606     if (context->IsKilled()) {
607         LOGI("[SyncEngine] context is killing");
608         return nullptr;
609     }
610     RefObject::IncObjRef(context);
611     return context;
612 }
613 
GetSyncTaskContext(const std::string & deviceId,int & errCode)614 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
615 {
616     ISyncTaskContext *context = CreateSyncTaskContext();
617     if (context == nullptr) {
618         errCode = -E_OUT_OF_MEMORY;
619         LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
620         return nullptr;
621     }
622     errCode = context->Initialize(deviceId, syncInterface_, metadata_, communicatorProxy_);
623     if (errCode != E_OK) {
624         LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
625         RefObject::DecObjRef(context);
626         context = nullptr;
627         return nullptr;
628     }
629     syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
630     // IncRef for SyncEngine to make sure SyncEngine is valid when context access
631     RefObject::IncObjRef(this);
632     auto storage = syncInterface_;
633     if (storage != nullptr) {
634         storage->IncRefCount();
635     }
636     context->OnLastRef([this, deviceId, storage]() {
637         LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
638         RefObject::DecObjRef(this);
639         if (storage != nullptr) {
640             storage->DecRefCount();
641         }
642     });
643     context->RegOnSyncTask(std::bind(&SyncEngine::ExecSyncTask, this, context));
644     return context;
645 }
646 
ExecSyncTask(ISyncTaskContext * context)647 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
648 {
649     if (IsKilled()) {
650         return -E_OBJ_IS_KILLED;
651     }
652 
653     AutoLock lockGuard(context);
654     int status = context->GetTaskExecStatus();
655     if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
656         return -E_NOT_SUPPORT;
657     }
658     context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
659     if (!context->IsTargetQueueEmpty()) {
660         int errCode = context->GetNextTarget(true);
661         if (errCode != E_OK) {
662             return errCode;
663         }
664         context->UnlockObj();
665         errCode = context->StartStateMachine();
666         context->LockObj();
667         if (errCode != E_OK) {
668             LOGE("[SyncEngine] machine StartSync failed");
669             context->SetOperationStatus(SyncOperation::OP_FAILED);
670             return errCode;
671         }
672     } else {
673         LOGD("[SyncEngine] ExecSyncTask finished");
674         context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
675     }
676     return E_OK;
677 }
678 
GetQueueCacheSize() const679 int SyncEngine::GetQueueCacheSize() const
680 {
681     return queueCacheSize_;
682 }
683 
GetDiscardMsgNum() const684 unsigned int SyncEngine::GetDiscardMsgNum() const
685 {
686     return discardMsgNum_;
687 }
688 
GetMaxExecNum() const689 unsigned int SyncEngine::GetMaxExecNum() const
690 {
691     return MAX_EXEC_NUM;
692 }
693 
SetMaxQueueCacheSize(int value)694 void SyncEngine::SetMaxQueueCacheSize(int value)
695 {
696     maxQueueCacheSize_ = value;
697 }
698 
GetLabel() const699 std::string SyncEngine::GetLabel() const
700 {
701     return label_;
702 }
703 
GetSyncRetry() const704 bool SyncEngine::GetSyncRetry() const
705 {
706     return isSyncRetry_;
707 }
708 
SetSyncRetry(bool isRetry)709 void SyncEngine::SetSyncRetry(bool isRetry)
710 {
711     if (isSyncRetry_ == isRetry) {
712         LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
713         return;
714     }
715     isSyncRetry_ = isRetry;
716     LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
717     std::lock_guard<std::mutex> lock(contextMapLock_);
718     for (auto &iter : syncTaskContextMap_) {
719         ISyncTaskContext *context = iter.second;
720         if (context != nullptr) {
721             context->SetSyncRetry(isRetry);
722         }
723     }
724 }
725 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)726 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
727 {
728     if (!isActive_) {
729         LOGI("[SyncEngine] engine is closed, just put into map");
730         return E_OK;
731     }
732     ICommunicator *communicator = nullptr;
733     {
734         std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
735         if (equalCommunicators_.count(identifier) != 0) {
736             communicator = equalCommunicators_[identifier];
737         } else {
738             int errCode = E_OK;
739             communicator = AllocCommunicator(identifier, errCode);
740             if (communicator == nullptr) {
741                 return errCode;
742             }
743             equalCommunicators_[identifier] = communicator;
744         }
745     }
746     std::string targetDevices;
747     for (const auto &dev : targets) {
748         targetDevices += DBCommon::StringMasking(dev) + ",";
749     }
750     LOGI("[SyncEngine] set equal identifier=%s, original=%s, targetDevices=%s",
751         DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
752         targetDevices.substr(0, targetDevices.size() - 1).c_str());
753     communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
754     communicator->Activate();
755     return E_OK;
756 }
757 
SetEqualIdentifier()758 void SyncEngine::SetEqualIdentifier()
759 {
760     std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
761     for (auto &item : equalIdentifierMap_) {
762         if (equalIdentifier.find(item.second) == equalIdentifier.end()) {
763             equalIdentifier[item.second] = {item.first};
764         } else {
765             equalIdentifier[item.second].push_back(item.first);
766         }
767     }
768     for (const auto &item : equalIdentifier) {
769         SetEqualIdentifier(item.first, item.second);
770     }
771 }
772 
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)773 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
774 {
775     for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
776         if (identifier == iter->second) {
777             iter = equalIdentifierMap_.erase(iter);
778             continue;
779         }
780         iter++;
781     }
782     for (const auto &device : targets) {
783         equalIdentifierMap_[device] = identifier;
784     }
785 }
786 
OfflineHandleByDevice(const std::string & deviceId)787 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId)
788 {
789     if (communicatorProxy_ == nullptr) {
790         return;
791     }
792 
793     RemoteExecutor *executor = GetAndIncRemoteExector();
794     if (executor != nullptr) {
795         executor->NotifyDeviceOffline(deviceId);
796         RefObject::DecObjRef(executor);
797     }
798     // db closed or device is offline
799     // clear remote subscribe and trigger
800     std::vector<std::string> remoteQueryId;
801     subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
802     subManager_->ClearRemoteSubscribeQuery(deviceId);
803     static_cast<SingleVerKvDBSyncInterface *>(syncInterface_)->RemoveSubscribe(remoteQueryId);
804     // get context and Inc context if context is not nullprt
805     ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
806     if (context != nullptr) {
807         context->SetIsNeedResetAbilitySync(true);
808     }
809     if (communicatorProxy_->IsDeviceOnline(deviceId)) {
810         LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
811         RefObject::DecObjRef(context);
812         return;
813     }
814     // means device is offline, clear local subscribe
815     subManager_->ClearLocalSubscribeQuery(deviceId);
816     // clear sync task
817     if (context != nullptr) {
818         context->ClearAllSyncTask();
819         RefObject::DecObjRef(context);
820     }
821 }
822 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)823 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
824 {
825     subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
826 }
827 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)828 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
829 {
830     subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
831 }
832 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)833 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
834 {
835     subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
836 }
837 
PutUnfiniedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)838 void SyncEngine::PutUnfiniedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries)
839 {
840     subManager_->PutLocalUnFiniedSubQueries(device, subscribeQueries);
841 }
842 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)843 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
844 {
845     subManager_->GetAllUnFinishSubQueries(allSyncQueries);
846 }
847 
AllocCommunicator(const std::string & identifier,int & errCode)848 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode)
849 {
850     ICommunicatorAggregator *communicatorAggregator = nullptr;
851     errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
852     if (communicatorAggregator == nullptr) {
853         LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
854         return nullptr;
855     }
856     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
857     auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode);
858     if (communicator == nullptr) {
859         LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
860         return communicator;
861     }
862 
863     errCode = communicator->RegOnMessageCallback(
864         std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
865         []() {});
866     if (errCode != E_OK) {
867         LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
868         communicatorAggregator->ReleaseCommunicator(communicator);
869         return nullptr;
870     }
871 
872     errCode = communicator->RegOnConnectCallback(
873         std::bind(&DeviceManager::OnDeviceConnectCallback, deviceManager_,
874             std::placeholders::_1, std::placeholders::_2), nullptr);
875     if (errCode != E_OK) {
876         LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
877         communicator->RegOnMessageCallback(nullptr, nullptr);
878         communicatorAggregator->ReleaseCommunicator(communicator);
879         return nullptr;
880     }
881 
882     return communicator;
883 }
884 
UnRegCommunicatorsCallback()885 void SyncEngine::UnRegCommunicatorsCallback()
886 {
887     if (communicator_ != nullptr) {
888         communicator_->RegOnMessageCallback(nullptr, nullptr);
889         communicator_->RegOnConnectCallback(nullptr, nullptr);
890         communicator_->RegOnSendableCallback(nullptr, nullptr);
891     }
892     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
893     for (const auto &iter : equalCommunicators_) {
894         iter.second->RegOnMessageCallback(nullptr, nullptr);
895         iter.second->RegOnConnectCallback(nullptr, nullptr);
896         iter.second->RegOnSendableCallback(nullptr, nullptr);
897     }
898 }
899 
ReleaseCommunicators()900 void SyncEngine::ReleaseCommunicators()
901 {
902     RefObject::KillAndDecObjRef(communicatorProxy_);
903     communicatorProxy_ = nullptr;
904     ICommunicatorAggregator *communicatorAggregator = nullptr;
905     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
906     if (communicatorAggregator == nullptr) {
907         LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
908         return;
909     }
910 
911     if (communicator_ != nullptr) {
912         communicatorAggregator->ReleaseCommunicator(communicator_);
913         communicator_ = nullptr;
914     }
915 
916     std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
917     for (auto &iter : equalCommunicators_) {
918         communicatorAggregator->ReleaseCommunicator(iter.second);
919     }
920     equalCommunicators_.clear();
921 }
922 
IsSkipCalculateLen(const Message * inMsg)923 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
924 {
925     if (inMsg->IsFeedbackError()) {
926         LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
927         return true;
928     }
929     return false;
930 }
931 
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)932 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
933     InternalSyncParma &outParam)
934 {
935     outParam.devices = { device };
936     outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
937     outParam.isQuerySync = true;
938     outParam.syncQuery = query;
939 }
940 
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)941 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
942     InternalSyncParma &outParam)
943 {
944     outParam.devices = { device };
945     outParam.mode = SyncModeType::AUTO_PUSH;
946     outParam.isQuerySync = true;
947     outParam.syncQuery = query;
948 }
949 
StartAutoSubscribeTimer()950 int SyncEngine::StartAutoSubscribeTimer()
951 {
952     return E_OK;
953 }
954 
StopAutoSubscribeTimer()955 void SyncEngine::StopAutoSubscribeTimer()
956 {
957 }
958 
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const959 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
960 {
961     return subManager_->LocalSubscribeLimitCheck(devices, query);
962 }
963 
964 
ClearInnerResource()965 void SyncEngine::ClearInnerResource()
966 {
967     if (syncInterface_ != nullptr) {
968         syncInterface_->DecRefCount();
969         syncInterface_ = nullptr;
970     }
971     if (deviceManager_ != nullptr) {
972         delete deviceManager_;
973         deviceManager_ = nullptr;
974     }
975     communicator_ = nullptr;
976     metadata_ = nullptr;
977     onRemoteDataChanged_ = nullptr;
978     offlineChanged_ = nullptr;
979     queryAutoSyncCallback_ = nullptr;
980     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
981     if (remoteExecutor_ != nullptr) {
982         RefObject::KillAndDecObjRef(remoteExecutor_);
983         remoteExecutor_ = nullptr;
984     }
985 }
986 
IsEngineActive() const987 bool SyncEngine::IsEngineActive() const
988 {
989     return isActive_;
990 }
991 
SchemaChange()992 void SyncEngine::SchemaChange()
993 {
994     std::lock_guard<std::mutex> lock(contextMapLock_);
995     for (const auto &entry : syncTaskContextMap_) {
996         auto context = entry.second;
997         if (context->IsKilled()) {
998             continue;
999         }
1000         // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
1001         context->SchemaChange();
1002     }
1003 }
1004 
IncExecTaskCount()1005 void SyncEngine::IncExecTaskCount()
1006 {
1007     std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1008     execTaskCount_++;
1009 }
1010 
DecExecTaskCount()1011 void SyncEngine::DecExecTaskCount()
1012 {
1013     {
1014         std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1015         execTaskCount_--;
1016     }
1017     execTaskCv_.notify_all();
1018 }
1019 
Dump(int fd)1020 void SyncEngine::Dump(int fd)
1021 {
1022     std::string communicatorLabel;
1023     if (communicatorProxy_ != nullptr) {
1024         communicatorProxy_->GetLocalIdentity(communicatorLabel);
1025     }
1026     DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1027     if (communicatorProxy_ != nullptr) {
1028         communicatorProxy_->GetLocalIdentity(communicatorLabel);
1029         communicatorProxy_->Dump(fd);
1030     }
1031     DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1032     // dump context info
1033     std::lock_guard<std::mutex> autoLock(contextMapLock_);
1034     for (const auto &entry : syncTaskContextMap_) {
1035         entry.second->Dump(fd);
1036     }
1037     DBDumpHelper::Dump(fd, "\t]\n\n");
1038 }
1039 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1040 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1041     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1042 {
1043     RemoteExecutor *executor = GetAndIncRemoteExector();
1044     if (!isActive_ || executor == nullptr) {
1045         return -E_BUSY; // db is closing just return
1046     }
1047     int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1048     RefObject::DecObjRef(executor);
1049     return errCode;
1050 }
1051 
NotifyConnectionClosed(uint64_t connectionId)1052 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1053 {
1054     RemoteExecutor *executor = GetAndIncRemoteExector();
1055     if (!isActive_ || executor == nullptr) {
1056         return; // db is closing just return
1057     }
1058     executor->NotifyConnectionClosed(connectionId);
1059     RefObject::DecObjRef(executor);
1060 }
1061 
NotifyUserChange()1062 void SyncEngine::NotifyUserChange()
1063 {
1064     RemoteExecutor *executor = GetAndIncRemoteExector();
1065     if (!isActive_ || executor == nullptr) {
1066         return; // db is closing just return
1067     }
1068     executor->NotifyUserChange();
1069     RefObject::DecObjRef(executor);
1070 }
1071 
GetAndIncRemoteExector()1072 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1073 {
1074     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1075     RefObject::IncObjRef(remoteExecutor_);
1076     return remoteExecutor_;
1077 }
1078 
SetRemoteExector(RemoteExecutor * executor)1079 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1080 {
1081     std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1082     remoteExecutor_ = executor;
1083 }
1084 
CheckDeviceIdValid(const std::string & deviceId,const std::string & localDeviceId)1085 bool SyncEngine::CheckDeviceIdValid(const std::string &deviceId, const std::string &localDeviceId)
1086 {
1087     if (deviceId.empty()) {
1088         return false;
1089     }
1090     return localDeviceId != deviceId;
1091 }
1092 
GetLocalDeviceId(std::string & deviceId)1093 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1094 {
1095     if (!isActive_ || communicator_ == nullptr) {
1096         // db is closing
1097         return -E_BUSY;
1098     }
1099     auto communicator = communicator_;
1100     RefObject::IncObjRef(communicator);
1101     int errCode = communicator->GetLocalIdentity(deviceId);
1102     RefObject::DecObjRef(communicator);
1103     return errCode;
1104 }
1105 
AbortMachineIfNeed(uint32_t syncId)1106 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1107 {
1108     ISyncTaskContext *abortContext = nullptr;
1109     {
1110         std::lock_guard<std::mutex> lock(contextMapLock_);
1111         for (const auto &entry : syncTaskContextMap_) {
1112             auto context = entry.second;
1113             if (context->IsKilled()) {
1114                 continue;
1115             }
1116             RefObject::IncObjRef(context);
1117             if (context->GetSyncId() == syncId) {
1118                 abortContext = context;
1119                 RefObject::IncObjRef(abortContext);
1120             }
1121             RefObject::DecObjRef(context);
1122         }
1123     }
1124     if (abortContext != nullptr) {
1125         abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1126         RefObject::DecObjRef(abortContext);
1127     }
1128 }
1129 
WaitingExecTaskExist()1130 void SyncEngine::WaitingExecTaskExist()
1131 {
1132     std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1133     bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1134         [this]() { return execTaskCount_ == 0; });
1135     if (!isTimeout) {
1136         LOGD("SyncEngine Close with executing task!");
1137     }
1138 }
1139 } // namespace DistributedDB
1140