• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_task_context.h"
17 
18 #include <algorithm>
19 #include <cmath>
20 
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 
30 namespace DistributedDB {
31 std::mutex SyncTaskContext::synTaskContextSetLock_;
32 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
33 
namespace {
    // IsAutoLiftWaterMark() reports true while negotiationCount_ stays below this value.
    constexpr int NEGOTIATION_LIMIT = 2;
    // GenerateRequestSessionId() re-seeds from a hash once the incremented id exceeds this value.
    constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
} // anonymous namespace
38 
SyncTaskContext()39 SyncTaskContext::SyncTaskContext()
40     : syncOperation_(nullptr),
41       syncId_(0),
42       mode_(0),
43       isAutoSync_(false),
44       status_(0),
45       taskExecStatus_(0),
46       syncInterface_(nullptr),
47       communicator_(nullptr),
48       stateMachine_(nullptr),
49       requestSessionId_(0),
50       lastRequestSessionId_(0),
51       timeHelper_(nullptr),
52       remoteSoftwareVersion_(0),
53       remoteSoftwareVersionId_(0),
54       isCommNormal_(true),
55       taskErrCode_(E_OK),
56       syncTaskRetryStatus_(false),
57       isSyncRetry_(false),
58       negotiationCount_(0),
59       isAutoSubscribe_(false),
60       isNeedResetAbilitySync_(false)
61 {
62 }
63 
~SyncTaskContext()64 SyncTaskContext::~SyncTaskContext()
65 {
66     if (stateMachine_ != nullptr) {
67         delete stateMachine_;
68         stateMachine_ = nullptr;
69     }
70     SyncTaskContext::ClearSyncOperation();
71     ClearSyncTarget();
72     syncInterface_ = nullptr;
73     communicator_ = nullptr;
74 }
75 
AddSyncTarget(ISyncTarget * target)76 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
77 {
78     if (target == nullptr) {
79         return -E_INVALID_ARGS;
80     }
81     int targetMode = target->GetMode();
82     auto syncId = static_cast<uint32_t>(target->GetSyncId());
83     {
84         std::lock_guard<std::mutex> lock(targetQueueLock_);
85         if (target->GetTaskType() == ISyncTarget::REQUEST) {
86             requestTargetQueue_.push_back(target);
87         } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
88             responseTargetQueue_.push_back(target);
89         } else {
90             return -E_INVALID_ARGS;
91         }
92     }
93     RefObject::IncObjRef(this);
94     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
95         CancelCurrentSyncRetryIfNeed(targetMode, syncId);
96         RefObject::DecObjRef(this);
97     });
98     if (errCode != E_OK) {
99         RefObject::DecObjRef(this);
100     }
101     if (onSyncTaskAdd_) {
102         RefObject::IncObjRef(this);
103         errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
104             onSyncTaskAdd_();
105             RefObject::DecObjRef(this);
106         });
107         if (errCode != E_OK) {
108             RefObject::DecObjRef(this);
109         }
110     }
111     return E_OK;
112 }
113 
SetOperationStatus(int status)114 void SyncTaskContext::SetOperationStatus(int status)
115 {
116     std::lock_guard<std::mutex> lock(operationLock_);
117     if (syncOperation_ == nullptr) {
118         LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
119         return;
120     }
121     int finalStatus = status;
122 
123     int operationStatus = syncOperation_->GetStatus(deviceId_);
124     if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
125         if (GetTaskErrCode() == -E_EKEYREVOKED) {
126             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
127         } else {
128             finalStatus = SyncOperation::OP_FINISHED_ALL;
129         }
130     } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
131         if (GetTaskErrCode() == -E_EKEYREVOKED) {
132             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
133         } else {
134             finalStatus = SyncOperation::OP_FINISHED_ALL;
135         }
136     }
137     syncOperation_->SetStatus(deviceId_, finalStatus);
138     if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
139         SaveLastPushTaskExecStatus(finalStatus);
140     }
141     if (syncOperation_->CheckIsAllFinished()) {
142         syncOperation_->Finished();
143     }
144 }
145 
SaveLastPushTaskExecStatus(int finalStatus)146 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
147 {
148     (void)finalStatus;
149 }
150 
Clear()151 void SyncTaskContext::Clear()
152 {
153     StopTimer();
154     retryTime_ = 0;
155     sequenceId_ = 1;
156     syncId_ = 0;
157     isAutoSync_ = false;
158     requestSessionId_ = 0;
159     isNeedRetry_ = NO_NEED_RETRY;
160     mode_ = SyncModeType::INVALID_MODE;
161     status_ = SyncOperation::OP_WAITING;
162     taskErrCode_ = E_OK;
163     packetId_ = 0;
164     isAutoSubscribe_ = false;
165 }
166 
RemoveSyncOperation(int syncId)167 int SyncTaskContext::RemoveSyncOperation(int syncId)
168 {
169     std::lock_guard<std::mutex> lock(targetQueueLock_);
170     auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
171         [syncId](const ISyncTarget *target) {
172             if (target == nullptr) {
173                 return false;
174             }
175             return target->GetSyncId() == syncId;
176         });
177     if (iter != requestTargetQueue_.end()) {
178         if (*iter != nullptr) {
179             delete *iter;
180             *iter = nullptr;
181         }
182         requestTargetQueue_.erase(iter);
183         return E_OK;
184     }
185     return -E_INVALID_ARGS;
186 }
187 
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190     std::lock_guard<std::mutex> lock(targetQueueLock_);
191     for (auto &requestTarget : requestTargetQueue_) {
192         if (requestTarget != nullptr) {
193             delete requestTarget;
194             requestTarget = nullptr;
195         }
196     }
197     requestTargetQueue_.clear();
198 
199     for (auto &responseTarget : responseTargetQueue_) {
200         if (responseTarget != nullptr) {
201             delete responseTarget;
202             responseTarget = nullptr;
203         }
204     }
205     responseTargetQueue_.clear();
206 }
207 
IsTargetQueueEmpty() const208 bool SyncTaskContext::IsTargetQueueEmpty() const
209 {
210     std::lock_guard<std::mutex> lock(targetQueueLock_);
211     return requestTargetQueue_.empty() && responseTargetQueue_.empty();
212 }
213 
GetOperationStatus() const214 int SyncTaskContext::GetOperationStatus() const
215 {
216     std::lock_guard<std::mutex> lock(operationLock_);
217     if (syncOperation_ == nullptr) {
218         return SyncOperation::OP_FINISHED_ALL;
219     }
220     return syncOperation_->GetStatus(deviceId_);
221 }
222 
SetMode(int mode)223 void SyncTaskContext::SetMode(int mode)
224 {
225     mode_ = mode;
226 }
227 
GetMode() const228 int SyncTaskContext::GetMode() const
229 {
230     return mode_;
231 }
232 
MoveToNextTarget()233 void SyncTaskContext::MoveToNextTarget()
234 {
235     ClearSyncOperation();
236     TaskParam param;
237     // call other system api without lock
238     param.timeout = communicator_->GetTimeout(deviceId_);
239     std::lock_guard<std::mutex> lock(targetQueueLock_);
240     while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
241         ISyncTarget *tmpTarget = nullptr;
242         if (!requestTargetQueue_.empty()) {
243             tmpTarget = requestTargetQueue_.front();
244             requestTargetQueue_.pop_front();
245         } else {
246             tmpTarget = responseTargetQueue_.front();
247             responseTargetQueue_.pop_front();
248         }
249         if (tmpTarget == nullptr) {
250             LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
251             continue;
252         }
253         SyncOperation *tmpOperation = nullptr;
254         tmpTarget->GetSyncOperation(tmpOperation);
255         if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
256             // if killed skip this syncOperation_.
257             delete tmpTarget;
258             tmpTarget = nullptr;
259             continue;
260         }
261         CopyTargetData(tmpTarget, param);
262         delete tmpTarget;
263         tmpTarget = nullptr;
264         break;
265     }
266 }
267 
GetNextTarget()268 int SyncTaskContext::GetNextTarget()
269 {
270     MoveToNextTarget();
271     int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
272     if (checkErrCode != E_OK) {
273         SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
274         return checkErrCode;
275     }
276     return E_OK;
277 }
278 
GetSyncId() const279 uint32_t SyncTaskContext::GetSyncId() const
280 {
281     return syncId_;
282 }
283 
284 // Get the current task deviceId.
GetDeviceId() const285 std::string SyncTaskContext::GetDeviceId() const
286 {
287     return deviceId_;
288 }
289 
SetTaskExecStatus(int status)290 void SyncTaskContext::SetTaskExecStatus(int status)
291 {
292     taskExecStatus_ = status;
293 }
294 
GetTaskExecStatus() const295 int SyncTaskContext::GetTaskExecStatus() const
296 {
297     return taskExecStatus_;
298 }
299 
IsAutoSync() const300 bool SyncTaskContext::IsAutoSync() const
301 {
302     return isAutoSync_;
303 }
304 
StartTimer()305 int SyncTaskContext::StartTimer()
306 {
307     std::lock_guard<std::mutex> lockGuard(timerLock_);
308     if (timerId_ > 0) {
309         return -E_UNEXPECTED_DATA;
310     }
311     TimerId timerId = 0;
312     RefObject::IncObjRef(this);
313     TimerAction timeOutCallback = std::bind(&SyncTaskContext::TimeOut, this, std::placeholders::_1);
314     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
315         [this]() {
316             int ret = RuntimeContext::GetInstance()->ScheduleTask([this](){ RefObject::DecObjRef(this); });
317             if (ret != E_OK) {
318                 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
319             }
320         }, timerId);
321     if (errCode != E_OK) {
322         RefObject::DecObjRef(this);
323         return errCode;
324     }
325     timerId_ = timerId;
326     return errCode;
327 }
328 
StopTimer()329 void SyncTaskContext::StopTimer()
330 {
331     TimerId timerId;
332     {
333         std::lock_guard<std::mutex> lockGuard(timerLock_);
334         timerId = timerId_;
335         if (timerId_ == 0) {
336             return;
337         }
338         timerId_ = 0;
339     }
340     RuntimeContext::GetInstance()->RemoveTimer(timerId);
341 }
342 
ModifyTimer(int milliSeconds)343 int SyncTaskContext::ModifyTimer(int milliSeconds)
344 {
345     std::lock_guard<std::mutex> lockGuard(timerLock_);
346     if (timerId_ == 0) {
347         return -E_UNEXPECTED_DATA;
348     }
349     return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
350 }
351 
SetRetryTime(int retryTime)352 void SyncTaskContext::SetRetryTime(int retryTime)
353 {
354     retryTime_ = retryTime;
355 }
356 
GetRetryTime() const357 int SyncTaskContext::GetRetryTime() const
358 {
359     return retryTime_;
360 }
361 
SetRetryStatus(int isNeedRetry)362 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
363 {
364     isNeedRetry_ = isNeedRetry;
365 }
366 
GetRetryStatus() const367 int SyncTaskContext::GetRetryStatus() const
368 {
369     return isNeedRetry_;
370 }
371 
GetTimerId() const372 TimerId SyncTaskContext::GetTimerId() const
373 {
374     return timerId_;
375 }
376 
GetRequestSessionId() const377 uint32_t SyncTaskContext::GetRequestSessionId() const
378 {
379     return requestSessionId_;
380 }
381 
IncSequenceId()382 void SyncTaskContext::IncSequenceId()
383 {
384     sequenceId_++;
385 }
386 
GetSequenceId() const387 uint32_t SyncTaskContext::GetSequenceId() const
388 {
389     return sequenceId_;
390 }
391 
ReSetSequenceId()392 void SyncTaskContext::ReSetSequenceId()
393 {
394     sequenceId_ = 1;
395 }
396 
IncPacketId()397 void SyncTaskContext::IncPacketId()
398 {
399     packetId_++;
400 }
401 
GetPacketId() const402 uint64_t SyncTaskContext::GetPacketId() const
403 {
404     return packetId_;
405 }
406 
GetTimeoutTime() const407 int SyncTaskContext::GetTimeoutTime() const
408 {
409     return timeout_;
410 }
411 
SetTimeoutCallback(const TimerAction & timeOutCallback)412 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
413 {
414     timeOutCallback_ = timeOutCallback;
415 }
416 
SetTimeOffset(TimeOffset offset)417 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
418 {
419     timeOffset_ = offset;
420 }
421 
GetTimeOffset() const422 TimeOffset SyncTaskContext::GetTimeOffset() const
423 {
424     return timeOffset_;
425 }
426 
StartStateMachine()427 int SyncTaskContext::StartStateMachine()
428 {
429     return stateMachine_->StartSync();
430 }
431 
ReceiveMessageCallback(Message * inMsg)432 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
433 {
434     int errCode = E_OK;
435     if (IncUsedCount() == E_OK) {
436         errCode = stateMachine_->ReceiveMessageCallback(inMsg);
437         SafeExit();
438     }
439     return errCode;
440 }
441 
RegOnSyncTask(const std::function<int (void)> & callback)442 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
443 {
444     onSyncTaskAdd_ = callback;
445 }
446 
IncUsedCount()447 int SyncTaskContext::IncUsedCount()
448 {
449     AutoLock lock(this);
450     if (IsKilled()) {
451         LOGI("[SyncTaskContext] IncUsedCount isKilled");
452         return -E_OBJ_IS_KILLED;
453     }
454     usedCount_++;
455     return E_OK;
456 }
457 
SafeExit()458 void SyncTaskContext::SafeExit()
459 {
460     AutoLock lock(this);
461     usedCount_--;
462     if (usedCount_ < 1) {
463         safeKill_.notify_one();
464     }
465 }
466 
GetCurrentLocalTime() const467 Timestamp SyncTaskContext::GetCurrentLocalTime() const
468 {
469     if (timeHelper_ == nullptr) {
470         return TimeHelper::INVALID_TIMESTAMP;
471     }
472     return timeHelper_->GetTime();
473 }
474 
Abort(int status)475 void SyncTaskContext::Abort(int status)
476 {
477     (void)status;
478     Clear();
479 }
480 
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId)481 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
482 {
483     {
484         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
485         if (synTaskContextSet_.count(context) == 0) {
486             LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
487             return;
488         }
489 
490         // IncObjRef to maker sure context not been killed. after the lock_guard
491         RefObject::IncObjRef(context);
492     }
493 
494     static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
495     RefObject::DecObjRef(context);
496 }
497 
SetRemoteSoftwareVersion(uint32_t version)498 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
499 {
500     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
501     remoteSoftwareVersion_ = version;
502     remoteSoftwareVersionId_++;
503 }
504 
GetRemoteSoftwareVersion() const505 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
506 {
507     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
508     return remoteSoftwareVersion_;
509 }
510 
GetRemoteSoftwareVersionId() const511 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
512 {
513     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
514     return remoteSoftwareVersionId_;
515 }
516 
IsCommNormal() const517 bool SyncTaskContext::IsCommNormal() const
518 {
519     return isCommNormal_;
520 }
521 
CommErrHandlerFuncInner(int errCode,uint32_t sessionId)522 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
523 {
524     {
525         RefObject::AutoLock lock(this);
526         if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
527             return;
528         }
529 
530         if (errCode == E_OK) {
531             // when communicator sent message failed, the state machine will get the error and exit this sync task
532             // it seems unnecessary to change isCommNormal_ value, so just return here
533             return;
534         }
535     }
536     LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
537     stateMachine_->CommErrAbort(sessionId);
538 }
539 
TimeOut(TimerId id)540 int SyncTaskContext::TimeOut(TimerId id)
541 {
542     if (!timeOutCallback_) {
543         return E_OK;
544     }
545     IncObjRef(this);
546     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
547         timeOutCallback_(id);
548         DecObjRef(this);
549     });
550     if (errCode != E_OK) {
551         LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
552         DecObjRef(this);
553     }
554     return E_OK;
555 }
556 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)557 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
558 {
559     retryTime_ = 0;
560     mode_ = target->GetMode();
561     status_ = SyncOperation::OP_SYNCING;
562     isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
563     taskErrCode_ = E_OK;
564     packetId_ = 0;
565     isCommNormal_ = true; // reset comm status here
566     syncTaskRetryStatus_ = isSyncRetry_;
567     timeout_ = static_cast<int>(taskParam.timeout);
568     negotiationCount_ = 0;
569     target->GetSyncOperation(syncOperation_);
570     ReSetSequenceId();
571 
572     if (syncOperation_ != nullptr) {
573         // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
574         RefObject::IncObjRef(syncOperation_);
575         syncId_ = syncOperation_->GetSyncId();
576         isAutoSync_ = syncOperation_->IsAutoSync();
577         isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
578         if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
579             syncTaskRetryStatus_ = true;
580         }
581         requestSessionId_ = GenerateRequestSessionId();
582         LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
583             mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
584         DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
585     } else {
586         isAutoSync_ = false;
587         LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
588             syncTaskRetryStatus_);
589     }
590 }
591 
KillWait()592 void SyncTaskContext::KillWait()
593 {
594     StopTimer();
595     UnlockObj();
596     stateMachine_->NotifyClosing();
597     stateMachine_->AbortImmediately();
598     LockObj();
599     LOGW("[SyncTaskContext] Try to kill a context, now wait.");
600     bool noDeadLock = WaitLockedUntil(
601         safeKill_,
602         [this]() {
603             if (usedCount_ < 1) {
604                 return true;
605             }
606             return false;
607         },
608         KILL_WAIT_SECONDS);
609     if (!noDeadLock) {
610         LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
611     } else {
612         LOGW("[SyncTaskContext] Wait the task exit ok.");
613     }
614     std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
615     synTaskContextSet_.erase(this);
616 }
617 
ClearSyncOperation()618 void SyncTaskContext::ClearSyncOperation()
619 {
620     std::lock_guard<std::mutex> lock(operationLock_);
621     if (syncOperation_ != nullptr) {
622         DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
623         RefObject::DecObjRef(syncOperation_);
624         syncOperation_ = nullptr;
625     }
626 }
627 
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)628 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
629 {
630     AutoLock lock(this);
631     if (!isAutoSync_) {
632         return;
633     }
634     if (syncId_ >= syncId) {
635         return;
636     }
637     int mode = SyncOperation::TransferSyncMode(newTargetMode);
638     if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
639         SetRetryTime(AUTO_RETRY_TIMES);
640         ModifyTimer(timeout_);
641     }
642 }
643 
GetTaskErrCode() const644 int SyncTaskContext::GetTaskErrCode() const
645 {
646     return taskErrCode_;
647 }
648 
SetTaskErrCode(int errCode)649 void SyncTaskContext::SetTaskErrCode(int errCode)
650 {
651     taskErrCode_ = errCode;
652 }
653 
IsSyncTaskNeedRetry() const654 bool SyncTaskContext::IsSyncTaskNeedRetry() const
655 {
656     return syncTaskRetryStatus_;
657 }
658 
SetSyncRetry(bool isRetry)659 void SyncTaskContext::SetSyncRetry(bool isRetry)
660 {
661     isSyncRetry_ = isRetry;
662 }
663 
GetSyncRetryTimes() const664 int SyncTaskContext::GetSyncRetryTimes() const
665 {
666     if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
667         return AUTO_RETRY_TIMES;
668     }
669     return MANUAL_RETRY_TIMES;
670 }
671 
GetSyncRetryTimeout(int retryTime) const672 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
673 {
674     int timeoutTime = GetTimeoutTime();
675     if (IsAutoSync()) {
676         // set the new timeout value with 2 raised to the power of retryTime.
677         return timeoutTime * (1u << retryTime);
678     }
679     return timeoutTime;
680 }
681 
ClearAllSyncTask()682 void SyncTaskContext::ClearAllSyncTask()
683 {
684 }
685 
IsAutoLiftWaterMark() const686 bool SyncTaskContext::IsAutoLiftWaterMark() const
687 {
688     return negotiationCount_ < NEGOTIATION_LIMIT;
689 }
690 
IncNegotiationCount()691 void SyncTaskContext::IncNegotiationCount()
692 {
693     negotiationCount_++;
694 }
695 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)696 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
697 {
698     return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
699 }
700 
IsAutoSubscribe() const701 bool SyncTaskContext::IsAutoSubscribe() const
702 {
703     return isAutoSubscribe_;
704 }
705 
GetIsNeedResetAbilitySync() const706 bool SyncTaskContext::GetIsNeedResetAbilitySync() const
707 {
708     return isNeedResetAbilitySync_;
709 }
710 
SetIsNeedResetAbilitySync(bool isNeedReset)711 void SyncTaskContext::SetIsNeedResetAbilitySync(bool isNeedReset)
712 {
713     isNeedResetAbilitySync_ = isNeedReset;
714 }
715 
IsCurrentSyncTaskCanBeSkipped() const716 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
717 {
718     return false;
719 }
720 
ResetLastPushTaskStatus()721 void SyncTaskContext::ResetLastPushTaskStatus()
722 {
723 }
724 
SchemaChange()725 void SyncTaskContext::SchemaChange()
726 {
727     SetIsNeedResetAbilitySync(true);
728 }
729 
Dump(int fd)730 void SyncTaskContext::Dump(int fd)
731 {
732     size_t totalSyncTaskCount = 0u;
733     size_t autoSyncTaskCount = 0u;
734     size_t reponseTaskCount = 0u;
735     {
736         std::lock_guard<std::mutex> lock(targetQueueLock_);
737         totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
738         for (const auto &target : requestTargetQueue_) {
739             if (target->IsAutoSync()) {
740                 autoSyncTaskCount++;
741             }
742         }
743         reponseTaskCount = responseTargetQueue_.size();
744     }
745     DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
746         " response task count = %zu\n",
747         deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
748 }
749 
RunPermissionCheck(uint8_t flag) const750 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
751 {
752     std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
753     std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
754     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
755     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
756     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
757         { userId, appId, storeId, deviceId_, instanceId }, flag);
758     if (errCode != E_OK) {
759         LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
760             errCode, flag, deviceId_.c_str());
761     }
762     return errCode;
763 }
764 
GetPermissionCheckFlag(bool isAutoSync,int syncMode)765 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
766 {
767     uint8_t flag = 0;
768     int mode = SyncOperation::TransferSyncMode(syncMode);
769     if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
770         flag = CHECK_FLAG_SEND;
771     } else if (mode == SyncModeType::PULL) {
772         flag = CHECK_FLAG_RECEIVE;
773     } else if (mode == SyncModeType::PUSH_AND_PULL) {
774         flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
775     }
776     if (isAutoSync) {
777         flag = flag | CHECK_FLAG_AUTOSYNC;
778     }
779     if (mode != SyncModeType::RESPONSE_PULL) {
780         // it means this sync is started by local
781         flag = flag | CHECK_FLAG_SPONSOR;
782     }
783     return flag;
784 }
785 
AbortMachineIfNeed(uint32_t syncId)786 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
787 {
788     uint32_t sessionId = 0u;
789     {
790         RefObject::AutoLock autoLock(this);
791         if (syncId_ != syncId) {
792             return;
793         }
794         sessionId = requestSessionId_;
795     }
796     stateMachine_->InnerErrorAbort(sessionId);
797 }
798 
GetAndIncSyncOperation() const799 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
800 {
801     std::lock_guard<std::mutex> lock(operationLock_);
802     if (syncOperation_ == nullptr) {
803         return nullptr;
804     }
805     RefObject::IncObjRef(syncOperation_);
806     return syncOperation_;
807 }
808 
GenerateRequestSessionId()809 uint32_t SyncTaskContext::GenerateRequestSessionId()
810 {
811     uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
812     // make sure sessionId is between 0x01 and 0x8fffffff
813     if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
814         sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
815             std::to_string(TimeHelper::GetSysCurrentTime()));
816     }
817     lastRequestSessionId_ = sessionId;
818     return sessionId;
819 }
820 } // namespace DistributedDB
821