1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_engine.h"
17
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40
41 namespace DistributedDB {
// Definitions of SyncEngine's static data members.
// Accumulated size of the messages currently waiting in the message queue; modified under queueLock_.
int SyncEngine::queueCacheSize_ = 0;
// Upper bound checked against queueCacheSize_ before a message is accepted; see SetMaxQueueCacheSize().
int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
// Number of incoming messages rejected because the queue cache limit would have been exceeded.
unsigned int SyncEngine::discardMsgNum_ = 0;
// Mutex taken around accesses to the message queue and its size accounting.
std::mutex SyncEngine::queueLock_;
46
// Constructs an inactive engine: all collaborator pointers start as nullptr, the executing
// task counter is zero and isActive_ is false until Initialize() succeeds.
SyncEngine::SyncEngine()
    : syncInterface_(nullptr),
      communicator_(nullptr),
      deviceManager_(nullptr),
      metadata_(nullptr),
      execTaskCount_(0),
      isSyncRetry_(false),
      communicatorProxy_(nullptr),
      isActive_(false),
      remoteExecutor_(nullptr)
{
}
59
// Destructor: clears the inner resources, forgets all equal-identifier mappings and drops
// the engine's reference to the subscribe manager.
SyncEngine::~SyncEngine()
{
    LOGD("[SyncEngine] ~SyncEngine!");
    ClearInnerResource();
    equalIdentifierMap_.clear();
    subManager_ = nullptr;
    LOGD("[SyncEngine] ~SyncEngine ok!");
}
68
Initialize(ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged,const std::function<void (const InternalSyncParma & param)> & queryAutoSyncCallback)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata,
70 const std::function<void(std::string)> &onRemoteDataChanged, const std::function<void(std::string)> &offlineChanged,
71 const std::function<void(const InternalSyncParma ¶m)> &queryAutoSyncCallback)
72 {
73 if ((syncInterface == nullptr) || (metadata == nullptr)) {
74 return -E_INVALID_ARGS;
75 }
76 int errCode = StartAutoSubscribeTimer();
77 if (errCode != OK) {
78 return errCode;
79 }
80 syncInterface_ = syncInterface;
81 errCode = InitComunicator(syncInterface);
82 if (errCode != E_OK) {
83 LOGE("[SyncEngine] Init Communicator failed");
84 // There need to set nullptr. other wise, syncInterface will be
85 // DecRef in th destroy-method.
86 StopAutoSubscribeTimer();
87 syncInterface_ = nullptr;
88 return errCode;
89 }
90 onRemoteDataChanged_ = onRemoteDataChanged;
91 offlineChanged_ = offlineChanged;
92 queryAutoSyncCallback_ = queryAutoSyncCallback;
93 errCode = InitInnerSource(onRemoteDataChanged, offlineChanged);
94 if (errCode != E_OK) {
95 // reset ptr if initialize device manager failed
96 syncInterface_ = nullptr;
97 StopAutoSubscribeTimer();
98 return errCode;
99 }
100 if (subManager_ == nullptr) {
101 subManager_ = std::make_shared<SubscribeManager>();
102 }
103 metadata_ = metadata;
104 isActive_ = true;
105 LOGI("[SyncEngine] Engine init ok");
106 return E_OK;
107 }
108
// Shuts the engine down. In order: marks the engine inactive, unregisters the communicator
// callbacks, stops the auto-subscribe timer, kills and releases every sync task context, waits
// for executing tasks (WaitingExecTaskExist), releases the communicators, drains the message
// queue, clears all remote subscribe queries, closes the remote executor and clears the inner
// resources. Always returns E_OK.
int SyncEngine::Close()
{
    LOGI("[SyncEngine] SyncEngine[%s] close enter!", label_.c_str());
    isActive_ = false;
    UnRegCommunicatorsCallback();
    StopAutoSubscribeTimer();

    // Clear SyncContexts
    {
        std::unique_lock<std::mutex> lock(contextMapLock_);
        for (auto &iter : syncTaskContextMap_) {
            ISyncTaskContext *tempContext = iter.second;
            // The map lock is dropped while the context is killed and released, then taken again
            // before the map entry is overwritten.
            // NOTE(review): presumably releasing a context may re-enter code that needs
            // contextMapLock_ - confirm.
            lock.unlock();
            RefObject::KillAndDecObjRef(tempContext);
            tempContext = nullptr;
            lock.lock();
            iter.second = nullptr;
        }
        syncTaskContextMap_.clear();
    }

    WaitingExecTaskExist();
    ReleaseCommunicators();
    // queueLock_ stays held from here until the function returns.
    std::lock_guard<std::mutex> msgLock(queueLock_);
    // Drop every message that is still queued and take its size out of the cache accounting.
    while (!msgQueue_.empty()) {
        Message *inMsg = msgQueue_.front();
        msgQueue_.pop_front();
        if (inMsg != nullptr) {
            queueCacheSize_ -= GetMsgSize(inMsg);
            delete inMsg;
            inMsg = nullptr;
        }
    }
    // close db, rekey or import scene, need clear all remote query info
    // local query info will destroy with syncEngine destruct
    if (subManager_ != nullptr) {
        subManager_->ClearAllRemoteQuery();
    }

    // Take a temporary reference on the remote executor so it can be closed, then release it.
    RemoteExecutor *executor = GetAndIncRemoteExector();
    if (executor != nullptr) {
        executor->Close();
        RefObject::DecObjRef(executor);
    }
    ClearInnerResource();
    LOGI("[SyncEngine] SyncEngine closed!");
    return E_OK;
}
157
AddSyncOperation(SyncOperation * operation)158 int SyncEngine::AddSyncOperation(SyncOperation *operation)
159 {
160 if (operation == nullptr) {
161 LOGE("[SyncEngine] operation is nullptr");
162 return -E_INVALID_ARGS;
163 }
164
165 std::vector<std::string> devices = operation->GetDevices();
166 std::string localDeviceId;
167 int errCode = GetLocalDeviceId(localDeviceId);
168 for (const auto &deviceId : devices) {
169 if (errCode != E_OK) {
170 operation->SetStatus(deviceId, errCode == -E_BUSY ?
171 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
172 continue;
173 }
174 if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
175 operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
176 continue;
177 }
178 operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
179 if (AddSyncOperForContext(deviceId, operation) != E_OK) {
180 operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
181 }
182 }
183 return E_OK;
184 }
185
RemoveSyncOperation(int syncId)186 void SyncEngine::RemoveSyncOperation(int syncId)
187 {
188 std::lock_guard<std::mutex> lock(contextMapLock_);
189 for (auto &iter : syncTaskContextMap_) {
190 ISyncTaskContext *context = iter.second;
191 if (context != nullptr) {
192 context->RemoveSyncOperation(syncId);
193 }
194 }
195 }
196
BroadCastDataChanged() const197 void SyncEngine::BroadCastDataChanged() const
198 {
199 if (deviceManager_ != nullptr) {
200 (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
201 }
202 }
203
RegConnectCallback()204 void SyncEngine::RegConnectCallback()
205 {
206 if (communicator_ == nullptr) {
207 LOGE("[SyncEngine][RegConnCB] communicator is not set!");
208 return;
209 }
210 LOGD("[SyncEngine] RegOnConnectCallback");
211 int errCode = communicator_->RegOnConnectCallback(
212 std::bind(&DeviceManager::OnDeviceConnectCallback, deviceManager_,
213 std::placeholders::_1, std::placeholders::_2), nullptr);
214 if (errCode != E_OK) {
215 LOGE("[SyncEngine][RegConnCB] register failed, auto sync can not use! err %d", errCode);
216 return;
217 }
218 communicator_->Activate();
219 }
220
GetOnlineDevices(std::vector<std::string> & devices) const221 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
222 {
223 devices.clear();
224 if (deviceManager_ != nullptr) {
225 deviceManager_->GetOnlineDevices(devices);
226 }
227 }
228
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged)229 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
230 const std::function<void(std::string)> &offlineChanged)
231 {
232 deviceManager_ = new (std::nothrow) DeviceManager();
233 if (deviceManager_ == nullptr) {
234 LOGE("[SyncEngine] deviceManager alloc failed!");
235 return -E_OUT_OF_MEMORY;
236 }
237 auto executor = new (std::nothrow) RemoteExecutor();
238 if (executor == nullptr) {
239 LOGE("[SyncEngine] remoteExecutor alloc failed!");
240 delete deviceManager_;
241 deviceManager_ = nullptr;
242 return -E_OUT_OF_MEMORY;
243 }
244
245 int errCode = E_OK;
246 do {
247 errCode = deviceManager_->Initialize(communicatorProxy_, onRemoteDataChanged, offlineChanged);
248 if (errCode != E_OK) {
249 LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
250 break;
251 }
252 errCode = executor->Initialize(syncInterface_, communicator_);
253 SetRemoteExector(executor);
254 } while (false);
255 if (errCode != E_OK) {
256 delete deviceManager_;
257 deviceManager_ = nullptr;
258 delete executor;
259 executor = nullptr;
260 }
261 return errCode;
262 }
263
InitComunicator(const ISyncInterface * syncInterface)264 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
265 {
266 ICommunicatorAggregator *communicatorAggregator = nullptr;
267 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
268 if (communicatorAggregator == nullptr) {
269 LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
270 return errCode;
271 }
272 std::vector<uint8_t> label = syncInterface->GetIdentifier();
273 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
274 if (isSyncDualTupleMode) {
275 std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
276 LOGI("[SyncEngine] dual tuple mode, original identifier=%.6s, target identifier=%.6s", VEC_TO_STR(label),
277 VEC_TO_STR(dualTuplelabel));
278 communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode);
279 } else {
280 communicator_ = communicatorAggregator->AllocCommunicator(label, errCode);
281 }
282 if (communicator_ == nullptr) {
283 LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
284 return errCode;
285 }
286
287 errCode = communicator_->RegOnMessageCallback(
288 std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
289 []() {});
290 if (errCode != E_OK) {
291 LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
292 communicatorAggregator->ReleaseCommunicator(communicator_);
293 communicator_ = nullptr;
294 return errCode;
295 }
296
297 communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
298 if (communicatorProxy_ == nullptr) {
299 communicatorAggregator->ReleaseCommunicator(communicator_);
300 communicator_ = nullptr;
301 return -E_OUT_OF_MEMORY;
302 }
303
304 communicatorProxy_->SetMainCommunicator(communicator_);
305 label.resize(3); // only show 3 Bytes enough
306 label_ = DBCommon::VectorToHexString(label);
307 LOGD("[SyncEngine] RegOnConnectCallback");
308 return errCode;
309 }
310
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)311 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
312 {
313 int errCode = E_OK;
314 ISyncTaskContext *context = nullptr;
315 {
316 std::lock_guard<std::mutex> lock(contextMapLock_);
317 context = FindSyncTaskContext(deviceId);
318 if (context == nullptr) {
319 if (!IsKilled()) {
320 context = GetSyncTaskContext(deviceId, errCode);
321 }
322 if (context == nullptr) {
323 return errCode;
324 }
325 }
326 if (context->IsKilled()) {
327 return -E_OBJ_IS_KILLED;
328 }
329 // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
330 RefObject::IncObjRef(context);
331 }
332
333 errCode = context->AddSyncOperation(operation);
334 RefObject::DecObjRef(context);
335 return errCode;
336 }
337
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)338 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
339 Message *inMsg)
340 {
341 std::string deviceId = context->GetDeviceId();
342
343 if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
344 int errCode = context->ReceiveMessageCallback(inMsg);
345 if (errCode == -E_NOT_NEED_DELETE_MSG) {
346 goto MSG_CALLBACK_OUT_NOT_DEL;
347 }
348 // add auto sync here while recv subscribe request
349 QuerySyncObject syncObject;
350 if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
351 InternalSyncParma param;
352 GetQueryAutoSyncParam(deviceId, syncObject, param);
353 queryAutoSyncCallback_(param);
354 }
355 }
356
357 delete inMsg;
358 inMsg = nullptr;
359 MSG_CALLBACK_OUT_NOT_DEL:
360 ScheduleTaskOut(context, communicator);
361 }
362
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)363 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
364 {
365 do {
366 std::string deviceId = context->GetDeviceId();
367 if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
368 onRemoteDataChanged_(deviceId);
369 } else {
370 LOGE("[SyncEngine] onRemoteDataChanged is null!");
371 }
372 } while (false);
373 delete inMsg;
374 inMsg = nullptr;
375 ScheduleTaskOut(context, communicator);
376 }
377
// Common epilogue of a message task: tries to dispatch the next queued message, decrements the
// executing task counter and releases the references held on communicator and context.
void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
{
    // The result is deliberately ignored; a failed dispatch is cleaned up inside the callee.
    (void)DealMsgUtilQueueEmpty();
    DecExecTaskCount();
    RefObject::DecObjRef(communicator);
    RefObject::DecObjRef(context);
}
385
DealMsgUtilQueueEmpty()386 int SyncEngine::DealMsgUtilQueueEmpty()
387 {
388 if (!isActive_) {
389 return -E_BUSY; // db is closing just return
390 }
391 int errCode = E_OK;
392 Message *inMsg = nullptr;
393 {
394 std::lock_guard<std::mutex> lock(queueLock_);
395 if (msgQueue_.empty()) {
396 return errCode;
397 }
398 inMsg = msgQueue_.front();
399 msgQueue_.pop_front();
400 queueCacheSize_ -= GetMsgSize(inMsg);
401 }
402
403 IncExecTaskCount();
404 // it will deal with the first message in queue, we should increase object reference counts and sure that resources
405 // could be prevented from destroying by other threads.
406 do {
407 ISyncTaskContext *nextContext = GetConextForMsg(inMsg->GetTarget(), errCode);
408 if (errCode != E_OK) {
409 break;
410 }
411 errCode = ScheduleDealMsg(nextContext, inMsg);
412 if (errCode != E_OK) {
413 RefObject::DecObjRef(nextContext);
414 }
415 } while (false);
416 if (errCode != E_OK) {
417 delete inMsg;
418 inMsg = nullptr;
419 DecExecTaskCount();
420 }
421 return errCode;
422 }
423
GetConextForMsg(const std::string & targetDev,int & errCode)424 ISyncTaskContext *SyncEngine::GetConextForMsg(const std::string &targetDev, int &errCode)
425 {
426 ISyncTaskContext *context = nullptr;
427 {
428 std::lock_guard<std::mutex> lock(contextMapLock_);
429 context = FindSyncTaskContext(targetDev);
430 if (context != nullptr) {
431 if (context->IsKilled()) {
432 errCode = -E_OBJ_IS_KILLED;
433 return nullptr;
434 }
435 } else {
436 if (IsKilled()) {
437 errCode = -E_OBJ_IS_KILLED;
438 return nullptr;
439 }
440 context = GetSyncTaskContext(targetDev, errCode);
441 if (context == nullptr) {
442 return nullptr;
443 }
444 }
445 // IncRef for context to make sure context is valid, when task run another thread
446 RefObject::IncObjRef(context);
447 }
448 return context;
449 }
450
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)451 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
452 {
453 if (inMsg == nullptr) {
454 LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
455 DecExecTaskCount();
456 return E_OK;
457 }
458 RefObject::IncObjRef(communicatorProxy_);
459 int errCode = E_OK;
460 // deal remote local data changed message
461 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
462 RemoteDataChangedTask(context, communicatorProxy_, inMsg);
463 } else {
464 errCode = RuntimeContext::GetInstance()->ScheduleTask(std::bind(&SyncEngine::MessageReciveCallbackTask,
465 this, context, communicatorProxy_, inMsg));
466 }
467
468 if (errCode != E_OK) {
469 LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
470 RefObject::DecObjRef(communicatorProxy_);
471 }
472 return errCode;
473 }
474
// Entry point registered on the communicators for incoming messages.
// Increments the executing task counter, then delegates to MessageReciveCallbackInner(). When
// the inner call reports an error, the message is deleted here and the counter is decremented
// again.
void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
{
    IncExecTaskCount();
    int errCode = MessageReciveCallbackInner(targetDev, inMsg);
    if (errCode != E_OK) {
        delete inMsg;
        inMsg = nullptr;
        DecExecTaskCount();
        LOGE("[SyncEngine] MessageReciveCallback failed!");
    }
}
486
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)487 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
488 {
489 if (targetDev.empty() || inMsg == nullptr) {
490 LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
491 return -E_INVALID_ARGS;
492 }
493 if (!isActive_) {
494 LOGE("[SyncEngine] engine is closing, ignore msg");
495 return -E_BUSY;
496 }
497 RemoteExecutor *executor = GetAndIncRemoteExector();
498 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor != nullptr) {
499 int errCode = executor->ReceiveMessage(targetDev, inMsg);
500 RefObject::DecObjRef(executor);
501 DecExecTaskCount();
502 return errCode;
503 } else if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
504 DecExecTaskCount();
505 return -E_BUSY;
506 }
507 int msgSize = 0;
508 if (!IsSkipCalculateLen(inMsg)) {
509 msgSize = GetMsgSize(inMsg);
510 if (msgSize <= 0) {
511 LOGE("[SyncEngine] GetMsgSize makes a mistake");
512 return -E_NOT_SUPPORT;
513 }
514 }
515
516 {
517 std::lock_guard<std::mutex> lock(queueLock_);
518 if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
519 LOGE("[SyncEngine] The size of message queue is beyond maximum");
520 discardMsgNum_++;
521 return -E_BUSY;
522 }
523
524 if (execTaskCount_ > MAX_EXEC_NUM) {
525 PutMsgIntoQueue(targetDev, inMsg, msgSize);
526 // task dont exec here
527 DecExecTaskCount();
528 return E_OK;
529 }
530 }
531
532 int errCode = E_OK;
533 ISyncTaskContext *nextContext = GetConextForMsg(targetDev, errCode);
534 if (errCode != E_OK) {
535 return errCode;
536 }
537 LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
538 return ScheduleDealMsg(nextContext, inMsg);
539 }
540
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)541 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
542 {
543 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
544 auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
545 [&targetDev](const Message *msg) {
546 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
547 });
548 if (iter != msgQueue_.end()) {
549 delete inMsg;
550 inMsg = nullptr;
551 return;
552 }
553 }
554 inMsg->SetTarget(targetDev);
555 msgQueue_.push_back(inMsg);
556 queueCacheSize_ += msgSize;
557 LOGE("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
558 }
559
// Returns the length of inMsg as computed by the CalculateLen routine that belongs to its
// message id, or -E_NOT_SUPPORT when the message id is not handled here.
// The multi-version message ids are only handled when OMIT_MULTI_VER is not defined.
int SyncEngine::GetMsgSize(const Message *inMsg) const
{
    switch (inMsg->GetMessageId()) {
        case TIME_SYNC_MESSAGE:
            return TimeSync::CalculateLen(inMsg);
        case ABILITY_SYNC_MESSAGE:
            return AbilitySync::CalculateLen(inMsg);
        // All single-version data / query / control messages share one serializer.
        case DATA_SYNC_MESSAGE:
        case QUERY_SYNC_MESSAGE:
        case CONTROL_SYNC_MESSAGE:
            return SingleVerSerializeManager::CalculateLen(inMsg);
#ifndef OMIT_MULTI_VER
        case COMMIT_HISTORY_SYNC_MESSAGE:
            return CommitHistorySync::CalculateLen(inMsg);
        case MULTI_VER_DATA_SYNC_MESSAGE:
            return MultiVerDataSync::CalculateLen(inMsg);
        case VALUE_SLICE_SYNC_MESSAGE:
            return ValueSliceSync::CalculateLen(inMsg);
#endif
        case LOCAL_DATA_CHANGED:
            return DeviceManager::CalculateLen();
        default:
            LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
            return -E_NOT_SUPPORT;
    }
}
586
FindSyncTaskContext(const std::string & deviceId)587 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
588 {
589 auto iter = syncTaskContextMap_.find(deviceId);
590 if (iter != syncTaskContextMap_.end()) {
591 ISyncTaskContext *context = iter->second;
592 return context;
593 }
594 return nullptr;
595 }
596
GetSyncTaskContextAndInc(const std::string & deviceId)597 ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
598 {
599 ISyncTaskContext *context = nullptr;
600 std::lock_guard<std::mutex> lock(contextMapLock_);
601 context = FindSyncTaskContext(deviceId);
602 if (context == nullptr) {
603 LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
604 return nullptr;
605 }
606 if (context->IsKilled()) {
607 LOGI("[SyncEngine] context is killing");
608 return nullptr;
609 }
610 RefObject::IncObjRef(context);
611 return context;
612 }
613
// Creates, initializes and registers a new sync task context for deviceId.
// On success the context is inserted into syncTaskContextMap_ and returned; on failure nullptr
// is returned and errCode is set (-E_OUT_OF_MEMORY when creation fails, otherwise the error of
// context->Initialize()).
// NOTE(review): the map is modified without taking a lock here; the callers visible in this
// file hold contextMapLock_ while calling - confirm for any other caller.
ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
{
    ISyncTaskContext *context = CreateSyncTaskContext();
    if (context == nullptr) {
        errCode = -E_OUT_OF_MEMORY;
        LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
        return nullptr;
    }
    errCode = context->Initialize(deviceId, syncInterface_, metadata_, communicatorProxy_);
    if (errCode != E_OK) {
        LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
        RefObject::DecObjRef(context);
        context = nullptr;
        return nullptr;
    }
    syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
    // IncRef for SyncEngine to make sure SyncEngine is valid when context access
    RefObject::IncObjRef(this);
    // The storage gets an extra reference as well; both references are given back in the
    // OnLastRef callback registered below.
    auto storage = syncInterface_;
    if (storage != nullptr) {
        storage->IncRefCount();
    }
    context->OnLastRef([this, deviceId, storage]() {
        LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
        RefObject::DecObjRef(this);
        if (storage != nullptr) {
            storage->DecRefCount();
        }
    });
    // Sync tasks of this context are executed through ExecSyncTask().
    context->RegOnSyncTask(std::bind(&SyncEngine::ExecSyncTask, this, context));
    return context;
}
646
// Starts the next pending sync target of context, if any.
// Returns -E_OBJ_IS_KILLED when the engine is killed, -E_NOT_SUPPORT when the context is
// already running or killed, the error of GetNextTarget() / StartStateMachine() when one of
// them fails, and E_OK otherwise. When the target queue is empty the task status is set to
// FINISHED.
int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
{
    if (IsKilled()) {
        return -E_OBJ_IS_KILLED;
    }

    // NOTE(review): AutoLock presumably locks context for the rest of this scope - confirm.
    AutoLock lockGuard(context);
    int status = context->GetTaskExecStatus();
    if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
        return -E_NOT_SUPPORT;
    }
    context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
    if (!context->IsTargetQueueEmpty()) {
        int errCode = context->GetNextTarget(true);
        if (errCode != E_OK) {
            return errCode;
        }
        // The context is unlocked while the state machine is started and locked again after.
        context->UnlockObj();
        errCode = context->StartStateMachine();
        context->LockObj();
        if (errCode != E_OK) {
            LOGE("[SyncEngine] machine StartSync failed");
            context->SetOperationStatus(SyncOperation::OP_FAILED);
            return errCode;
        }
    } else {
        LOGD("[SyncEngine] ExecSyncTask finished");
        context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
    }
    return E_OK;
}
678
// Returns the current value of queueCacheSize_ (read without taking queueLock_).
int SyncEngine::GetQueueCacheSize() const
{
    return queueCacheSize_;
}
683
// Returns the number of messages discarded so far (read without taking queueLock_).
unsigned int SyncEngine::GetDiscardMsgNum() const
{
    return discardMsgNum_;
}
688
// Returns MAX_EXEC_NUM, the executing-task threshold above which incoming messages are queued.
unsigned int SyncEngine::GetMaxExecNum() const
{
    return MAX_EXEC_NUM;
}
693
// Sets the upper bound of the message queue cache size. The value is stored as given, without
// validation and without taking queueLock_.
void SyncEngine::SetMaxQueueCacheSize(int value)
{
    maxQueueCacheSize_ = value;
}
698
// Returns a copy of the engine's log label (set in InitComunicator() from the first bytes of
// the database identifier).
std::string SyncEngine::GetLabel() const
{
    return label_;
}
703
// Returns the sync retry flag last stored by SetSyncRetry().
bool SyncEngine::GetSyncRetry() const
{
    return isSyncRetry_;
}
708
SetSyncRetry(bool isRetry)709 void SyncEngine::SetSyncRetry(bool isRetry)
710 {
711 if (isSyncRetry_ == isRetry) {
712 LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
713 return;
714 }
715 isSyncRetry_ = isRetry;
716 LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
717 std::lock_guard<std::mutex> lock(contextMapLock_);
718 for (auto &iter : syncTaskContextMap_) {
719 ISyncTaskContext *context = iter.second;
720 if (context != nullptr) {
721 context->SetSyncRetry(isRetry);
722 }
723 }
724 }
725
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)726 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
727 {
728 if (!isActive_) {
729 LOGI("[SyncEngine] engine is closed, just put into map");
730 return E_OK;
731 }
732 ICommunicator *communicator = nullptr;
733 {
734 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
735 if (equalCommunicators_.count(identifier) != 0) {
736 communicator = equalCommunicators_[identifier];
737 } else {
738 int errCode = E_OK;
739 communicator = AllocCommunicator(identifier, errCode);
740 if (communicator == nullptr) {
741 return errCode;
742 }
743 equalCommunicators_[identifier] = communicator;
744 }
745 }
746 std::string targetDevices;
747 for (const auto &dev : targets) {
748 targetDevices += DBCommon::StringMasking(dev) + ",";
749 }
750 LOGI("[SyncEngine] set equal identifier=%s, original=%s, targetDevices=%s",
751 DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
752 targetDevices.substr(0, targetDevices.size() - 1).c_str());
753 communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
754 communicator->Activate();
755 return E_OK;
756 }
757
SetEqualIdentifier()758 void SyncEngine::SetEqualIdentifier()
759 {
760 std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
761 for (auto &item : equalIdentifierMap_) {
762 if (equalIdentifier.find(item.second) == equalIdentifier.end()) {
763 equalIdentifier[item.second] = {item.first};
764 } else {
765 equalIdentifier[item.second].push_back(item.first);
766 }
767 }
768 for (const auto &item : equalIdentifier) {
769 SetEqualIdentifier(item.first, item.second);
770 }
771 }
772
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)773 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
774 {
775 for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
776 if (identifier == iter->second) {
777 iter = equalIdentifierMap_.erase(iter);
778 continue;
779 }
780 iter++;
781 }
782 for (const auto &device : targets) {
783 equalIdentifierMap_[device] = identifier;
784 }
785 }
786
// Cleans up the state kept for deviceId after the device went offline (or the db was closed).
// Notifies the remote executor, clears the remote subscribe queries of the device, flags the
// device's context so that ability sync is reset and, only when the communicator proxy reports
// the device as really offline, clears the local subscribe queries and all sync tasks of the
// context. Does nothing when there is no communicator proxy.
void SyncEngine::OfflineHandleByDevice(const std::string &deviceId)
{
    if (communicatorProxy_ == nullptr) {
        return;
    }

    RemoteExecutor *executor = GetAndIncRemoteExector();
    if (executor != nullptr) {
        executor->NotifyDeviceOffline(deviceId);
        RefObject::DecObjRef(executor);
    }
    // db closed or device is offline
    // clear remote subscribe and trigger
    // NOTE(review): subManager_ and syncInterface_ are dereferenced without a null check here -
    // confirm that they are always valid whenever communicatorProxy_ is set.
    std::vector<std::string> remoteQueryId;
    subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
    subManager_->ClearRemoteSubscribeQuery(deviceId);
    static_cast<SingleVerKvDBSyncInterface *>(syncInterface_)->RemoveSubscribe(remoteQueryId);
    // get the context and take a reference on it when it is not nullptr
    ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
    if (context != nullptr) {
        context->SetIsNeedResetAbilitySync(true);
    }
    if (communicatorProxy_->IsDeviceOnline(deviceId)) {
        LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
        // context may be nullptr here; it is passed to DecObjRef unchecked.
        RefObject::DecObjRef(context);
        return;
    }
    // means device is offline, clear local subscribe
    subManager_->ClearLocalSubscribeQuery(deviceId);
    // clear sync task
    if (context != nullptr) {
        context->ClearAllSyncTask();
        RefObject::DecObjRef(context);
    }
}
822
// Forwards to SubscribeManager::GetLocalSubscribeQueries() for device; results are written to
// subscribeQueries. subManager_ is dereferenced without a null check.
void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
{
    subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
}
827
// Forwards to SubscribeManager::GetRemoteSubscribeQueryIds() for device; results are written to
// subscribeQueryIds. subManager_ is dereferenced without a null check.
void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
{
    subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
}
832
// Forwards to SubscribeManager::GetRemoteSubscribeQueries() for device; results are written to
// subscribeQueries. subManager_ is dereferenced without a null check.
void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
{
    subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
}
837
// Forwards subscribeQueries for device to SubscribeManager::PutLocalUnFiniedSubQueries().
// subManager_ is dereferenced without a null check.
void SyncEngine::PutUnfiniedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries)
{
    subManager_->PutLocalUnFiniedSubQueries(device, subscribeQueries);
}
842
// Forwards to SubscribeManager::GetAllUnFinishSubQueries(); results are written to
// allSyncQueries. subManager_ is dereferenced without a null check.
void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
{
    subManager_->GetAllUnFinishSubQueries(allSyncQueries);
}
847
AllocCommunicator(const std::string & identifier,int & errCode)848 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode)
849 {
850 ICommunicatorAggregator *communicatorAggregator = nullptr;
851 errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
852 if (communicatorAggregator == nullptr) {
853 LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
854 return nullptr;
855 }
856 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
857 auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode);
858 if (communicator == nullptr) {
859 LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
860 return communicator;
861 }
862
863 errCode = communicator->RegOnMessageCallback(
864 std::bind(&SyncEngine::MessageReciveCallback, this, std::placeholders::_1, std::placeholders::_2),
865 []() {});
866 if (errCode != E_OK) {
867 LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
868 communicatorAggregator->ReleaseCommunicator(communicator);
869 return nullptr;
870 }
871
872 errCode = communicator->RegOnConnectCallback(
873 std::bind(&DeviceManager::OnDeviceConnectCallback, deviceManager_,
874 std::placeholders::_1, std::placeholders::_2), nullptr);
875 if (errCode != E_OK) {
876 LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
877 communicator->RegOnMessageCallback(nullptr, nullptr);
878 communicatorAggregator->ReleaseCommunicator(communicator);
879 return nullptr;
880 }
881
882 return communicator;
883 }
884
UnRegCommunicatorsCallback()885 void SyncEngine::UnRegCommunicatorsCallback()
886 {
887 if (communicator_ != nullptr) {
888 communicator_->RegOnMessageCallback(nullptr, nullptr);
889 communicator_->RegOnConnectCallback(nullptr, nullptr);
890 communicator_->RegOnSendableCallback(nullptr, nullptr);
891 }
892 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
893 for (const auto &iter : equalCommunicators_) {
894 iter.second->RegOnMessageCallback(nullptr, nullptr);
895 iter.second->RegOnConnectCallback(nullptr, nullptr);
896 iter.second->RegOnSendableCallback(nullptr, nullptr);
897 }
898 }
899
// Releases the communicator proxy, the main communicator and all equal-identifier
// communicators. The proxy is always killed and released; the communicators are only handed
// back when the communicator aggregator can be obtained.
void SyncEngine::ReleaseCommunicators()
{
    RefObject::KillAndDecObjRef(communicatorProxy_);
    communicatorProxy_ = nullptr;
    ICommunicatorAggregator *communicatorAggregator = nullptr;
    int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
    if (communicatorAggregator == nullptr) {
        // Without the aggregator the communicators cannot be released; communicator_ and
        // equalCommunicators_ are left untouched on this path.
        LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
        return;
    }

    if (communicator_ != nullptr) {
        communicatorAggregator->ReleaseCommunicator(communicator_);
        communicator_ = nullptr;
    }

    std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
    for (auto &iter : equalCommunicators_) {
        communicatorAggregator->ReleaseCommunicator(iter.second);
    }
    equalCommunicators_.clear();
}
922
IsSkipCalculateLen(const Message * inMsg)923 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
924 {
925 if (inMsg->IsFeedbackError()) {
926 LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
927 return true;
928 }
929 return false;
930 }
931
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)932 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
933 InternalSyncParma &outParam)
934 {
935 outParam.devices = { device };
936 outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
937 outParam.isQuerySync = true;
938 outParam.syncQuery = query;
939 }
940
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)941 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
942 InternalSyncParma &outParam)
943 {
944 outParam.devices = { device };
945 outParam.mode = SyncModeType::AUTO_PUSH;
946 outParam.isQuerySync = true;
947 outParam.syncQuery = query;
948 }
949
StartAutoSubscribeTimer()950 int SyncEngine::StartAutoSubscribeTimer()
951 {
952 return E_OK;
953 }
954
StopAutoSubscribeTimer()955 void SyncEngine::StopAutoSubscribeTimer()
956 {
957 }
958
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const959 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
960 {
961 return subManager_->LocalSubscribeLimitCheck(devices, query);
962 }
963
964
ClearInnerResource()965 void SyncEngine::ClearInnerResource()
966 {
967 if (syncInterface_ != nullptr) {
968 syncInterface_->DecRefCount();
969 syncInterface_ = nullptr;
970 }
971 if (deviceManager_ != nullptr) {
972 delete deviceManager_;
973 deviceManager_ = nullptr;
974 }
975 communicator_ = nullptr;
976 metadata_ = nullptr;
977 onRemoteDataChanged_ = nullptr;
978 offlineChanged_ = nullptr;
979 queryAutoSyncCallback_ = nullptr;
980 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
981 if (remoteExecutor_ != nullptr) {
982 RefObject::KillAndDecObjRef(remoteExecutor_);
983 remoteExecutor_ = nullptr;
984 }
985 }
986
IsEngineActive() const987 bool SyncEngine::IsEngineActive() const
988 {
989 return isActive_;
990 }
991
SchemaChange()992 void SyncEngine::SchemaChange()
993 {
994 std::lock_guard<std::mutex> lock(contextMapLock_);
995 for (const auto &entry : syncTaskContextMap_) {
996 auto context = entry.second;
997 if (context->IsKilled()) {
998 continue;
999 }
1000 // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
1001 context->SchemaChange();
1002 }
1003 }
1004
IncExecTaskCount()1005 void SyncEngine::IncExecTaskCount()
1006 {
1007 std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1008 execTaskCount_++;
1009 }
1010
DecExecTaskCount()1011 void SyncEngine::DecExecTaskCount()
1012 {
1013 {
1014 std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1015 execTaskCount_--;
1016 }
1017 execTaskCv_.notify_all();
1018 }
1019
Dump(int fd)1020 void SyncEngine::Dump(int fd)
1021 {
1022 std::string communicatorLabel;
1023 if (communicatorProxy_ != nullptr) {
1024 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1025 }
1026 DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1027 if (communicatorProxy_ != nullptr) {
1028 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1029 communicatorProxy_->Dump(fd);
1030 }
1031 DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1032 // dump context info
1033 std::lock_guard<std::mutex> autoLock(contextMapLock_);
1034 for (const auto &entry : syncTaskContextMap_) {
1035 entry.second->Dump(fd);
1036 }
1037 DBDumpHelper::Dump(fd, "\t]\n\n");
1038 }
1039
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1040 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1041 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1042 {
1043 RemoteExecutor *executor = GetAndIncRemoteExector();
1044 if (!isActive_ || executor == nullptr) {
1045 return -E_BUSY; // db is closing just return
1046 }
1047 int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1048 RefObject::DecObjRef(executor);
1049 return errCode;
1050 }
1051
NotifyConnectionClosed(uint64_t connectionId)1052 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1053 {
1054 RemoteExecutor *executor = GetAndIncRemoteExector();
1055 if (!isActive_ || executor == nullptr) {
1056 return; // db is closing just return
1057 }
1058 executor->NotifyConnectionClosed(connectionId);
1059 RefObject::DecObjRef(executor);
1060 }
1061
NotifyUserChange()1062 void SyncEngine::NotifyUserChange()
1063 {
1064 RemoteExecutor *executor = GetAndIncRemoteExector();
1065 if (!isActive_ || executor == nullptr) {
1066 return; // db is closing just return
1067 }
1068 executor->NotifyUserChange();
1069 RefObject::DecObjRef(executor);
1070 }
1071
GetAndIncRemoteExector()1072 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1073 {
1074 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1075 RefObject::IncObjRef(remoteExecutor_);
1076 return remoteExecutor_;
1077 }
1078
SetRemoteExector(RemoteExecutor * executor)1079 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1080 {
1081 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1082 remoteExecutor_ = executor;
1083 }
1084
CheckDeviceIdValid(const std::string & deviceId,const std::string & localDeviceId)1085 bool SyncEngine::CheckDeviceIdValid(const std::string &deviceId, const std::string &localDeviceId)
1086 {
1087 if (deviceId.empty()) {
1088 return false;
1089 }
1090 return localDeviceId != deviceId;
1091 }
1092
GetLocalDeviceId(std::string & deviceId)1093 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1094 {
1095 if (!isActive_ || communicator_ == nullptr) {
1096 // db is closing
1097 return -E_BUSY;
1098 }
1099 auto communicator = communicator_;
1100 RefObject::IncObjRef(communicator);
1101 int errCode = communicator->GetLocalIdentity(deviceId);
1102 RefObject::DecObjRef(communicator);
1103 return errCode;
1104 }
1105
AbortMachineIfNeed(uint32_t syncId)1106 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1107 {
1108 ISyncTaskContext *abortContext = nullptr;
1109 {
1110 std::lock_guard<std::mutex> lock(contextMapLock_);
1111 for (const auto &entry : syncTaskContextMap_) {
1112 auto context = entry.second;
1113 if (context->IsKilled()) {
1114 continue;
1115 }
1116 RefObject::IncObjRef(context);
1117 if (context->GetSyncId() == syncId) {
1118 abortContext = context;
1119 RefObject::IncObjRef(abortContext);
1120 }
1121 RefObject::DecObjRef(context);
1122 }
1123 }
1124 if (abortContext != nullptr) {
1125 abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1126 RefObject::DecObjRef(abortContext);
1127 }
1128 }
1129
WaitingExecTaskExist()1130 void SyncEngine::WaitingExecTaskExist()
1131 {
1132 std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1133 bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1134 [this]() { return execTaskCount_ == 0; });
1135 if (!isTimeout) {
1136 LOGD("SyncEngine Close with executing task!");
1137 }
1138 }
1139 } // namespace DistributedDB
1140