1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_task_context.h"
17
18 #include <algorithm>
19 #include <cmath>
20
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34
35 namespace {
36 constexpr int NEGOTIATION_LIMIT = 2;
37 constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41 : syncOperation_(nullptr),
42 syncId_(0),
43 mode_(0),
44 isAutoSync_(false),
45 status_(0),
46 taskExecStatus_(0),
47 syncInterface_(nullptr),
48 communicator_(nullptr),
49 stateMachine_(nullptr),
50 requestSessionId_(0),
51 lastRequestSessionId_(0),
52 timeHelper_(nullptr),
53 remoteSoftwareVersion_(0),
54 remoteSoftwareVersionId_(0),
55 isCommNormal_(true),
56 taskErrCode_(E_OK),
57 commErrCode_(E_OK),
58 syncTaskRetryStatus_(false),
59 isSyncRetry_(false),
60 negotiationCount_(0),
61 isAutoSubscribe_(false)
62 {
63 }
64
~SyncTaskContext()65 SyncTaskContext::~SyncTaskContext()
66 {
67 if (stateMachine_ != nullptr) {
68 delete stateMachine_;
69 stateMachine_ = nullptr;
70 }
71 SyncTaskContext::ClearSyncOperation();
72 ClearSyncTarget();
73 syncInterface_ = nullptr;
74 communicator_ = nullptr;
75 }
76
AddSyncTarget(ISyncTarget * target)77 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
78 {
79 if (target == nullptr) {
80 return -E_INVALID_ARGS;
81 }
82 int targetMode = target->GetMode();
83 auto syncId = static_cast<uint32_t>(target->GetSyncId());
84 {
85 std::lock_guard<std::mutex> lock(targetQueueLock_);
86 if (target->GetTaskType() == ISyncTarget::REQUEST) {
87 requestTargetQueue_.push_back(target);
88 } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
89 responseTargetQueue_.push_back(target);
90 } else {
91 return -E_INVALID_ARGS;
92 }
93 }
94 RefObject::IncObjRef(this);
95 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
96 CancelCurrentSyncRetryIfNeed(targetMode, syncId);
97 RefObject::DecObjRef(this);
98 });
99 if (errCode != E_OK) {
100 RefObject::DecObjRef(this);
101 }
102 if (onSyncTaskAdd_) {
103 RefObject::IncObjRef(this);
104 errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
105 onSyncTaskAdd_();
106 RefObject::DecObjRef(this);
107 });
108 if (errCode != E_OK) {
109 RefObject::DecObjRef(this);
110 }
111 }
112 return E_OK;
113 }
114
SetOperationStatus(int status)115 void SyncTaskContext::SetOperationStatus(int status)
116 {
117 std::lock_guard<std::mutex> lock(operationLock_);
118 if (syncOperation_ == nullptr) {
119 LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
120 return;
121 }
122 int finalStatus = status;
123
124 int operationStatus = syncOperation_->GetStatus(deviceId_);
125 if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
126 if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
127 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
128 } else {
129 finalStatus = SyncOperation::OP_FINISHED_ALL;
130 }
131 } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
132 if (GetTaskErrCode() == -E_EKEYREVOKED) {
133 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
134 } else {
135 finalStatus = SyncOperation::OP_FINISHED_ALL;
136 }
137 }
138 syncOperation_->SetStatus(deviceId_, finalStatus);
139 if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
140 SaveLastPushTaskExecStatus(finalStatus);
141 }
142 if (syncOperation_->CheckIsAllFinished()) {
143 syncOperation_->Finished();
144 }
145 }
146
SaveLastPushTaskExecStatus(int finalStatus)147 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
148 {
149 (void)finalStatus;
150 }
151
Clear()152 void SyncTaskContext::Clear()
153 {
154 StopTimer();
155 retryTime_ = 0;
156 sequenceId_ = 1;
157 syncId_ = 0;
158 isAutoSync_ = false;
159 requestSessionId_ = 0;
160 isNeedRetry_ = NO_NEED_RETRY;
161 mode_ = SyncModeType::INVALID_MODE;
162 status_ = SyncOperation::OP_WAITING;
163 taskErrCode_ = E_OK;
164 packetId_ = 0;
165 isAutoSubscribe_ = false;
166 }
167
RemoveSyncOperation(int syncId)168 int SyncTaskContext::RemoveSyncOperation(int syncId)
169 {
170 std::lock_guard<std::mutex> lock(targetQueueLock_);
171 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
172 [syncId](const ISyncTarget *target) {
173 if (target == nullptr) {
174 return false;
175 }
176 return target->GetSyncId() == syncId;
177 });
178 if (iter != requestTargetQueue_.end()) {
179 if (*iter != nullptr) {
180 delete *iter;
181 }
182 requestTargetQueue_.erase(iter);
183 return E_OK;
184 }
185 return -E_INVALID_ARGS;
186 }
187
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190 std::lock_guard<std::mutex> lock(targetQueueLock_);
191 for (auto &requestTarget : requestTargetQueue_) {
192 if (requestTarget != nullptr) {
193 delete requestTarget;
194 }
195 }
196 requestTargetQueue_.clear();
197
198 for (auto &responseTarget : responseTargetQueue_) {
199 if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
200 delete responseTarget;
201 }
202 }
203 responseTargetQueue_.clear();
204 }
205
IsTargetQueueEmpty() const206 bool SyncTaskContext::IsTargetQueueEmpty() const
207 {
208 std::lock_guard<std::mutex> lock(targetQueueLock_);
209 return requestTargetQueue_.empty() && responseTargetQueue_.empty();
210 }
211
GetOperationStatus() const212 int SyncTaskContext::GetOperationStatus() const
213 {
214 std::lock_guard<std::mutex> lock(operationLock_);
215 if (syncOperation_ == nullptr) {
216 return SyncOperation::OP_FINISHED_ALL;
217 }
218 return syncOperation_->GetStatus(deviceId_);
219 }
220
SetMode(int mode)221 void SyncTaskContext::SetMode(int mode)
222 {
223 mode_ = mode;
224 }
225
GetMode() const226 int SyncTaskContext::GetMode() const
227 {
228 return mode_;
229 }
230
MoveToNextTarget(uint32_t timeout)231 void SyncTaskContext::MoveToNextTarget(uint32_t timeout)
232 {
233 ClearSyncOperation();
234 TaskParam param;
235 // call other system api without lock
236 param.timeout = timeout;
237 std::lock_guard<std::mutex> lock(targetQueueLock_);
238 while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
239 ISyncTarget *tmpTarget = nullptr;
240 if (!requestTargetQueue_.empty()) {
241 tmpTarget = requestTargetQueue_.front();
242 requestTargetQueue_.pop_front();
243 } else {
244 tmpTarget = responseTargetQueue_.front();
245 responseTargetQueue_.pop_front();
246 }
247 if (tmpTarget == nullptr) {
248 LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
249 continue;
250 }
251 SyncOperation *tmpOperation = nullptr;
252 tmpTarget->GetSyncOperation(tmpOperation);
253 if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
254 // if killed skip this syncOperation_.
255 delete tmpTarget;
256 tmpTarget = nullptr;
257 continue;
258 }
259 CopyTargetData(tmpTarget, param);
260 delete tmpTarget;
261 tmpTarget = nullptr;
262 break;
263 }
264 }
265
GetNextTarget(uint32_t timeout)266 int SyncTaskContext::GetNextTarget(uint32_t timeout)
267 {
268 MoveToNextTarget(timeout);
269 int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
270 if (checkErrCode != E_OK) {
271 SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
272 return checkErrCode;
273 }
274 return E_OK;
275 }
276
GetSyncId() const277 uint32_t SyncTaskContext::GetSyncId() const
278 {
279 return syncId_;
280 }
281
282 // Get the current task deviceId.
GetDeviceId() const283 std::string SyncTaskContext::GetDeviceId() const
284 {
285 return deviceId_;
286 }
287
GetTargetUserId() const288 std::string SyncTaskContext::GetTargetUserId() const
289 {
290 return targetUserId_;
291 }
292
SetTargetUserId(const std::string & userId)293 void SyncTaskContext::SetTargetUserId(const std::string &userId)
294 {
295 targetUserId_ = userId;
296 }
297
SetTaskExecStatus(int status)298 void SyncTaskContext::SetTaskExecStatus(int status)
299 {
300 taskExecStatus_ = status;
301 }
302
GetTaskExecStatus() const303 int SyncTaskContext::GetTaskExecStatus() const
304 {
305 return taskExecStatus_;
306 }
307
IsAutoSync() const308 bool SyncTaskContext::IsAutoSync() const
309 {
310 return isAutoSync_;
311 }
312
StartTimer()313 int SyncTaskContext::StartTimer()
314 {
315 std::lock_guard<std::mutex> lockGuard(timerLock_);
316 if (timerId_ > 0) {
317 return -E_UNEXPECTED_DATA;
318 }
319 TimerId timerId = 0;
320 RefObject::IncObjRef(this);
321 TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
322 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
323 [this]() {
324 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
325 if (ret != E_OK) {
326 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
327 }
328 }, timerId);
329 if (errCode != E_OK) {
330 RefObject::DecObjRef(this);
331 return errCode;
332 }
333 timerId_ = timerId;
334 return errCode;
335 }
336
StopTimer()337 void SyncTaskContext::StopTimer()
338 {
339 TimerId timerId;
340 {
341 std::lock_guard<std::mutex> lockGuard(timerLock_);
342 timerId = timerId_;
343 if (timerId_ == 0) {
344 return;
345 }
346 timerId_ = 0;
347 }
348 RuntimeContext::GetInstance()->RemoveTimer(timerId);
349 }
350
ModifyTimer(int milliSeconds)351 int SyncTaskContext::ModifyTimer(int milliSeconds)
352 {
353 std::lock_guard<std::mutex> lockGuard(timerLock_);
354 if (timerId_ == 0) {
355 return -E_UNEXPECTED_DATA;
356 }
357 return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
358 }
359
SetRetryTime(int retryTime)360 void SyncTaskContext::SetRetryTime(int retryTime)
361 {
362 retryTime_ = retryTime;
363 }
364
GetRetryTime() const365 int SyncTaskContext::GetRetryTime() const
366 {
367 return retryTime_;
368 }
369
SetRetryStatus(int isNeedRetry)370 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
371 {
372 isNeedRetry_ = isNeedRetry;
373 }
374
GetRetryStatus() const375 int SyncTaskContext::GetRetryStatus() const
376 {
377 return isNeedRetry_;
378 }
379
GetTimerId() const380 TimerId SyncTaskContext::GetTimerId() const
381 {
382 return timerId_;
383 }
384
GetRequestSessionId() const385 uint32_t SyncTaskContext::GetRequestSessionId() const
386 {
387 return requestSessionId_;
388 }
389
IncSequenceId()390 void SyncTaskContext::IncSequenceId()
391 {
392 sequenceId_++;
393 }
394
GetSequenceId() const395 uint32_t SyncTaskContext::GetSequenceId() const
396 {
397 return sequenceId_;
398 }
399
ReSetSequenceId()400 void SyncTaskContext::ReSetSequenceId()
401 {
402 sequenceId_ = 1;
403 }
404
IncPacketId()405 void SyncTaskContext::IncPacketId()
406 {
407 packetId_++;
408 }
409
GetPacketId() const410 uint64_t SyncTaskContext::GetPacketId() const
411 {
412 return packetId_;
413 }
414
GetTimeoutTime() const415 int SyncTaskContext::GetTimeoutTime() const
416 {
417 return timeout_;
418 }
419
SetTimeoutCallback(const TimerAction & timeOutCallback)420 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
421 {
422 timeOutCallback_ = timeOutCallback;
423 }
424
SetTimeOffset(TimeOffset offset)425 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
426 {
427 timeOffset_ = offset;
428 }
429
GetTimeOffset() const430 TimeOffset SyncTaskContext::GetTimeOffset() const
431 {
432 return timeOffset_;
433 }
434
StartStateMachine()435 int SyncTaskContext::StartStateMachine()
436 {
437 return stateMachine_->StartSync();
438 }
439
ReceiveMessageCallback(Message * inMsg)440 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
441 {
442 if (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
443 uint16_t remoteVersion = 0;
444 (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
445 SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
446 }
447 int errCode = E_OK;
448 if (IncUsedCount() == E_OK) {
449 errCode = stateMachine_->ReceiveMessageCallback(inMsg);
450 SafeExit();
451 }
452 return errCode;
453 }
454
RegOnSyncTask(const std::function<int (void)> & callback)455 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
456 {
457 onSyncTaskAdd_ = callback;
458 }
459
IncUsedCount()460 int SyncTaskContext::IncUsedCount()
461 {
462 AutoLock lock(this);
463 if (IsKilled()) {
464 LOGI("[SyncTaskContext] IncUsedCount isKilled");
465 return -E_OBJ_IS_KILLED;
466 }
467 usedCount_++;
468 return E_OK;
469 }
470
SafeExit()471 void SyncTaskContext::SafeExit()
472 {
473 AutoLock lock(this);
474 usedCount_--;
475 if (usedCount_ < 1) {
476 safeKill_.notify_one();
477 }
478 }
479
GetCurrentLocalTime() const480 Timestamp SyncTaskContext::GetCurrentLocalTime() const
481 {
482 if (timeHelper_ == nullptr) {
483 return TimeHelper::INVALID_TIMESTAMP;
484 }
485 return timeHelper_->GetTime();
486 }
487
Abort(int status)488 void SyncTaskContext::Abort(int status)
489 {
490 (void)status;
491 Clear();
492 }
493
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId,bool isDirectEnd)494 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
495 {
496 {
497 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
498 if (synTaskContextSet_.count(context) == 0) {
499 LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
500 return;
501 }
502 // IncObjRef to maker sure context not been killed. after the lock_guard
503 RefObject::IncObjRef(context);
504 }
505
506 int ret = RuntimeContext::GetInstance()->ScheduleTask([context, errCode, sessionId, isDirectEnd]() {
507 static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
508 isDirectEnd);
509 RefObject::DecObjRef(context);
510 });
511 if (ret != E_OK) {
512 RefObject::DecObjRef(context);
513 }
514 }
515
SetRemoteSoftwareVersion(uint32_t version)516 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
517 {
518 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
519 if (remoteSoftwareVersion_ == version) {
520 return;
521 }
522 remoteSoftwareVersion_ = version;
523 remoteSoftwareVersionId_++;
524 }
525
GetRemoteSoftwareVersion() const526 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
527 {
528 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
529 return remoteSoftwareVersion_;
530 }
531
GetRemoteSoftwareVersionId() const532 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
533 {
534 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
535 return remoteSoftwareVersionId_;
536 }
537
IsCommNormal() const538 bool SyncTaskContext::IsCommNormal() const
539 {
540 return isCommNormal_;
541 }
542
CommErrHandlerFuncInner(int errCode,uint32_t sessionId,bool isDirectEnd)543 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
544 {
545 {
546 RefObject::AutoLock lock(this);
547 if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
548 return;
549 }
550
551 if (errCode == E_OK) {
552 if (IsRetryTask()) {
553 SetCommFailErrCode(errCode);
554 }
555 // when communicator sent message failed, the state machine will get the error and exit this sync task
556 // it seems unnecessary to change isCommNormal_ value, so just return here
557 return;
558 }
559 }
560 LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
561 if (!isDirectEnd) {
562 SetErrCodeWhenWaitTimeOut(errCode);
563 return;
564 }
565 if (errCode > 0) {
566 SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
567 } else {
568 SetCommFailErrCode(errCode);
569 }
570 stateMachine_->CommErrAbort(sessionId);
571 }
572
TimeOut(TimerId id)573 int SyncTaskContext::TimeOut(TimerId id)
574 {
575 if (!timeOutCallback_) {
576 return E_OK;
577 }
578 IncObjRef(this);
579 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
580 timeOutCallback_(id);
581 DecObjRef(this);
582 });
583 if (errCode != E_OK) {
584 LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
585 DecObjRef(this);
586 }
587 return E_OK;
588 }
589
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)590 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
591 {
592 retryTime_ = 0;
593 mode_ = target->GetMode();
594 status_ = SyncOperation::OP_SYNCING;
595 isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
596 taskErrCode_ = E_OK;
597 packetId_ = 0;
598 isCommNormal_ = true; // reset comm status here
599 commErrCode_ = E_OK;
600 syncTaskRetryStatus_ = isSyncRetry_;
601 timeout_ = static_cast<int>(taskParam.timeout);
602 negotiationCount_ = 0;
603 target->GetSyncOperation(syncOperation_);
604 ReSetSequenceId();
605
606 if (syncOperation_ != nullptr) {
607 // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
608 RefObject::IncObjRef(syncOperation_);
609 syncId_ = syncOperation_->GetSyncId();
610 isAutoSync_ = syncOperation_->IsAutoSync();
611 isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
612 if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
613 syncTaskRetryStatus_ = true;
614 }
615 requestSessionId_ = GenerateRequestSessionId();
616 LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
617 mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
618 DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
619 } else {
620 isAutoSync_ = false;
621 LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
622 syncTaskRetryStatus_);
623 }
624 }
625
KillWait()626 void SyncTaskContext::KillWait()
627 {
628 StopTimer();
629 UnlockObj();
630 stateMachine_->NotifyClosing();
631 stateMachine_->AbortImmediately();
632 LockObj();
633 LOGW("[SyncTaskContext] Try to kill a context, now wait.");
634 bool noDeadLock = WaitLockedUntil(
635 safeKill_,
636 [this]() {
637 if (usedCount_ < 1) {
638 return true;
639 }
640 return false;
641 },
642 KILL_WAIT_SECONDS);
643 if (!noDeadLock) { // LCOV_EXCL_BR_LINE
644 LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
645 } else {
646 LOGW("[SyncTaskContext] Wait the task exit ok.");
647 }
648 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
649 synTaskContextSet_.erase(this);
650 }
651
ClearSyncOperation()652 void SyncTaskContext::ClearSyncOperation()
653 {
654 std::lock_guard<std::mutex> lock(operationLock_);
655 if (syncOperation_ != nullptr) {
656 DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
657 RefObject::DecObjRef(syncOperation_);
658 syncOperation_ = nullptr;
659 }
660 }
661
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)662 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
663 {
664 AutoLock lock(this);
665 if (!isAutoSync_) {
666 return;
667 }
668 if (syncId_ >= syncId) {
669 return;
670 }
671 int mode = SyncOperation::TransferSyncMode(newTargetMode);
672 if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
673 SetRetryTime(AUTO_RETRY_TIMES);
674 ModifyTimer(timeout_);
675 }
676 }
677
GetTaskErrCode() const678 int SyncTaskContext::GetTaskErrCode() const
679 {
680 return taskErrCode_;
681 }
682
SetTaskErrCode(int errCode)683 void SyncTaskContext::SetTaskErrCode(int errCode)
684 {
685 taskErrCode_ = errCode;
686 }
687
IsSyncTaskNeedRetry() const688 bool SyncTaskContext::IsSyncTaskNeedRetry() const
689 {
690 return syncTaskRetryStatus_;
691 }
692
SetSyncRetry(bool isRetry)693 void SyncTaskContext::SetSyncRetry(bool isRetry)
694 {
695 isSyncRetry_ = isRetry;
696 }
697
GetSyncRetryTimes() const698 int SyncTaskContext::GetSyncRetryTimes() const
699 {
700 if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
701 return AUTO_RETRY_TIMES;
702 }
703 return MANUAL_RETRY_TIMES;
704 }
705
GetSyncRetryTimeout(int retryTime) const706 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
707 {
708 int timeoutTime = GetTimeoutTime();
709 if (IsAutoSync()) {
710 // set the new timeout value with 2 raised to the power of retryTime.
711 return timeoutTime * (1u << retryTime);
712 }
713 return timeoutTime;
714 }
715
ClearAllSyncTask()716 void SyncTaskContext::ClearAllSyncTask()
717 {
718 }
719
IsAutoLiftWaterMark() const720 bool SyncTaskContext::IsAutoLiftWaterMark() const
721 {
722 return negotiationCount_ < NEGOTIATION_LIMIT;
723 }
724
IncNegotiationCount()725 void SyncTaskContext::IncNegotiationCount()
726 {
727 negotiationCount_++;
728 }
729
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)730 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
731 {
732 return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
733 }
734
IsAutoSubscribe() const735 bool SyncTaskContext::IsAutoSubscribe() const
736 {
737 return isAutoSubscribe_;
738 }
739
IsCurrentSyncTaskCanBeSkipped() const740 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
741 {
742 return false;
743 }
744
ResetLastPushTaskStatus()745 void SyncTaskContext::ResetLastPushTaskStatus()
746 {
747 }
748
SchemaChange()749 void SyncTaskContext::SchemaChange()
750 {
751 if (stateMachine_ != nullptr) {
752 stateMachine_->SchemaChange();
753 }
754 }
755
Dump(int fd)756 void SyncTaskContext::Dump(int fd)
757 {
758 size_t totalSyncTaskCount = 0u;
759 size_t autoSyncTaskCount = 0u;
760 size_t reponseTaskCount = 0u;
761 {
762 std::lock_guard<std::mutex> lock(targetQueueLock_);
763 totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
764 for (const auto &target : requestTargetQueue_) {
765 if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
766 autoSyncTaskCount++;
767 }
768 }
769 reponseTaskCount = responseTargetQueue_.size();
770 }
771 DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
772 " response task count = %zu\n",
773 deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
774 }
775
RunPermissionCheck(uint8_t flag) const776 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
777 {
778 std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
779 std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
780 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
781 std::string subUserId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
782 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
783 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
784 { userId, appId, storeId, deviceId_, subUserId, instanceId }, flag);
785 if (errCode != E_OK) {
786 LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
787 errCode, flag, deviceId_.c_str());
788 }
789 return errCode;
790 }
791
GetPermissionCheckFlag(bool isAutoSync,int syncMode)792 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
793 {
794 uint8_t flag = 0;
795 int mode = SyncOperation::TransferSyncMode(syncMode);
796 if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
797 flag = CHECK_FLAG_SEND;
798 } else if (mode == SyncModeType::PULL) {
799 flag = CHECK_FLAG_RECEIVE;
800 } else if (mode == SyncModeType::PUSH_AND_PULL) {
801 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
802 }
803 if (isAutoSync) {
804 flag = flag | CHECK_FLAG_AUTOSYNC;
805 }
806 if (mode != SyncModeType::RESPONSE_PULL) {
807 // it means this sync is started by local
808 flag = flag | CHECK_FLAG_SPONSOR;
809 }
810 return flag;
811 }
812
AbortMachineIfNeed(uint32_t syncId)813 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
814 {
815 uint32_t sessionId = 0u;
816 {
817 RefObject::AutoLock autoLock(this);
818 if (syncId_ != syncId) {
819 return;
820 }
821 sessionId = requestSessionId_;
822 }
823 stateMachine_->InnerErrorAbort(sessionId);
824 }
825
GetAndIncSyncOperation() const826 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
827 {
828 std::lock_guard<std::mutex> lock(operationLock_);
829 if (syncOperation_ == nullptr) {
830 return nullptr;
831 }
832 RefObject::IncObjRef(syncOperation_);
833 return syncOperation_;
834 }
835
GenerateRequestSessionId()836 uint32_t SyncTaskContext::GenerateRequestSessionId()
837 {
838 uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
839 // make sure sessionId is between 0x01 and 0x8fffffff
840 if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
841 sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
842 std::to_string(TimeHelper::GetSysCurrentTime()));
843 }
844 lastRequestSessionId_ = sessionId;
845 return sessionId;
846 }
847
IsSchemaCompatible() const848 bool SyncTaskContext::IsSchemaCompatible() const
849 {
850 return true;
851 }
852
SetDbAbility(DbAbility & remoteDbAbility)853 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
854 {
855 }
856
TimeChange()857 void SyncTaskContext::TimeChange()
858 {
859 if (stateMachine_ == nullptr) {
860 LOGW("[SyncTaskContext] machine is null when time change");
861 return;
862 }
863 stateMachine_->TimeChange();
864 }
865
GetResponseTaskCount()866 int32_t SyncTaskContext::GetResponseTaskCount()
867 {
868 std::lock_guard<std::mutex> autoLock(targetQueueLock_);
869 return static_cast<int32_t>(responseTargetQueue_.size());
870 }
871
GetCommErrCode() const872 int SyncTaskContext::GetCommErrCode() const
873 {
874 return commErrCode_;
875 }
876
SetCommFailErrCode(int errCode)877 void SyncTaskContext::SetCommFailErrCode(int errCode)
878 {
879 commErrCode_ = errCode;
880 }
881
SetErrCodeWhenWaitTimeOut(int errCode)882 void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
883 {
884 if (errCode > 0) {
885 SetCommFailErrCode(static_cast<int>(SyncOperation::OP_TIMEOUT));
886 } else {
887 SetCommFailErrCode(errCode);
888 }
889 }
890 } // namespace DistributedDB
891