1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_engine.h"
17
18 #include <algorithm>
19 #include <deque>
20 #include <functional>
21
22 #include "ability_sync.h"
23 #include "db_common.h"
24 #include "db_dump_helper.h"
25 #include "db_errno.h"
26 #include "device_manager.h"
27 #include "hash.h"
28 #include "isync_state_machine.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "single_ver_serialize_manager.h"
32 #include "subscribe_manager.h"
33 #include "time_sync.h"
34
35 #ifndef OMIT_MULTI_VER
36 #include "commit_history_sync.h"
37 #include "multi_ver_data_sync.h"
38 #include "value_slice_sync.h"
39 #endif
40
41 namespace DistributedDB {
42 int SyncEngine::queueCacheSize_ = 0;
43 int SyncEngine::maxQueueCacheSize_ = DEFAULT_CACHE_SIZE;
44 unsigned int SyncEngine::discardMsgNum_ = 0;
45 std::mutex SyncEngine::queueLock_;
46
SyncEngine()47 SyncEngine::SyncEngine()
48 : syncInterface_(nullptr),
49 communicator_(nullptr),
50 deviceManager_(nullptr),
51 metadata_(nullptr),
52 execTaskCount_(0),
53 isSyncRetry_(false),
54 communicatorProxy_(nullptr),
55 isActive_(false),
56 remoteExecutor_(nullptr)
57 {
58 }
59
~SyncEngine()60 SyncEngine::~SyncEngine()
61 {
62 LOGD("[SyncEngine] ~SyncEngine!");
63 ClearInnerResource();
64 equalIdentifierMap_.clear();
65 subManager_ = nullptr;
66 LOGD("[SyncEngine] ~SyncEngine ok!");
67 }
68
Initialize(ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,const InitCallbackParam & callbackParam)69 int SyncEngine::Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
70 const InitCallbackParam &callbackParam)
71 {
72 if ((syncInterface == nullptr) || (metadata == nullptr)) {
73 LOGE("[SyncEngine] [Initialize] syncInterface or metadata is nullptr.");
74 return -E_INVALID_ARGS;
75 }
76 int errCode = StartAutoSubscribeTimer(*syncInterface);
77 if (errCode != E_OK) {
78 return errCode;
79 }
80
81 errCode = InitComunicator(syncInterface);
82 if (errCode != E_OK) {
83 LOGE("[SyncEngine] Init Communicator failed");
84 // There need to set nullptr. other wise, syncInterface will be
85 // DecRef in th destroy-method.
86 StopAutoSubscribeTimer();
87 return errCode;
88 }
89 onRemoteDataChanged_ = callbackParam.onRemoteDataChanged;
90 offlineChanged_ = callbackParam.offlineChanged;
91 queryAutoSyncCallback_ = callbackParam.queryAutoSyncCallback;
92 errCode = InitInnerSource(callbackParam.onRemoteDataChanged, callbackParam.offlineChanged, syncInterface);
93 if (errCode != E_OK) {
94 // reset ptr if initialize device manager failed
95 StopAutoSubscribeTimer();
96 return errCode;
97 }
98 SetSyncInterface(syncInterface);
99 if (subManager_ == nullptr) {
100 subManager_ = std::make_shared<SubscribeManager>();
101 }
102 metadata_ = metadata;
103 isActive_ = true;
104 LOGI("[SyncEngine] Engine [%.3s] init ok", label_.c_str());
105 return E_OK;
106 }
107
Close()108 int SyncEngine::Close()
109 {
110 LOGI("[SyncEngine] [%.3s] close enter!", label_.c_str());
111 isActive_ = false;
112 UnRegCommunicatorsCallback();
113 StopAutoSubscribeTimer();
114 std::vector<ISyncTaskContext *> decContext;
115 // Clear SyncContexts
116 {
117 std::unique_lock<std::mutex> lock(contextMapLock_);
118 for (auto &iter : syncTaskContextMap_) {
119 decContext.push_back(iter.second);
120 iter.second = nullptr;
121 }
122 syncTaskContextMap_.clear();
123 }
124 for (auto &iter : decContext) {
125 RefObject::KillAndDecObjRef(iter);
126 iter = nullptr;
127 }
128 WaitingExecTaskExist();
129 ReleaseCommunicators();
130 {
131 std::lock_guard<std::mutex> msgLock(queueLock_);
132 while (!msgQueue_.empty()) {
133 Message *inMsg = msgQueue_.front();
134 msgQueue_.pop_front();
135 if (inMsg != nullptr) { // LCOV_EXCL_BR_LINE
136 queueCacheSize_ -= GetMsgSize(inMsg);
137 delete inMsg;
138 inMsg = nullptr;
139 }
140 }
141 }
142 // close db, rekey or import scene, need clear all remote query info
143 // local query info will destroy with syncEngine destruct
144 if (subManager_ != nullptr) {
145 subManager_->ClearAllRemoteQuery();
146 }
147
148 RemoteExecutor *executor = GetAndIncRemoteExector();
149 if (executor != nullptr) {
150 executor->Close();
151 RefObject::DecObjRef(executor);
152 executor = nullptr;
153 }
154 ClearInnerResource();
155 LOGI("[SyncEngine] [%.3s] closed!", label_.c_str());
156 return E_OK;
157 }
158
AddSyncOperation(SyncOperation * operation)159 int SyncEngine::AddSyncOperation(SyncOperation *operation)
160 {
161 if (operation == nullptr) {
162 LOGE("[SyncEngine] operation is nullptr");
163 return -E_INVALID_ARGS;
164 }
165
166 std::vector<std::string> devices = operation->GetDevices();
167 std::string localDeviceId;
168 int errCode = GetLocalDeviceId(localDeviceId);
169 for (const auto &deviceId : devices) {
170 if (errCode != E_OK) {
171 operation->SetStatus(deviceId, errCode == -E_BUSY ?
172 SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
173 continue;
174 }
175 if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
176 operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
177 continue;
178 }
179 operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
180 if (AddSyncOperForContext(deviceId, operation) != E_OK) {
181 operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
182 }
183 }
184 return E_OK;
185 }
186
RemoveSyncOperation(int syncId)187 void SyncEngine::RemoveSyncOperation(int syncId)
188 {
189 std::lock_guard<std::mutex> lock(contextMapLock_);
190 for (auto &iter : syncTaskContextMap_) {
191 ISyncTaskContext *context = iter.second;
192 if (context != nullptr) {
193 context->RemoveSyncOperation(syncId);
194 }
195 }
196 }
197
198 #ifndef OMIT_MULTI_VER
BroadCastDataChanged() const199 void SyncEngine::BroadCastDataChanged() const
200 {
201 if (deviceManager_ != nullptr) {
202 (void)deviceManager_->SendBroadCast(LOCAL_DATA_CHANGED);
203 }
204 }
205 #endif // OMIT_MULTI_VER
206
StartCommunicator()207 void SyncEngine::StartCommunicator()
208 {
209 if (communicator_ == nullptr) {
210 LOGE("[SyncEngine][StartCommunicator] communicator is not set!");
211 return;
212 }
213 LOGD("[SyncEngine][StartCommunicator] RegOnConnectCallback");
214 int errCode = communicator_->RegOnConnectCallback(
215 [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
216 deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
217 }, nullptr);
218 if (errCode != E_OK) {
219 LOGE("[SyncEngine][StartCommunicator] register failed, auto sync can not use! err %d", errCode);
220 return;
221 }
222 communicator_->Activate(GetUserId());
223 }
224
GetOnlineDevices(std::vector<std::string> & devices) const225 void SyncEngine::GetOnlineDevices(std::vector<std::string> &devices) const
226 {
227 devices.clear();
228 if (deviceManager_ != nullptr) {
229 deviceManager_->GetOnlineDevices(devices);
230 }
231 }
232
InitInnerSource(const std::function<void (std::string)> & onRemoteDataChanged,const std::function<void (std::string)> & offlineChanged,ISyncInterface * syncInterface)233 int SyncEngine::InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
234 const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface)
235 {
236 deviceManager_ = new (std::nothrow) DeviceManager();
237 if (deviceManager_ == nullptr) {
238 LOGE("[SyncEngine] deviceManager alloc failed!");
239 return -E_OUT_OF_MEMORY;
240 }
241 auto executor = new (std::nothrow) RemoteExecutor();
242 if (executor == nullptr) {
243 LOGE("[SyncEngine] remoteExecutor alloc failed!");
244 delete deviceManager_;
245 deviceManager_ = nullptr;
246 return -E_OUT_OF_MEMORY;
247 }
248
249 int errCode = E_OK;
250 do {
251 CommunicatorProxy *comProxy = nullptr;
252 {
253 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
254 comProxy = communicatorProxy_;
255 RefObject::IncObjRef(comProxy);
256 }
257 errCode = deviceManager_->Initialize(comProxy, onRemoteDataChanged, offlineChanged);
258 RefObject::DecObjRef(comProxy);
259 if (errCode != E_OK) {
260 LOGE("[SyncEngine] deviceManager init failed! err %d", errCode);
261 break;
262 }
263 errCode = executor->Initialize(syncInterface, communicator_);
264 } while (false);
265 if (errCode != E_OK) {
266 delete deviceManager_;
267 deviceManager_ = nullptr;
268 delete executor;
269 executor = nullptr;
270 } else {
271 SetRemoteExector(executor);
272 }
273 return errCode;
274 }
275
InitComunicator(const ISyncInterface * syncInterface)276 int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
277 {
278 ICommunicatorAggregator *communicatorAggregator = nullptr;
279 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
280 if (communicatorAggregator == nullptr) {
281 LOGE("[SyncEngine] Get ICommunicatorAggregator error when init the sync engine err = %d", errCode);
282 return errCode;
283 }
284 std::vector<uint8_t> label = syncInterface->GetIdentifier();
285 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
286 if (isSyncDualTupleMode) {
287 std::vector<uint8_t> dualTuplelabel = syncInterface->GetDualTupleIdentifier();
288 LOGI("[SyncEngine] dual tuple mode, original identifier=%.3s, target identifier=%.3s", VEC_TO_STR(label),
289 VEC_TO_STR(dualTuplelabel));
290 communicator_ = communicatorAggregator->AllocCommunicator(dualTuplelabel, errCode, GetUserId(syncInterface));
291 } else {
292 communicator_ = communicatorAggregator->AllocCommunicator(label, errCode, GetUserId(syncInterface));
293 }
294 if (communicator_ == nullptr) {
295 LOGE("[SyncEngine] AllocCommunicator error when init the sync engine! err = %d", errCode);
296 return errCode;
297 }
298
299 errCode = RegCallbackOnInitComunicator(communicatorAggregator, syncInterface);
300 if (errCode != E_OK) {
301 return errCode;
302 }
303 {
304 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
305 communicatorProxy_ = new (std::nothrow) CommunicatorProxy();
306 if (communicatorProxy_ == nullptr) {
307 communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
308 communicator_ = nullptr;
309 return -E_OUT_OF_MEMORY;
310 }
311 communicatorProxy_->SetMainCommunicator(communicator_);
312 }
313 label.resize(3); // only show 3 Bytes enough
314 label_ = DBCommon::VectorToHexString(label);
315 LOGD("[SyncEngine] RegOnConnectCallback");
316 return errCode;
317 }
318
AddSyncOperForContext(const std::string & deviceId,SyncOperation * operation)319 int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
320 {
321 if (syncInterface_ == nullptr) {
322 LOGE("[SyncEngine][AddSyncOperForContext] sync interface has not initialized");
323 return -E_INVALID_DB;
324 }
325 bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
326 std::string targetUserId = DBConstant::DEFAULT_USER;
327 if (isSyncDualTupleMode) {
328 targetUserId = GetTargetUserId(deviceId);
329 }
330 int errCode = E_OK;
331 ISyncTaskContext *context = nullptr;
332 {
333 std::lock_guard<std::mutex> lock(contextMapLock_);
334 context = FindSyncTaskContext({deviceId, targetUserId}, false);
335 if (context == nullptr) {
336 if (!IsKilled()) {
337 context = GetSyncTaskContext({deviceId, targetUserId}, errCode);
338 }
339 if (context == nullptr) {
340 return errCode;
341 }
342 }
343 if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
344 return -E_OBJ_IS_KILLED;
345 }
346 // IncRef for SyncEngine to make sure context is valid, to avoid a big lock
347 RefObject::IncObjRef(context);
348 }
349
350 errCode = context->AddSyncOperation(operation);
351 if (operation != nullptr) {
352 operation->SetSyncContext(context); // make the life cycle of context and operation are same
353 }
354 RefObject::DecObjRef(context);
355 return errCode;
356 }
357
MessageReciveCallbackTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)358 void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
359 Message *inMsg)
360 {
361 std::string deviceId = context->GetDeviceId();
362
363 if (inMsg->GetMessageId() != LOCAL_DATA_CHANGED) {
364 int errCode = context->ReceiveMessageCallback(inMsg);
365 if (errCode == -E_NOT_NEED_DELETE_MSG) {
366 goto MSG_CALLBACK_OUT_NOT_DEL;
367 }
368 // add auto sync here while recv subscribe request
369 QuerySyncObject syncObject;
370 if (errCode == E_OK && context->IsNeedTriggerQueryAutoSync(inMsg, syncObject)) {
371 InternalSyncParma param;
372 GetQueryAutoSyncParam(deviceId, syncObject, param);
373 queryAutoSyncCallback_(param);
374 }
375 }
376
377 delete inMsg;
378 inMsg = nullptr;
379 MSG_CALLBACK_OUT_NOT_DEL:
380 ScheduleTaskOut(context, communicator);
381 }
382
RemoteDataChangedTask(ISyncTaskContext * context,const ICommunicator * communicator,Message * inMsg)383 void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
384 {
385 std::string deviceId = context->GetDeviceId();
386 if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
387 onRemoteDataChanged_(deviceId);
388 } else {
389 LOGE("[SyncEngine] onRemoteDataChanged is null!");
390 }
391 delete inMsg;
392 inMsg = nullptr;
393 ScheduleTaskOut(context, communicator);
394 }
395
ScheduleTaskOut(ISyncTaskContext * context,const ICommunicator * communicator)396 void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
397 {
398 (void)DealMsgUtilQueueEmpty();
399 DecExecTaskCount();
400 RefObject::DecObjRef(communicator);
401 RefObject::DecObjRef(context);
402 }
403
DealMsgUtilQueueEmpty()404 int SyncEngine::DealMsgUtilQueueEmpty()
405 {
406 if (!isActive_) {
407 return -E_BUSY; // db is closing just return
408 }
409 int errCode = E_OK;
410 Message *inMsg = nullptr;
411 {
412 std::lock_guard<std::mutex> lock(queueLock_);
413 if (msgQueue_.empty()) {
414 return errCode;
415 }
416 inMsg = msgQueue_.front();
417 msgQueue_.pop_front();
418 queueCacheSize_ -= GetMsgSize(inMsg);
419 }
420
421 IncExecTaskCount();
422 // it will deal with the first message in queue, we should increase object reference counts and sure that resources
423 // could be prevented from destroying by other threads.
424 do {
425 ISyncTaskContext *nextContext = GetContextForMsg({inMsg->GetTarget(), inMsg->GetSenderUserId()}, errCode,
426 inMsg->GetErrorNo() == E_NEED_CORRECT_TARGET_USER);
427 if (errCode != E_OK) {
428 break;
429 }
430 errCode = ScheduleDealMsg(nextContext, inMsg);
431 if (errCode != E_OK) {
432 RefObject::DecObjRef(nextContext);
433 }
434 } while (false);
435 if (errCode != E_OK) {
436 delete inMsg;
437 inMsg = nullptr;
438 DecExecTaskCount();
439 }
440 return errCode;
441 }
442
GetContextForMsg(const DeviceSyncTarget & target,int & errCode,bool isNeedCorrectUserId)443 ISyncTaskContext *SyncEngine::GetContextForMsg(const DeviceSyncTarget &target, int &errCode, bool isNeedCorrectUserId)
444 {
445 ISyncTaskContext *context = nullptr;
446 {
447 std::lock_guard<std::mutex> lock(contextMapLock_);
448 context = FindSyncTaskContext(target, isNeedCorrectUserId);
449 if (context != nullptr) { // LCOV_EXCL_BR_LINE
450 if (context->IsKilled()) {
451 errCode = -E_OBJ_IS_KILLED;
452 return nullptr;
453 }
454 } else {
455 if (IsKilled()) {
456 errCode = -E_OBJ_IS_KILLED;
457 return nullptr;
458 }
459 context = GetSyncTaskContext(target, errCode);
460 if (context == nullptr) {
461 return nullptr;
462 }
463 }
464 // IncRef for context to make sure context is valid, when task run another thread
465 RefObject::IncObjRef(context);
466 }
467 return context;
468 }
469
ScheduleDealMsg(ISyncTaskContext * context,Message * inMsg)470 int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
471 {
472 if (inMsg == nullptr) {
473 LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
474 DecExecTaskCount();
475 return E_OK;
476 }
477 CommunicatorProxy *comProxy = nullptr;
478 {
479 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
480 comProxy = communicatorProxy_;
481 RefObject::IncObjRef(comProxy);
482 }
483 int errCode = E_OK;
484 // deal remote local data changed message
485 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
486 RemoteDataChangedTask(context, comProxy, inMsg);
487 } else {
488 errCode = RuntimeContext::GetInstance()->ScheduleTask(
489 [this, context, comProxy, inMsg] { MessageReciveCallbackTask(context, comProxy, inMsg); });
490 }
491
492 if (errCode != E_OK) {
493 LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode);
494 RefObject::DecObjRef(comProxy);
495 }
496 return errCode;
497 }
498
MessageReciveCallback(const std::string & targetDev,Message * inMsg)499 int SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
500 {
501 IncExecTaskCount();
502 int errCode = MessageReciveCallbackInner(targetDev, inMsg);
503 if (errCode != E_OK) {
504 if (inMsg != nullptr) {
505 delete inMsg;
506 inMsg = nullptr;
507 }
508 DecExecTaskCount();
509 LOGE("[SyncEngine] MessageReciveCallback failed!");
510 }
511 return errCode;
512 }
513
MessageReciveCallbackInner(const std::string & targetDev,Message * inMsg)514 int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
515 {
516 if (targetDev.empty() || inMsg == nullptr) {
517 LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
518 return -E_INVALID_ARGS;
519 }
520 if (!isActive_) {
521 LOGE("[SyncEngine] engine is closing, ignore msg");
522 return -E_BUSY;
523 }
524 if (inMsg->IsSupportFeedDbClosing() && ExchangeClosePending(false)) {
525 return -E_FEEDBACK_DB_CLOSING;
526 }
527 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE) {
528 return HandleRemoteExecutorMsg(targetDev, inMsg);
529 }
530
531 int msgSize = 0;
532 if (!IsSkipCalculateLen(inMsg)) {
533 msgSize = GetMsgSize(inMsg);
534 if (msgSize <= 0) {
535 LOGE("[SyncEngine] GetMsgSize makes a mistake");
536 return -E_NOT_SUPPORT;
537 }
538 }
539
540 {
541 std::lock_guard<std::mutex> lock(queueLock_);
542 if ((queueCacheSize_ + msgSize) > maxQueueCacheSize_) {
543 LOGE("[SyncEngine] The size of message queue is beyond maximum");
544 discardMsgNum_++;
545 return -E_BUSY;
546 }
547
548 if (GetExecTaskCount() > MAX_EXEC_NUM) {
549 PutMsgIntoQueue(targetDev, inMsg, msgSize);
550 // task dont exec here
551 DecExecTaskCount();
552 return E_OK;
553 }
554 }
555
556 int errCode = E_OK;
557 ISyncTaskContext *nextContext = GetContextForMsg({targetDev, inMsg->GetSenderUserId()}, errCode,
558 inMsg->GetErrorNo() == E_NEED_CORRECT_TARGET_USER);
559 if (errCode != E_OK) {
560 return errCode;
561 }
562 LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
563 return ScheduleDealMsg(nextContext, inMsg);
564 }
565
PutMsgIntoQueue(const std::string & targetDev,Message * inMsg,int msgSize)566 void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
567 {
568 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
569 auto iter = std::find_if(msgQueue_.begin(), msgQueue_.end(),
570 [&targetDev](const Message *msg) {
571 return targetDev == msg->GetTarget() && msg->GetMessageId() == LOCAL_DATA_CHANGED;
572 });
573 if (iter != msgQueue_.end()) { // LCOV_EXCL_BR_LINE
574 delete inMsg;
575 inMsg = nullptr;
576 return;
577 }
578 }
579 inMsg->SetTarget(targetDev);
580 msgQueue_.push_back(inMsg);
581 queueCacheSize_ += msgSize;
582 LOGW("[SyncEngine] The quantity of executing threads is beyond maximum. msgQueueSize = %zu", msgQueue_.size());
583 }
584
GetMsgSize(const Message * inMsg) const585 int SyncEngine::GetMsgSize(const Message *inMsg) const
586 {
587 switch (inMsg->GetMessageId()) {
588 case TIME_SYNC_MESSAGE:
589 return TimeSync::CalculateLen(inMsg);
590 case ABILITY_SYNC_MESSAGE:
591 return AbilitySync::CalculateLen(inMsg);
592 case DATA_SYNC_MESSAGE:
593 case QUERY_SYNC_MESSAGE:
594 case CONTROL_SYNC_MESSAGE:
595 return SingleVerSerializeManager::CalculateLen(inMsg);
596 #ifndef OMIT_MULTI_VER
597 case COMMIT_HISTORY_SYNC_MESSAGE:
598 return CommitHistorySync::CalculateLen(inMsg);
599 case MULTI_VER_DATA_SYNC_MESSAGE:
600 return MultiVerDataSync::CalculateLen(inMsg);
601 case VALUE_SLICE_SYNC_MESSAGE:
602 return ValueSliceSync::CalculateLen(inMsg);
603 #endif
604 case LOCAL_DATA_CHANGED:
605 return DeviceManager::CalculateLen();
606 default:
607 LOGE("[SyncEngine] GetMsgSize not support msgId:%u", inMsg->GetMessageId());
608 return -E_NOT_SUPPORT;
609 }
610 }
611
FindSyncTaskContext(const DeviceSyncTarget & target,bool isNeedCorrectUserId)612 ISyncTaskContext *SyncEngine::FindSyncTaskContext(const DeviceSyncTarget &target, bool isNeedCorrectUserId)
613 {
614 if (target.userId == DBConstant::DEFAULT_USER) {
615 for (auto it = syncTaskContextMap_.begin(); it != syncTaskContextMap_.end(); it++) {
616 if (it->first.device == target.device) {
617 ISyncTaskContext *context = it->second;
618 CorrectTargetUserId(it, isNeedCorrectUserId);
619 return context;
620 }
621 }
622 }
623 auto iter = syncTaskContextMap_.find(target);
624 if (iter != syncTaskContextMap_.end()) {
625 ISyncTaskContext *context = iter->second;
626 CorrectTargetUserId(iter, isNeedCorrectUserId);
627 return context;
628 }
629 return nullptr;
630 }
631
GetSyncTaskContextAndInc(const std::string & deviceId)632 std::vector<ISyncTaskContext *> SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
633 {
634 std::vector<ISyncTaskContext *> contexts;
635 std::lock_guard<std::mutex> lock(contextMapLock_);
636 for (const auto &iter : syncTaskContextMap_) {
637 if (iter.first.device != deviceId) {
638 continue;
639 }
640 if (iter.second == nullptr) {
641 LOGI("[SyncEngine] dev=%s, user=%s, context is null, no need to clear sync operation", STR_MASK(deviceId),
642 iter.first.userId.c_str());
643 continue;
644 }
645 if (iter.second->IsKilled()) { // LCOV_EXCL_BR_LINE
646 LOGI("[SyncEngine] context is killing");
647 continue;
648 }
649 RefObject::IncObjRef(iter.second);
650 contexts.push_back(iter.second);
651 }
652 return contexts;
653 }
654
GetSyncTaskContext(const DeviceSyncTarget & target,int & errCode)655 ISyncTaskContext *SyncEngine::GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode)
656 {
657 auto storage = GetAndIncSyncInterface();
658 if (storage == nullptr) {
659 errCode = -E_INVALID_DB;
660 LOGE("[SyncEngine] SyncTaskContext alloc failed with null db");
661 return nullptr;
662 }
663 ISyncTaskContext *context = CreateSyncTaskContext(*storage);
664 if (context == nullptr) {
665 errCode = -E_OUT_OF_MEMORY;
666 LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
667 return nullptr;
668 }
669 errCode = context->Initialize(target, storage, metadata_, communicatorProxy_);
670 if (errCode != E_OK) {
671 LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(target.device));
672 RefObject::DecObjRef(context);
673 storage->DecRefCount();
674 context = nullptr;
675 return nullptr;
676 }
677 syncTaskContextMap_.insert(std::pair<DeviceSyncTarget, ISyncTaskContext *>(target, context));
678 // IncRef for SyncEngine to make sure SyncEngine is valid when context access
679 RefObject::IncObjRef(this);
680 context->OnLastRef([this, target, storage]() {
681 LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(target.device));
682 RefObject::DecObjRef(this);
683 storage->DecRefCount();
684 });
685 context->RegOnSyncTask([this, context] { return ExecSyncTask(context); });
686 return context;
687 }
688
ExecSyncTask(ISyncTaskContext * context)689 int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
690 {
691 if (IsKilled()) {
692 return -E_OBJ_IS_KILLED;
693 }
694 auto timeout = GetTimeout(context->GetDeviceId());
695 AutoLock lockGuard(context);
696 int status = context->GetTaskExecStatus();
697 if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
698 return -E_NOT_SUPPORT;
699 }
700 context->SetTaskExecStatus(ISyncTaskContext::RUNNING);
701 while (!context->IsTargetQueueEmpty()) {
702 int errCode = context->GetNextTarget(timeout);
703 if (errCode != E_OK) {
704 // current task execute failed, try next task
705 context->ClearSyncOperation();
706 continue;
707 }
708 if (context->IsCurrentSyncTaskCanBeSkipped()) { // LCOV_EXCL_BR_LINE
709 context->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
710 context->ClearSyncOperation();
711 continue;
712 }
713 context->UnlockObj();
714 errCode = context->StartStateMachine();
715 context->LockObj();
716 if (errCode != E_OK) {
717 // machine start failed because timer start failed, try to execute next task
718 LOGW("[SyncEngine] machine StartSync failed");
719 context->SetOperationStatus(SyncOperation::OP_FAILED);
720 context->ClearSyncOperation();
721 continue;
722 }
723 // now task is running just return here
724 return errCode;
725 }
726 LOGD("[SyncEngine] ExecSyncTask finished");
727 context->SetTaskExecStatus(ISyncTaskContext::FINISHED);
728 return E_OK;
729 }
730
GetQueueCacheSize() const731 int SyncEngine::GetQueueCacheSize() const
732 {
733 std::lock_guard<std::mutex> lock(queueLock_);
734 return queueCacheSize_;
735 }
736
SetQueueCacheSize(int size)737 void SyncEngine::SetQueueCacheSize(int size)
738 {
739 std::lock_guard<std::mutex> lock(queueLock_);
740 queueCacheSize_ = size;
741 }
742
GetDiscardMsgNum() const743 unsigned int SyncEngine::GetDiscardMsgNum() const
744 {
745 std::lock_guard<std::mutex> lock(queueLock_);
746 return discardMsgNum_;
747 }
748
SetDiscardMsgNum(unsigned int num)749 void SyncEngine::SetDiscardMsgNum(unsigned int num)
750 {
751 std::lock_guard<std::mutex> lock(queueLock_);
752 discardMsgNum_ = num;
753 }
754
GetMaxExecNum() const755 unsigned int SyncEngine::GetMaxExecNum() const
756 {
757 return MAX_EXEC_NUM;
758 }
759
GetMaxQueueCacheSize() const760 int SyncEngine::GetMaxQueueCacheSize() const
761 {
762 return maxQueueCacheSize_;
763 }
764
SetMaxQueueCacheSize(int value)765 void SyncEngine::SetMaxQueueCacheSize(int value)
766 {
767 maxQueueCacheSize_ = value;
768 }
769
GetLabel() const770 std::string SyncEngine::GetLabel() const
771 {
772 return label_;
773 }
774
GetSyncRetry() const775 bool SyncEngine::GetSyncRetry() const
776 {
777 return isSyncRetry_;
778 }
779
SetSyncRetry(bool isRetry)780 void SyncEngine::SetSyncRetry(bool isRetry)
781 {
782 if (isSyncRetry_ == isRetry) {
783 LOGI("sync retry is equal, syncTry=%d, no need to set.", isRetry);
784 return;
785 }
786 isSyncRetry_ = isRetry;
787 LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
788 std::lock_guard<std::mutex> lock(contextMapLock_);
789 for (auto &iter : syncTaskContextMap_) {
790 ISyncTaskContext *context = iter.second;
791 if (context != nullptr) { // LCOV_EXCL_BR_LINE
792 context->SetSyncRetry(isRetry);
793 }
794 }
795 }
796
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)797 int SyncEngine::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
798 {
799 if (!isActive_) {
800 LOGI("[SyncEngine] engine is closed, just put into map");
801 return E_OK;
802 }
803 ICommunicator *communicator = nullptr;
804 {
805 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
806 if (equalCommunicators_.count(identifier) != 0) {
807 communicator = equalCommunicators_[identifier];
808 } else {
809 int errCode = E_OK;
810 communicator = AllocCommunicator(identifier, errCode, GetUserId());
811 if (communicator == nullptr) {
812 return errCode;
813 }
814 equalCommunicators_[identifier] = communicator;
815 }
816 }
817 std::string targetDevices;
818 for (const auto &dev : targets) {
819 targetDevices += DBCommon::StringMasking(dev) + ",";
820 }
821 LOGI("[SyncEngine] set equal identifier=%.3s, original=%.3s, targetDevices=%s",
822 DBCommon::TransferStringToHex(identifier).c_str(), label_.c_str(),
823 targetDevices.substr(0, (targetDevices.size() > 0 ? targetDevices.size() - 1 : 0)).c_str());
824 {
825 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
826 if (communicatorProxy_ == nullptr) {
827 return -E_INTERNAL_ERROR;
828 }
829 communicatorProxy_->SetEqualCommunicator(communicator, identifier, targets);
830 }
831 communicator->Activate(GetUserId());
832 return E_OK;
833 }
834
SetEqualIdentifier()835 void SyncEngine::SetEqualIdentifier()
836 {
837 std::map<std::string, std::vector<std::string>> equalIdentifier; // key: equalIdentifier value: devices
838 for (auto &item : equalIdentifierMap_) {
839 if (equalIdentifier.find(item.second) == equalIdentifier.end()) { // LCOV_EXCL_BR_LINE
840 equalIdentifier[item.second] = {item.first};
841 } else {
842 equalIdentifier[item.second].push_back(item.first);
843 }
844 }
845 for (const auto &item : equalIdentifier) {
846 SetEqualIdentifier(item.first, item.second);
847 }
848 }
849
SetEqualIdentifierMap(const std::string & identifier,const std::vector<std::string> & targets)850 void SyncEngine::SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets)
851 {
852 for (auto iter = equalIdentifierMap_.begin(); iter != equalIdentifierMap_.end();) {
853 if (identifier == iter->second) {
854 iter = equalIdentifierMap_.erase(iter);
855 continue;
856 }
857 iter++;
858 }
859 for (const auto &device : targets) {
860 equalIdentifierMap_[device] = identifier;
861 }
862 }
863
OfflineHandleByDevice(const std::string & deviceId,ISyncInterface * storage)864 void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage)
865 {
866 if (!isActive_) {
867 LOGD("[SyncEngine][OfflineHandleByDevice] ignore offline because not init");
868 return;
869 }
870 RemoteExecutor *executor = GetAndIncRemoteExector();
871 if (executor != nullptr) {
872 executor->NotifyDeviceOffline(deviceId);
873 RefObject::DecObjRef(executor);
874 executor = nullptr;
875 }
876 // db closed or device is offline
877 // clear remote subscribe and trigger
878 std::vector<std::string> remoteQueryId;
879 subManager_->GetRemoteSubscribeQueryIds(deviceId, remoteQueryId);
880 subManager_->ClearRemoteSubscribeQuery(deviceId);
881 for (const auto &queryId: remoteQueryId) {
882 if (!subManager_->IsQueryExistSubscribe(queryId)) {
883 static_cast<SingleVerKvDBSyncInterface *>(storage)->RemoveSubscribe(queryId);
884 }
885 }
886 DBInfo dbInfo;
887 static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
888 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
889 {
890 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
891 if (communicatorProxy_ == nullptr) {
892 return;
893 }
894 if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
895 LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
896 return;
897 }
898 }
899 // means device is offline, clear local subscribe
900 // get context and Inc context if context is not nullptr
901 std::vector<ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
902 for (const auto &context : contexts) {
903 subManager_->ClearLocalSubscribeQuery(deviceId);
904 // clear sync task
905 if (context != nullptr) {
906 context->ClearAllSyncTask();
907 RefObject::DecObjRef(context);
908 }
909 }
910 }
911
ClearAllSyncTaskByDevice(const std::string & deviceId)912 void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
913 {
914 std::vector<ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
915 for (const auto &context : contexts) {
916 if (context != nullptr) {
917 context->ClearAllSyncTask();
918 RefObject::DecObjRef(context);
919 }
920 }
921 }
922
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)923 void SyncEngine::GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
924 {
925 subManager_->GetLocalSubscribeQueries(device, subscribeQueries);
926 }
927
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds)928 void SyncEngine::GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds)
929 {
930 subManager_->GetRemoteSubscribeQueryIds(device, subscribeQueryIds);
931 }
932
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)933 void SyncEngine::GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries)
934 {
935 subManager_->GetRemoteSubscribeQueries(device, subscribeQueries);
936 }
937
PutUnfinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)938 void SyncEngine::PutUnfinishedSubQueries(const std::string &device,
939 const std::vector<QuerySyncObject> &subscribeQueries)
940 {
941 subManager_->PutLocalUnFinishedSubQueries(device, subscribeQueries);
942 }
943
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries)944 void SyncEngine::GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries)
945 {
946 subManager_->GetAllUnFinishSubQueries(allSyncQueries);
947 }
948
AllocCommunicator(const std::string & identifier,int & errCode,std::string userId)949 ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int &errCode, std::string userId)
950 {
951 ICommunicatorAggregator *communicatorAggregator = nullptr;
952 errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
953 if (communicatorAggregator == nullptr) {
954 LOGE("[SyncEngine] Get ICommunicatorAggregator error when SetEqualIdentifier err = %d", errCode);
955 return nullptr;
956 }
957 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
958 auto communicator = communicatorAggregator->AllocCommunicator(identifierVect, errCode, userId);
959 if (communicator == nullptr) {
960 LOGE("[SyncEngine] AllocCommunicator error when SetEqualIdentifier! err = %d", errCode);
961 return communicator;
962 }
963
964 errCode = communicator->RegOnMessageCallback(
965 [this](const std::string &targetDev, Message *inMsg) {
966 return MessageReciveCallback(targetDev, inMsg);
967 }, []() {});
968 if (errCode != E_OK) {
969 LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
970 communicatorAggregator->ReleaseCommunicator(communicator, userId);
971 return nullptr;
972 }
973
974 errCode = communicator->RegOnConnectCallback(
975 [this, deviceManager = deviceManager_](const std::string &targetDev, bool isConnect) {
976 deviceManager->OnDeviceConnectCallback(targetDev, isConnect);
977 }, nullptr);
978 if (errCode != E_OK) {
979 LOGE("[SyncEngine][RegConnCB] register failed in SetEqualIdentifier! err %d", errCode);
980 communicator->RegOnMessageCallback(nullptr, nullptr);
981 communicatorAggregator->ReleaseCommunicator(communicator, userId);
982 return nullptr;
983 }
984
985 return communicator;
986 }
987
UnRegCommunicatorsCallback()988 void SyncEngine::UnRegCommunicatorsCallback()
989 {
990 if (communicator_ != nullptr) {
991 communicator_->RegOnMessageCallback(nullptr, nullptr);
992 communicator_->RegOnConnectCallback(nullptr, nullptr);
993 communicator_->RegOnSendableCallback(nullptr, nullptr);
994 }
995 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
996 for (const auto &iter : equalCommunicators_) {
997 iter.second->RegOnMessageCallback(nullptr, nullptr);
998 iter.second->RegOnConnectCallback(nullptr, nullptr);
999 iter.second->RegOnSendableCallback(nullptr, nullptr);
1000 }
1001 }
1002
ReleaseCommunicators()1003 void SyncEngine::ReleaseCommunicators()
1004 {
1005 {
1006 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1007 RefObject::KillAndDecObjRef(communicatorProxy_);
1008 communicatorProxy_ = nullptr;
1009 }
1010 ICommunicatorAggregator *communicatorAggregator = nullptr;
1011 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
1012 if (communicatorAggregator == nullptr) {
1013 LOGF("[SyncEngine] ICommunicatorAggregator get failed when fialize SyncEngine err %d", errCode);
1014 return;
1015 }
1016
1017 if (communicator_ != nullptr) {
1018 communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId());
1019 communicator_ = nullptr;
1020 }
1021
1022 std::lock_guard<std::mutex> lock(equalCommunicatorsLock_);
1023 for (auto &iter : equalCommunicators_) {
1024 communicatorAggregator->ReleaseCommunicator(iter.second, GetUserId());
1025 }
1026 equalCommunicators_.clear();
1027 }
1028
IsSkipCalculateLen(const Message * inMsg)1029 bool SyncEngine::IsSkipCalculateLen(const Message *inMsg)
1030 {
1031 if (inMsg->IsFeedbackError()) {
1032 LOGE("[SyncEngine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
1033 return true;
1034 }
1035 return false;
1036 }
1037
GetSubscribeSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1038 void SyncEngine::GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query,
1039 InternalSyncParma &outParam)
1040 {
1041 outParam.devices = { device };
1042 outParam.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
1043 outParam.isQuerySync = true;
1044 outParam.syncQuery = query;
1045 }
1046
GetQueryAutoSyncParam(const std::string & device,const QuerySyncObject & query,InternalSyncParma & outParam)1047 void SyncEngine::GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query,
1048 InternalSyncParma &outParam)
1049 {
1050 outParam.devices = { device };
1051 outParam.mode = SyncModeType::AUTO_PUSH;
1052 outParam.isQuerySync = true;
1053 outParam.syncQuery = query;
1054 }
1055
StartAutoSubscribeTimer(const ISyncInterface & syncInterface)1056 int SyncEngine::StartAutoSubscribeTimer([[gnu::unused]] const ISyncInterface &syncInterface)
1057 {
1058 return E_OK;
1059 }
1060
StopAutoSubscribeTimer()1061 void SyncEngine::StopAutoSubscribeTimer()
1062 {
1063 }
1064
SubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const1065 int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
1066 {
1067 return subManager_->LocalSubscribeLimitCheck(devices, query);
1068 }
1069
1070
ClearInnerResource()1071 void SyncEngine::ClearInnerResource()
1072 {
1073 ClearSyncInterface();
1074 if (deviceManager_ != nullptr) {
1075 delete deviceManager_;
1076 deviceManager_ = nullptr;
1077 }
1078 communicator_ = nullptr;
1079 metadata_ = nullptr;
1080 onRemoteDataChanged_ = nullptr;
1081 offlineChanged_ = nullptr;
1082 queryAutoSyncCallback_ = nullptr;
1083 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1084 if (remoteExecutor_ != nullptr) {
1085 RefObject::KillAndDecObjRef(remoteExecutor_);
1086 remoteExecutor_ = nullptr;
1087 }
1088 }
1089
IsEngineActive() const1090 bool SyncEngine::IsEngineActive() const
1091 {
1092 return isActive_;
1093 }
1094
SchemaChange()1095 void SyncEngine::SchemaChange()
1096 {
1097 std::vector<ISyncTaskContext *> tmpContextVec;
1098 {
1099 std::lock_guard<std::mutex> lock(contextMapLock_);
1100 for (const auto &entry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
1101 auto context = entry.second;
1102 if (context == nullptr || context->IsKilled()) {
1103 continue;
1104 }
1105 RefObject::IncObjRef(context);
1106 tmpContextVec.push_back(context);
1107 }
1108 }
1109 for (const auto &entryContext : tmpContextVec) {
1110 entryContext->SchemaChange();
1111 RefObject::DecObjRef(entryContext);
1112 }
1113 }
1114
IncExecTaskCount()1115 void SyncEngine::IncExecTaskCount()
1116 {
1117 std::lock_guard<std::mutex> incLock(execTaskCountLock_);
1118 execTaskCount_++;
1119 }
1120
DecExecTaskCount()1121 void SyncEngine::DecExecTaskCount()
1122 {
1123 {
1124 std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1125 execTaskCount_--;
1126 }
1127 execTaskCv_.notify_all();
1128 }
1129
GetExecTaskCount()1130 uint32_t SyncEngine::GetExecTaskCount()
1131 {
1132 std::lock_guard<std::mutex> autoLock(execTaskCountLock_);
1133 return execTaskCount_;
1134 }
1135
Dump(int fd)1136 void SyncEngine::Dump(int fd)
1137 {
1138 {
1139 std::lock_guard<std::mutex> lock(communicatorProxyLock_);
1140 std::string communicatorLabel;
1141 if (communicatorProxy_ != nullptr) {
1142 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1143 }
1144 DBDumpHelper::Dump(fd, "\tcommunicator label = %s, equalIdentify Info [\n", communicatorLabel.c_str());
1145 if (communicatorProxy_ != nullptr) {
1146 communicatorProxy_->GetLocalIdentity(communicatorLabel);
1147 communicatorProxy_->Dump(fd);
1148 }
1149 }
1150 DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
1151 // dump context info
1152 std::lock_guard<std::mutex> autoLock(contextMapLock_);
1153 for (const auto &entry : syncTaskContextMap_) {
1154 if (entry.second != nullptr) {
1155 entry.second->Dump(fd);
1156 }
1157 }
1158 DBDumpHelper::Dump(fd, "\t]\n\n");
1159 }
1160
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)1161 int SyncEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition,
1162 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
1163 {
1164 RemoteExecutor *executor = GetAndIncRemoteExector();
1165 if (!isActive_ || executor == nullptr) {
1166 RefObject::DecObjRef(executor);
1167 return -E_BUSY; // db is closing just return
1168 }
1169 int errCode = executor->RemoteQuery(device, condition, timeout, connectionId, result);
1170 RefObject::DecObjRef(executor);
1171 return errCode;
1172 }
1173
NotifyConnectionClosed(uint64_t connectionId)1174 void SyncEngine::NotifyConnectionClosed(uint64_t connectionId)
1175 {
1176 RemoteExecutor *executor = GetAndIncRemoteExector();
1177 if (!isActive_ || executor == nullptr) {
1178 RefObject::DecObjRef(executor);
1179 return; // db is closing just return
1180 }
1181 executor->NotifyConnectionClosed(connectionId);
1182 RefObject::DecObjRef(executor);
1183 }
1184
NotifyUserChange()1185 void SyncEngine::NotifyUserChange()
1186 {
1187 RemoteExecutor *executor = GetAndIncRemoteExector();
1188 if (!isActive_ || executor == nullptr) {
1189 RefObject::DecObjRef(executor);
1190 return; // db is closing just return
1191 }
1192 executor->NotifyUserChange();
1193 RefObject::DecObjRef(executor);
1194 }
1195
GetAndIncRemoteExector()1196 RemoteExecutor *SyncEngine::GetAndIncRemoteExector()
1197 {
1198 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1199 RefObject::IncObjRef(remoteExecutor_);
1200 return remoteExecutor_;
1201 }
1202
SetRemoteExector(RemoteExecutor * executor)1203 void SyncEngine::SetRemoteExector(RemoteExecutor *executor)
1204 {
1205 std::lock_guard<std::mutex> autoLock(remoteExecutorLock_);
1206 remoteExecutor_ = executor;
1207 }
1208
CheckDeviceIdValid(const std::string & checkDeviceId,const std::string & localDeviceId)1209 bool SyncEngine::CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId)
1210 {
1211 if (checkDeviceId.empty()) {
1212 return false;
1213 }
1214 if (checkDeviceId.length() > DBConstant::MAX_DEV_LENGTH) {
1215 LOGE("[SyncEngine] dev is too long len=%zu", checkDeviceId.length());
1216 return false;
1217 }
1218 return localDeviceId != checkDeviceId;
1219 }
1220
GetLocalDeviceId(std::string & deviceId)1221 int SyncEngine::GetLocalDeviceId(std::string &deviceId)
1222 {
1223 if (!isActive_ || communicator_ == nullptr) {
1224 // db is closing
1225 return -E_BUSY;
1226 }
1227 auto communicator = communicator_;
1228 RefObject::IncObjRef(communicator);
1229 int errCode = communicator->GetLocalIdentity(deviceId);
1230 RefObject::DecObjRef(communicator);
1231 return errCode;
1232 }
1233
AbortMachineIfNeed(uint32_t syncId)1234 void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
1235 {
1236 std::vector<ISyncTaskContext *> abortContexts;
1237 {
1238 std::lock_guard<std::mutex> lock(contextMapLock_);
1239 for (const auto &entry : syncTaskContextMap_) {
1240 auto context = entry.second;
1241 if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
1242 continue;
1243 }
1244 RefObject::IncObjRef(context);
1245 if (context->GetSyncId() == syncId) {
1246 RefObject::IncObjRef(context);
1247 abortContexts.push_back(context);
1248 }
1249 RefObject::DecObjRef(context);
1250 }
1251 }
1252 for (const auto &abortContext : abortContexts) {
1253 abortContext->AbortMachineIfNeed(static_cast<uint32_t>(syncId));
1254 RefObject::DecObjRef(abortContext);
1255 }
1256 }
1257
WaitingExecTaskExist()1258 void SyncEngine::WaitingExecTaskExist()
1259 {
1260 std::unique_lock<std::mutex> closeLock(execTaskCountLock_);
1261 bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
1262 [this]() { return execTaskCount_ == 0; });
1263 if (!isTimeout) { // LCOV_EXCL_BR_LINE
1264 LOGD("SyncEngine Close with executing task!");
1265 }
1266 }
1267
HandleRemoteExecutorMsg(const std::string & targetDev,Message * inMsg)1268 int SyncEngine::HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg)
1269 {
1270 RemoteExecutor *executor = GetAndIncRemoteExector();
1271 int errCode = E_OK;
1272 if (executor != nullptr) {
1273 errCode = executor->ReceiveMessage(targetDev, inMsg);
1274 } else {
1275 errCode = -E_BUSY;
1276 }
1277 DecExecTaskCount();
1278 RefObject::DecObjRef(executor);
1279 return errCode;
1280 }
1281
AddSubscribe(SyncGenericInterface * storage,const std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery)1282 void SyncEngine::AddSubscribe(SyncGenericInterface *storage,
1283 const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery)
1284 {
1285 for (const auto &[device, queryList]: subscribeQuery) {
1286 for (const auto &query: queryList) {
1287 AddQuerySubscribe(storage, device, query);
1288 }
1289 }
1290 }
1291
AddQuerySubscribe(SyncGenericInterface * storage,const std::string & device,const QuerySyncObject & query)1292 void SyncEngine::AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device,
1293 const QuerySyncObject &query)
1294 {
1295 int errCode = storage->AddSubscribe(query.GetIdentify(), query, true);
1296 if (errCode != E_OK) {
1297 LOGW("[SyncEngine][AddSubscribe] Add trigger failed dev = %s queryId = %s",
1298 STR_MASK(device), STR_MASK(query.GetIdentify()));
1299 return;
1300 }
1301 errCode = subManager_->ReserveRemoteSubscribeQuery(device, query);
1302 if (errCode != E_OK) {
1303 if (!subManager_->IsQueryExistSubscribe(query.GetIdentify())) { // LCOV_EXCL_BR_LINE
1304 (void)storage->RemoveSubscribe(query.GetIdentify());
1305 }
1306 return;
1307 }
1308 subManager_->ActiveRemoteSubscribeQuery(device, query);
1309 }
1310
TimeChange()1311 void SyncEngine::TimeChange()
1312 {
1313 std::vector<ISyncTaskContext *> decContext;
1314 {
1315 // copy context
1316 std::lock_guard<std::mutex> lock(contextMapLock_);
1317 for (const auto &iter : syncTaskContextMap_) {
1318 RefObject::IncObjRef(iter.second);
1319 decContext.push_back(iter.second);
1320 }
1321 }
1322 for (auto &iter : decContext) {
1323 iter->TimeChange();
1324 RefObject::DecObjRef(iter);
1325 }
1326 }
1327
GetResponseTaskCount()1328 int32_t SyncEngine::GetResponseTaskCount()
1329 {
1330 std::vector<ISyncTaskContext *> decContext;
1331 {
1332 // copy context
1333 std::lock_guard<std::mutex> lock(contextMapLock_);
1334 for (const auto &iter : syncTaskContextMap_) {
1335 RefObject::IncObjRef(iter.second);
1336 decContext.push_back(iter.second);
1337 }
1338 }
1339 int32_t taskCount = 0;
1340 for (auto &iter : decContext) {
1341 taskCount += iter->GetResponseTaskCount();
1342 if (iter->IsSavingTask(GetTimeout(iter->GetDeviceId()))) {
1343 taskCount++;
1344 }
1345 RefObject::DecObjRef(iter);
1346 }
1347 {
1348 std::lock_guard<std::mutex> decLock(execTaskCountLock_);
1349 taskCount += static_cast<int32_t>(execTaskCount_);
1350 }
1351 return taskCount;
1352 }
1353
ClearSyncInterface()1354 void SyncEngine::ClearSyncInterface()
1355 {
1356 ISyncInterface *syncInterface = nullptr;
1357 {
1358 std::lock_guard<std::mutex> autoLock(storageMutex_);
1359 if (syncInterface_ == nullptr) {
1360 return;
1361 }
1362 syncInterface = syncInterface_;
1363 syncInterface_ = nullptr;
1364 }
1365 syncInterface->DecRefCount();
1366 }
1367
GetAndIncSyncInterface()1368 ISyncInterface *SyncEngine::GetAndIncSyncInterface()
1369 {
1370 std::lock_guard<std::mutex> autoLock(storageMutex_);
1371 if (syncInterface_ == nullptr) {
1372 return nullptr;
1373 }
1374 syncInterface_->IncRefCount();
1375 return syncInterface_;
1376 }
1377
SetSyncInterface(ISyncInterface * syncInterface)1378 void SyncEngine::SetSyncInterface(ISyncInterface *syncInterface)
1379 {
1380 std::lock_guard<std::mutex> autoLock(storageMutex_);
1381 syncInterface_ = syncInterface;
1382 }
1383
GetUserId(const ISyncInterface * syncInterface)1384 std::string SyncEngine::GetUserId(const ISyncInterface *syncInterface)
1385 {
1386 if (syncInterface == nullptr) {
1387 LOGW("[SyncEngine][GetUserId] sync interface has not initialized");
1388 return "";
1389 }
1390 std::string userId = syncInterface->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
1391 std::string subUserId = syncInterface->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
1392 if (!subUserId.empty()) {
1393 userId += "-" + subUserId;
1394 }
1395 return userId;
1396 }
1397
GetUserId()1398 std::string SyncEngine::GetUserId()
1399 {
1400 std::lock_guard<std::mutex> autoLock(storageMutex_);
1401 return GetUserId(syncInterface_);
1402 }
1403
GetTimeout(const std::string & dev)1404 uint32_t SyncEngine::GetTimeout(const std::string &dev)
1405 {
1406 ICommunicator *communicator = nullptr;
1407 {
1408 std::lock_guard<std::mutex> autoLock(communicatorProxyLock_);
1409 if (communicatorProxy_ == nullptr) {
1410 LOGW("[SyncEngine] Communicator is null when get %.3s timeout", dev.c_str());
1411 return DBConstant::MIN_TIMEOUT;
1412 }
1413 communicator = communicatorProxy_;
1414 RefObject::IncObjRef(communicator);
1415 }
1416 uint32_t timeout = communicator->GetTimeout(dev);
1417 RefObject::DecObjRef(communicator);
1418 return timeout;
1419 }
1420
GetTargetUserId(const std::string & dev)1421 std::string SyncEngine::GetTargetUserId(const std::string &dev)
1422 {
1423 std::string targetUserId;
1424 ICommunicator *communicator = nullptr;
1425 {
1426 std::lock_guard<std::mutex> autoLock(communicatorProxyLock_);
1427 if (communicatorProxy_ == nullptr) {
1428 LOGW("[SyncEngine] Communicator is null when get target user");
1429 return targetUserId;
1430 }
1431 communicator = communicatorProxy_;
1432 RefObject::IncObjRef(communicator);
1433 }
1434 DBProperties properties = syncInterface_->GetDbProperties();
1435 ExtendInfo extendInfo;
1436 extendInfo.appId = properties.GetStringProp(DBProperties::APP_ID, "");
1437 extendInfo.userId = properties.GetStringProp(DBProperties::USER_ID, "");
1438 extendInfo.storeId = properties.GetStringProp(DBProperties::STORE_ID, "");
1439 extendInfo.dstTarget = dev;
1440 extendInfo.subUserId = properties.GetStringProp(DBProperties::SUB_USER, "");
1441 targetUserId = communicator->GetTargetUserId(extendInfo);
1442 RefObject::DecObjRef(communicator);
1443 return targetUserId;
1444 }
1445
RegCallbackOnInitComunicator(ICommunicatorAggregator * communicatorAggregator,const ISyncInterface * syncInterface)1446 int SyncEngine::RegCallbackOnInitComunicator(ICommunicatorAggregator *communicatorAggregator,
1447 const ISyncInterface *syncInterface)
1448 {
1449 int errCode = communicator_->RegOnMessageCallback(
1450 [this](const std::string &targetDev, Message *inMsg) {
1451 return MessageReciveCallback(targetDev, inMsg);
1452 }, []() {});
1453 if (errCode != E_OK) {
1454 LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
1455 communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
1456 communicator_ = nullptr;
1457 return errCode;
1458 }
1459 return E_OK;
1460 }
1461
GetRemoteQueryTaskCount()1462 int32_t SyncEngine::GetRemoteQueryTaskCount()
1463 {
1464 auto executor = GetAndIncRemoteExector();
1465 if (executor == nullptr) {
1466 LOGW("[SyncEngine] RemoteExecutor is null when get remote query task count");
1467 RefObject::DecObjRef(executor);
1468 return 0;
1469 }
1470 auto count = executor->GetTaskCount();
1471 RefObject::DecObjRef(executor);
1472 return count;
1473 }
1474
ExchangeClosePending(bool expected)1475 bool SyncEngine::ExchangeClosePending(bool expected)
1476 {
1477 if (communicator_ == nullptr) {
1478 return false;
1479 }
1480 auto communicator = communicator_;
1481 RefObject::IncObjRef(communicator);
1482 int res = communicator->ExchangeClosePending(expected);
1483 RefObject::DecObjRef(communicator);
1484 return res;
1485 }
1486
CorrectTargetUserId(std::map<DeviceSyncTarget,ISyncTaskContext * >::iterator & it,bool isNeedCorrectUserId)1487 void SyncEngine::CorrectTargetUserId(std::map<DeviceSyncTarget, ISyncTaskContext *>::iterator &it,
1488 bool isNeedCorrectUserId)
1489 {
1490 if (!isNeedCorrectUserId) {
1491 return;
1492 }
1493 ISyncTaskContext *context = it->second;
1494 std::string targetDev = it->first.device;
1495 std::string newTargetUserId = GetTargetUserId(targetDev);
1496 it = syncTaskContextMap_.erase(it);
1497 context->SetTargetUserId(newTargetUserId);
1498 syncTaskContextMap_[{targetDev, newTargetUserId}] = context;
1499 }
1500 } // namespace DistributedDB
1501