• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_task_context.h"
17 
18 #include <algorithm>
19 #include <cmath>
20 
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 
30 namespace DistributedDB {
31 std::mutex SyncTaskContext::synTaskContextSetLock_;
32 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
33 
namespace {
    // Upper bound that IsAutoLiftWaterMark() compares negotiationCount_ against.
    constexpr int NEGOTIATION_LIMIT = 2;
}
37 
SyncTaskContext()38 SyncTaskContext::SyncTaskContext()
39     : syncOperation_(nullptr),
40       syncId_(0),
41       mode_(0),
42       isAutoSync_(false),
43       status_(0),
44       taskExecStatus_(0),
45       syncInterface_(nullptr),
46       communicator_(nullptr),
47       stateMachine_(nullptr),
48       requestSessionId_(0),
49       lastRequestSessionId_(0),
50       timeHelper_(nullptr),
51       remoteSoftwareVersion_(0),
52       remoteSoftwareVersionId_(0),
53       isCommNormal_(true),
54       taskErrCode_(E_OK),
55       syncTaskRetryStatus_(false),
56       isSyncRetry_(false),
57       negotiationCount_(0),
58       isAutoSubscribe_(false),
59       isNeedResetAbilitySync_(false)
60 {
61 }
62 
~SyncTaskContext()63 SyncTaskContext::~SyncTaskContext()
64 {
65     if (stateMachine_ != nullptr) {
66         delete stateMachine_;
67         stateMachine_ = nullptr;
68     }
69     ClearSyncOperation();
70     ClearSyncTarget();
71     syncInterface_ = nullptr;
72     communicator_ = nullptr;
73 }
74 
AddSyncTarget(ISyncTarget * target)75 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
76 {
77     if (target == nullptr) {
78         return -E_INVALID_ARGS;
79     }
80     int targetMode = target->GetMode();
81     {
82         std::lock_guard<std::mutex> lock(targetQueueLock_);
83         if (target->GetTaskType() == ISyncTarget::REQUEST) {
84             requestTargetQueue_.push_back(target);
85         } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
86             responseTargetQueue_.push_back(target);
87         } else {
88             return -E_INVALID_ARGS;
89         }
90     }
91     CancelCurrentSyncRetryIfNeed(targetMode);
92     if (taskExecStatus_ == RUNNING) {
93         return E_OK;
94     }
95     if (onSyncTaskAdd_) {
96         RefObject::IncObjRef(this);
97         int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
98             onSyncTaskAdd_();
99             RefObject::DecObjRef(this);
100         });
101         if (errCode != E_OK) {
102             RefObject::DecObjRef(this);
103         }
104     }
105     return E_OK;
106 }
107 
SetOperationStatus(int status)108 void SyncTaskContext::SetOperationStatus(int status)
109 {
110     std::lock_guard<std::mutex> lock(operationLock_);
111     if (syncOperation_ == nullptr) {
112         LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
113         return;
114     }
115     int finalStatus = status;
116 
117     int operationStatus = syncOperation_->GetStatus(deviceId_);
118     if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
119         if (GetTaskErrCode() == -E_EKEYREVOKED) {
120             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
121         } else {
122             finalStatus = SyncOperation::OP_FINISHED_ALL;
123         }
124     } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
125         if (GetTaskErrCode() == -E_EKEYREVOKED) {
126             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
127         } else {
128             finalStatus = SyncOperation::OP_FINISHED_ALL;
129         }
130     }
131     syncOperation_->SetStatus(deviceId_, finalStatus);
132     if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
133         SaveLastPushTaskExecStatus(finalStatus);
134     }
135     if (syncOperation_->CheckIsAllFinished()) {
136         syncOperation_->Finished();
137     }
138 }
139 
SaveLastPushTaskExecStatus(int finalStatus)140 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
141 {
142     (void)finalStatus;
143 }
144 
Clear()145 void SyncTaskContext::Clear()
146 {
147     StopTimer();
148     retryTime_ = 0;
149     sequenceId_ = 1;
150     syncId_ = 0;
151     isAutoSync_ = false;
152     requestSessionId_ = 0;
153     isNeedRetry_ = NO_NEED_RETRY;
154     mode_ = SyncModeType::INVALID_MODE;
155     status_ = SyncOperation::OP_WAITING;
156     taskErrCode_ = E_OK;
157     packetId_ = 0;
158     isAutoSubscribe_ = false;
159 }
160 
RemoveSyncOperation(int syncId)161 int SyncTaskContext::RemoveSyncOperation(int syncId)
162 {
163     std::lock_guard<std::mutex> lock(targetQueueLock_);
164     auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
165         [syncId](const ISyncTarget *target) {
166             if (target == nullptr) {
167                 return false;
168             }
169             return target->GetSyncId() == syncId;
170         });
171     if (iter != requestTargetQueue_.end()) {
172         if (*iter != nullptr) {
173             delete *iter;
174             *iter = nullptr;
175         }
176         requestTargetQueue_.erase(iter);
177         return E_OK;
178     }
179     return -E_INVALID_ARGS;
180 }
181 
ClearSyncTarget()182 void SyncTaskContext::ClearSyncTarget()
183 {
184     std::lock_guard<std::mutex> lock(targetQueueLock_);
185     for (auto &requestTarget : requestTargetQueue_) {
186         if (requestTarget != nullptr) {
187             delete requestTarget;
188             requestTarget = nullptr;
189         }
190     }
191     requestTargetQueue_.clear();
192 
193     for (auto &responseTarget : responseTargetQueue_) {
194         if (responseTarget != nullptr) {
195             delete responseTarget;
196             responseTarget = nullptr;
197         }
198     }
199     responseTargetQueue_.clear();
200 }
201 
IsTargetQueueEmpty() const202 bool SyncTaskContext::IsTargetQueueEmpty() const
203 {
204     std::lock_guard<std::mutex> lock(targetQueueLock_);
205     return requestTargetQueue_.empty() && responseTargetQueue_.empty();
206 }
207 
GetOperationStatus() const208 int SyncTaskContext::GetOperationStatus() const
209 {
210     std::lock_guard<std::mutex> lock(operationLock_);
211     if (syncOperation_ == nullptr) {
212         return SyncOperation::OP_FINISHED_ALL;
213     }
214     return syncOperation_->GetStatus(deviceId_);
215 }
216 
SetMode(int mode)217 void SyncTaskContext::SetMode(int mode)
218 {
219     mode_ = mode;
220 }
221 
GetMode() const222 int SyncTaskContext::GetMode() const
223 {
224     return mode_;
225 }
226 
MoveToNextTarget()227 void SyncTaskContext::MoveToNextTarget()
228 {
229     ClearSyncOperation();
230     TaskParam param;
231     // call other system api without lock
232     param.timeout = communicator_->GetTimeout(deviceId_);
233     std::lock_guard<std::mutex> lock(targetQueueLock_);
234     while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
235         ISyncTarget *tmpTarget = nullptr;
236         if (!requestTargetQueue_.empty()) {
237             tmpTarget = requestTargetQueue_.front();
238             requestTargetQueue_.pop_front();
239         } else {
240             tmpTarget = responseTargetQueue_.front();
241             responseTargetQueue_.pop_front();
242         }
243         if (tmpTarget == nullptr) {
244             LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
245             continue;
246         }
247         SyncOperation *tmpOperation = nullptr;
248         tmpTarget->GetSyncOperation(tmpOperation);
249         if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
250             // if killed skip this syncOperation_.
251             delete tmpTarget;
252             tmpTarget = nullptr;
253             continue;
254         }
255         CopyTargetData(tmpTarget, param);
256         delete tmpTarget;
257         tmpTarget = nullptr;
258         break;
259     }
260 }
261 
GetNextTarget(bool isNeedSetFinished)262 int SyncTaskContext::GetNextTarget(bool isNeedSetFinished)
263 {
264     MoveToNextTarget();
265     int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
266     if (checkErrCode != E_OK) {
267         SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
268         if (isNeedSetFinished) {
269             SetTaskExecStatus(ISyncTaskContext::FINISHED);
270         }
271         return checkErrCode;
272     }
273     return E_OK;
274 }
275 
GetSyncId() const276 uint32_t SyncTaskContext::GetSyncId() const
277 {
278     return syncId_;
279 }
280 
281 // Get the current task deviceId.
GetDeviceId() const282 std::string SyncTaskContext::GetDeviceId() const
283 {
284     return deviceId_;
285 }
286 
SetTaskExecStatus(int status)287 void SyncTaskContext::SetTaskExecStatus(int status)
288 {
289     taskExecStatus_ = status;
290 }
291 
GetTaskExecStatus() const292 int SyncTaskContext::GetTaskExecStatus() const
293 {
294     return taskExecStatus_;
295 }
296 
IsAutoSync() const297 bool SyncTaskContext::IsAutoSync() const
298 {
299     return isAutoSync_;
300 }
301 
StartTimer()302 int SyncTaskContext::StartTimer()
303 {
304     std::lock_guard<std::mutex> lockGuard(timerLock_);
305     if (timerId_ > 0) {
306         return -E_UNEXPECTED_DATA;
307     }
308     TimerId timerId = 0;
309     RefObject::IncObjRef(this);
310     TimerAction timeOutCallback = std::bind(&SyncTaskContext::TimeOut, this, std::placeholders::_1);
311     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
312         [this]() {
313             int ret = RuntimeContext::GetInstance()->ScheduleTask([this](){ RefObject::DecObjRef(this); });
314             if (ret != E_OK) {
315                 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
316             }
317         }, timerId);
318     if (errCode != E_OK) {
319         RefObject::DecObjRef(this);
320         return errCode;
321     }
322     timerId_ = timerId;
323     return errCode;
324 }
325 
StopTimer()326 void SyncTaskContext::StopTimer()
327 {
328     TimerId timerId;
329     {
330         std::lock_guard<std::mutex> lockGuard(timerLock_);
331         timerId = timerId_;
332         if (timerId_ == 0) {
333             return;
334         }
335         timerId_ = 0;
336     }
337     RuntimeContext::GetInstance()->RemoveTimer(timerId);
338 }
339 
ModifyTimer(int milliSeconds)340 int SyncTaskContext::ModifyTimer(int milliSeconds)
341 {
342     std::lock_guard<std::mutex> lockGuard(timerLock_);
343     if (timerId_ == 0) {
344         return -E_UNEXPECTED_DATA;
345     }
346     return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
347 }
348 
SetRetryTime(int retryTime)349 void SyncTaskContext::SetRetryTime(int retryTime)
350 {
351     retryTime_ = retryTime;
352 }
353 
GetRetryTime() const354 int SyncTaskContext::GetRetryTime() const
355 {
356     return retryTime_;
357 }
358 
SetRetryStatus(int isNeedRetry)359 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
360 {
361     isNeedRetry_ = isNeedRetry;
362 }
363 
GetRetryStatus() const364 int SyncTaskContext::GetRetryStatus() const
365 {
366     return isNeedRetry_;
367 }
368 
GetTimerId() const369 TimerId SyncTaskContext::GetTimerId() const
370 {
371     return timerId_;
372 }
373 
GetRequestSessionId() const374 uint32_t SyncTaskContext::GetRequestSessionId() const
375 {
376     return requestSessionId_;
377 }
378 
IncSequenceId()379 void SyncTaskContext::IncSequenceId()
380 {
381     sequenceId_++;
382 }
383 
GetSequenceId() const384 uint32_t SyncTaskContext::GetSequenceId() const
385 {
386     return sequenceId_;
387 }
388 
ReSetSequenceId()389 void SyncTaskContext::ReSetSequenceId()
390 {
391     sequenceId_ = 1;
392 }
393 
IncPacketId()394 void SyncTaskContext::IncPacketId()
395 {
396     packetId_++;
397 }
398 
GetPacketId() const399 uint64_t SyncTaskContext::GetPacketId() const
400 {
401     return packetId_;
402 }
403 
GetTimeoutTime() const404 int SyncTaskContext::GetTimeoutTime() const
405 {
406     return timeout_;
407 }
408 
SetTimeoutCallback(const TimerAction & timeOutCallback)409 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
410 {
411     timeOutCallback_ = timeOutCallback;
412 }
413 
SetTimeOffset(TimeOffset offset)414 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
415 {
416     timeOffset_ = offset;
417 }
418 
GetTimeOffset() const419 TimeOffset SyncTaskContext::GetTimeOffset() const
420 {
421     return timeOffset_;
422 }
423 
StartStateMachine()424 int SyncTaskContext::StartStateMachine()
425 {
426     return stateMachine_->StartSync();
427 }
428 
ReceiveMessageCallback(Message * inMsg)429 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
430 {
431     int errCode = E_OK;
432     if (IncUsedCount() == E_OK) {
433         errCode = stateMachine_->ReceiveMessageCallback(inMsg);
434         SafeExit();
435     }
436     return errCode;
437 }
438 
RegOnSyncTask(const std::function<int (void)> & callback)439 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
440 {
441     onSyncTaskAdd_ = callback;
442 }
443 
IncUsedCount()444 int SyncTaskContext::IncUsedCount()
445 {
446     AutoLock lock(this);
447     if (IsKilled()) {
448         LOGI("[SyncTaskContext] IncUsedCount isKilled");
449         return -E_OBJ_IS_KILLED;
450     }
451     usedCount_++;
452     return E_OK;
453 }
454 
SafeExit()455 void SyncTaskContext::SafeExit()
456 {
457     AutoLock lock(this);
458     usedCount_--;
459     if (usedCount_ < 1) {
460         safeKill_.notify_one();
461     }
462 }
463 
GetCurrentLocalTime() const464 Timestamp SyncTaskContext::GetCurrentLocalTime() const
465 {
466     if (timeHelper_ == nullptr) {
467         return TimeHelper::INVALID_TIMESTAMP;
468     }
469     return timeHelper_->GetTime();
470 }
471 
Abort(int status)472 void SyncTaskContext::Abort(int status)
473 {
474     (void)status;
475     Clear();
476 }
477 
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId)478 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
479 {
480     {
481         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
482         if (synTaskContextSet_.count(context) == 0) {
483             LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
484             return;
485         }
486 
487         // IncObjRef to maker sure context not been killed. after the lock_guard
488         RefObject::IncObjRef(context);
489     }
490 
491     static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
492     RefObject::DecObjRef(context);
493 }
494 
SetRemoteSoftwareVersion(uint32_t version)495 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
496 {
497     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
498     remoteSoftwareVersion_ = version;
499     remoteSoftwareVersionId_++;
500 }
501 
GetRemoteSoftwareVersion() const502 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
503 {
504     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
505     return remoteSoftwareVersion_;
506 }
507 
GetRemoteSoftwareVersionId() const508 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
509 {
510     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
511     return remoteSoftwareVersionId_;
512 }
513 
IsCommNormal() const514 bool SyncTaskContext::IsCommNormal() const
515 {
516     return isCommNormal_;
517 }
518 
CommErrHandlerFuncInner(int errCode,uint32_t sessionId)519 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
520 {
521     {
522         RefObject::AutoLock lock(this);
523         if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
524             return;
525         }
526 
527         if (errCode == E_OK) {
528             // when communicator sent message failed, the state machine will get the error and exit this sync task
529             // it seems unnecessary to change isCommNormal_ value, so just return here
530             return;
531         }
532     }
533     LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
534     stateMachine_->CommErrAbort(sessionId);
535 }
536 
TimeOut(TimerId id)537 int SyncTaskContext::TimeOut(TimerId id)
538 {
539     if (!timeOutCallback_) {
540         return E_OK;
541     }
542     int errCode = IncUsedCount();
543     if (errCode != E_OK) {
544         LOGW("[SyncTaskContext][TimeOut] IncUsedCount failed! errCode=", errCode);
545         // if return is not E_OK, the timer will be removed
546         // we removed timer when context call StopTimer
547         return E_OK;
548     }
549     IncObjRef(this);
550     errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
551         timeOutCallback_(id);
552         SafeExit();
553         DecObjRef(this);
554     });
555     if (errCode != E_OK) {
556         LOGW("[SyncTaskContext][Timeout] Trigger Timeout Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
557         SafeExit();
558         DecObjRef(this);
559     }
560     return E_OK;
561 }
562 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)563 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
564 {
565     mode_ = target->GetMode();
566     status_ = SyncOperation::OP_SYNCING;
567     isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
568     taskErrCode_ = E_OK;
569     packetId_ = 0;
570     isCommNormal_ = true; // reset comm status here
571     syncTaskRetryStatus_ = isSyncRetry_;
572     timeout_ = static_cast<int>(taskParam.timeout);
573     negotiationCount_ = 0;
574     target->GetSyncOperation(syncOperation_);
575     ReSetSequenceId();
576 
577     if (syncOperation_ != nullptr) {
578         // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
579         RefObject::IncObjRef(syncOperation_);
580         syncId_ = syncOperation_->GetSyncId();
581         isAutoSync_ = syncOperation_->IsAutoSync();
582         isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
583         if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
584             syncTaskRetryStatus_ = true;
585         }
586         requestSessionId_ = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
587             std::to_string(TimeHelper::GetSysCurrentTime()));
588         if (lastRequestSessionId_ == requestSessionId_) {
589             // Hash32Func max is 0x7fffffff and UINT32_MAX is 0xffffffff
590             requestSessionId_++;
591             LOGI("last sessionId is equal to this sessionId!");
592         }
593         LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
594             mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
595         lastRequestSessionId_ = requestSessionId_;
596         DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
597     } else {
598         isAutoSync_ = false;
599         LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
600             syncTaskRetryStatus_);
601     }
602 }
603 
KillWait()604 void SyncTaskContext::KillWait()
605 {
606     StopTimer();
607     stateMachine_->NotifyClosing();
608     UnlockObj();
609     stateMachine_->AbortImmediately();
610     LockObj();
611     LOGW("[SyncTaskContext] Try to kill a context, now wait.");
612     bool noDeadLock = WaitLockedUntil(
613         safeKill_,
614         [this]() {
615             if (usedCount_ < 1) {
616                 return true;
617             }
618             return false;
619         },
620         KILL_WAIT_SECONDS);
621     if (!noDeadLock) {
622         LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
623     } else {
624         LOGW("[SyncTaskContext] Wait the task exit ok.");
625     }
626     std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
627     synTaskContextSet_.erase(this);
628 }
629 
ClearSyncOperation()630 void SyncTaskContext::ClearSyncOperation()
631 {
632     std::lock_guard<std::mutex> lock(operationLock_);
633     if (syncOperation_ != nullptr) {
634         DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
635         RefObject::DecObjRef(syncOperation_);
636         syncOperation_ = nullptr;
637     }
638 }
639 
CancelCurrentSyncRetryIfNeed(int newTargetMode)640 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode)
641 {
642     AutoLock(this);
643     if (!isAutoSync_) {
644         return;
645     }
646     int mode = SyncOperation::TransferSyncMode(newTargetMode);
647     if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
648         SetRetryTime(AUTO_RETRY_TIMES);
649         ModifyTimer(timeout_);
650     }
651 }
652 
GetTaskErrCode() const653 int SyncTaskContext::GetTaskErrCode() const
654 {
655     return taskErrCode_;
656 }
657 
SetTaskErrCode(int errCode)658 void SyncTaskContext::SetTaskErrCode(int errCode)
659 {
660     taskErrCode_ = errCode;
661 }
662 
IsSyncTaskNeedRetry() const663 bool SyncTaskContext::IsSyncTaskNeedRetry() const
664 {
665     return syncTaskRetryStatus_;
666 }
667 
SetSyncRetry(bool isRetry)668 void SyncTaskContext::SetSyncRetry(bool isRetry)
669 {
670     isSyncRetry_ = isRetry;
671 }
672 
GetSyncRetryTimes() const673 int SyncTaskContext::GetSyncRetryTimes() const
674 {
675     if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
676         return AUTO_RETRY_TIMES;
677     }
678     return MANUAL_RETRY_TIMES;
679 }
680 
GetSyncRetryTimeout(int retryTime) const681 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
682 {
683     int timeoutTime = GetTimeoutTime();
684     if (IsAutoSync()) {
685         // set the new timeout value with 2 raised to the power of retryTime.
686         return timeoutTime * static_cast<int>(pow(2, retryTime));
687     }
688     return timeoutTime;
689 }
690 
ClearAllSyncTask()691 void SyncTaskContext::ClearAllSyncTask()
692 {
693 }
694 
IsAutoLiftWaterMark() const695 bool SyncTaskContext::IsAutoLiftWaterMark() const
696 {
697     return negotiationCount_ < NEGOTIATION_LIMIT;
698 }
699 
IncNegotiationCount()700 void SyncTaskContext::IncNegotiationCount()
701 {
702     negotiationCount_++;
703 }
704 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)705 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
706 {
707     return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
708 }
709 
IsAutoSubscribe() const710 bool SyncTaskContext::IsAutoSubscribe() const
711 {
712     return isAutoSubscribe_;
713 }
714 
GetIsNeedResetAbilitySync() const715 bool SyncTaskContext::GetIsNeedResetAbilitySync() const
716 {
717     return isNeedResetAbilitySync_;
718 }
719 
SetIsNeedResetAbilitySync(bool isNeedReset)720 void SyncTaskContext::SetIsNeedResetAbilitySync(bool isNeedReset)
721 {
722     isNeedResetAbilitySync_ = isNeedReset;
723 }
724 
IsCurrentSyncTaskCanBeSkipped() const725 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
726 {
727     return false;
728 }
729 
ResetLastPushTaskStatus()730 void SyncTaskContext::ResetLastPushTaskStatus()
731 {
732 }
733 
SchemaChange()734 void SyncTaskContext::SchemaChange()
735 {
736     SetIsNeedResetAbilitySync(true);
737 }
738 
Dump(int fd)739 void SyncTaskContext::Dump(int fd)
740 {
741     size_t totalSyncTaskCount = 0u;
742     size_t autoSyncTaskCount = 0u;
743     size_t reponseTaskCount = 0u;
744     {
745         std::lock_guard<std::mutex> lock(targetQueueLock_);
746         totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
747         for (const auto &target : requestTargetQueue_) {
748             if (target->IsAutoSync()) {
749                 autoSyncTaskCount++;
750             }
751         }
752         reponseTaskCount = responseTargetQueue_.size();
753     }
754     DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
755         " response task count = %zu\n",
756         deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
757 }
758 
RunPermissionCheck(uint8_t flag) const759 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
760 {
761     std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
762     std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
763     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
764     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
765     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
766         { userId, appId, storeId, deviceId_, instanceId }, flag);
767     if (errCode != E_OK) {
768         LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
769             errCode, flag, deviceId_.c_str());
770     }
771     return errCode;
772 }
773 
GetPermissionCheckFlag(bool isAutoSync,int syncMode)774 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
775 {
776     uint8_t flag = 0;
777     int mode = SyncOperation::TransferSyncMode(syncMode);
778     if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
779         flag = CHECK_FLAG_SEND;
780     } else if (mode == SyncModeType::PULL) {
781         flag = CHECK_FLAG_RECEIVE;
782     } else if (mode == SyncModeType::PUSH_AND_PULL) {
783         flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
784     }
785     if (isAutoSync) {
786         flag = flag | CHECK_FLAG_AUTOSYNC;
787     }
788     if (mode != SyncModeType::RESPONSE_PULL) {
789         // it means this sync is started by local
790         flag = flag | CHECK_FLAG_SPONSOR;
791     }
792     return flag;
793 }
794 
AbortMachineIfNeed(uint32_t syncId)795 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
796 {
797     uint32_t sessionId = 0u;
798     {
799         RefObject::AutoLock autoLock(this);
800         if (syncId_ != syncId) {
801             return;
802         }
803         sessionId = requestSessionId_;
804     }
805     stateMachine_->InnerErrorAbort(sessionId);
806 }
807 
GetAndIncSyncOperation() const808 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
809 {
810     std::lock_guard<std::mutex> lock(operationLock_);
811     if (syncOperation_ == nullptr) {
812         return nullptr;
813     }
814     RefObject::IncObjRef(syncOperation_);
815     return syncOperation_;
816 }
817 } // namespace DistributedDB
818