• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "sync_task_context.h"

#include <algorithm>
#include <cmath>
#include <limits>

#include "db_constant.h"
#include "db_dump_helper.h"
#include "db_dfx_adapter.h"
#include "db_errno.h"
#include "hash.h"
#include "isync_state_machine.h"
#include "log_print.h"
#include "time_helper.h"
#include "version.h"
30 
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34 
35 namespace {
36     constexpr int NEGOTIATION_LIMIT = 2;
37     constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39 
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41     : syncOperation_(nullptr),
42       syncId_(0),
43       mode_(0),
44       isAutoSync_(false),
45       status_(0),
46       taskExecStatus_(0),
47       syncInterface_(nullptr),
48       communicator_(nullptr),
49       stateMachine_(nullptr),
50       requestSessionId_(0),
51       lastRequestSessionId_(0),
52       timeHelper_(nullptr),
53       remoteSoftwareVersion_(0),
54       remoteSoftwareVersionId_(0),
55       isCommNormal_(true),
56       taskErrCode_(E_OK),
57       commErrCode_(E_OK),
58       syncTaskRetryStatus_(false),
59       isSyncRetry_(false),
60       negotiationCount_(0),
61       isAutoSubscribe_(false)
62 {
63 }
64 
~SyncTaskContext()65 SyncTaskContext::~SyncTaskContext()
66 {
67     if (stateMachine_ != nullptr) {
68         delete stateMachine_;
69         stateMachine_ = nullptr;
70     }
71     SyncTaskContext::ClearSyncOperation();
72     ClearSyncTarget();
73     syncInterface_ = nullptr;
74     communicator_ = nullptr;
75 }
76 
AddSyncTarget(ISyncTarget * target)77 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
78 {
79     if (target == nullptr) {
80         return -E_INVALID_ARGS;
81     }
82     int targetMode = target->GetMode();
83     auto syncId = static_cast<uint32_t>(target->GetSyncId());
84     {
85         std::lock_guard<std::mutex> lock(targetQueueLock_);
86         if (target->GetTaskType() == ISyncTarget::REQUEST) {
87             requestTargetQueue_.push_back(target);
88         } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
89             responseTargetQueue_.push_back(target);
90         } else {
91             return -E_INVALID_ARGS;
92         }
93     }
94     RefObject::IncObjRef(this);
95     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
96         CancelCurrentSyncRetryIfNeed(targetMode, syncId);
97         RefObject::DecObjRef(this);
98     });
99     if (errCode != E_OK) {
100         RefObject::DecObjRef(this);
101     }
102     if (onSyncTaskAdd_) {
103         RefObject::IncObjRef(this);
104         errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
105             onSyncTaskAdd_();
106             RefObject::DecObjRef(this);
107         });
108         if (errCode != E_OK) {
109             RefObject::DecObjRef(this);
110         }
111     }
112     return E_OK;
113 }
114 
SetOperationStatus(int status)115 void SyncTaskContext::SetOperationStatus(int status)
116 {
117     std::lock_guard<std::mutex> lock(operationLock_);
118     if (syncOperation_ == nullptr) {
119         LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
120         return;
121     }
122     int finalStatus = status;
123 
124     int operationStatus = syncOperation_->GetStatus(deviceId_);
125     if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
126         if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
127             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
128         } else {
129             finalStatus = SyncOperation::OP_FINISHED_ALL;
130         }
131     } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
132         if (GetTaskErrCode() == -E_EKEYREVOKED) {
133             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
134         } else {
135             finalStatus = SyncOperation::OP_FINISHED_ALL;
136         }
137     }
138     syncOperation_->SetStatus(deviceId_, finalStatus);
139     if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
140         SaveLastPushTaskExecStatus(finalStatus);
141     }
142     if (syncOperation_->CheckIsAllFinished()) {
143         syncOperation_->Finished();
144     }
145 }
146 
SaveLastPushTaskExecStatus(int finalStatus)147 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
148 {
149     (void)finalStatus;
150 }
151 
Clear()152 void SyncTaskContext::Clear()
153 {
154     StopTimer();
155     retryTime_ = 0;
156     sequenceId_ = 1;
157     syncId_ = 0;
158     isAutoSync_ = false;
159     requestSessionId_ = 0;
160     isNeedRetry_ = NO_NEED_RETRY;
161     mode_ = SyncModeType::INVALID_MODE;
162     status_ = SyncOperation::OP_WAITING;
163     taskErrCode_ = E_OK;
164     packetId_ = 0;
165     isAutoSubscribe_ = false;
166 }
167 
RemoveSyncOperation(int syncId)168 int SyncTaskContext::RemoveSyncOperation(int syncId)
169 {
170     std::lock_guard<std::mutex> lock(targetQueueLock_);
171     auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
172         [syncId](const ISyncTarget *target) {
173             if (target == nullptr) {
174                 return false;
175             }
176             return target->GetSyncId() == syncId;
177         });
178     if (iter != requestTargetQueue_.end()) {
179         if (*iter != nullptr) {
180             delete *iter;
181         }
182         requestTargetQueue_.erase(iter);
183         return E_OK;
184     }
185     return -E_INVALID_ARGS;
186 }
187 
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190     std::lock_guard<std::mutex> lock(targetQueueLock_);
191     for (auto &requestTarget : requestTargetQueue_) {
192         if (requestTarget != nullptr) {
193             delete requestTarget;
194         }
195     }
196     requestTargetQueue_.clear();
197 
198     for (auto &responseTarget : responseTargetQueue_) {
199         if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
200             delete responseTarget;
201         }
202     }
203     responseTargetQueue_.clear();
204 }
205 
IsTargetQueueEmpty() const206 bool SyncTaskContext::IsTargetQueueEmpty() const
207 {
208     std::lock_guard<std::mutex> lock(targetQueueLock_);
209     return requestTargetQueue_.empty() && responseTargetQueue_.empty();
210 }
211 
GetOperationStatus() const212 int SyncTaskContext::GetOperationStatus() const
213 {
214     std::lock_guard<std::mutex> lock(operationLock_);
215     if (syncOperation_ == nullptr) {
216         return SyncOperation::OP_FINISHED_ALL;
217     }
218     return syncOperation_->GetStatus(deviceId_);
219 }
220 
SetMode(int mode)221 void SyncTaskContext::SetMode(int mode)
222 {
223     mode_ = mode;
224 }
225 
GetMode() const226 int SyncTaskContext::GetMode() const
227 {
228     return mode_;
229 }
230 
MoveToNextTarget(uint32_t timeout)231 void SyncTaskContext::MoveToNextTarget(uint32_t timeout)
232 {
233     ClearSyncOperation();
234     TaskParam param;
235     // call other system api without lock
236     param.timeout = timeout;
237     std::lock_guard<std::mutex> lock(targetQueueLock_);
238     while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
239         ISyncTarget *tmpTarget = nullptr;
240         if (!requestTargetQueue_.empty()) {
241             tmpTarget = requestTargetQueue_.front();
242             requestTargetQueue_.pop_front();
243         } else {
244             tmpTarget = responseTargetQueue_.front();
245             responseTargetQueue_.pop_front();
246         }
247         if (tmpTarget == nullptr) {
248             LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
249             continue;
250         }
251         SyncOperation *tmpOperation = nullptr;
252         tmpTarget->GetSyncOperation(tmpOperation);
253         if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
254             // if killed skip this syncOperation_.
255             delete tmpTarget;
256             tmpTarget = nullptr;
257             continue;
258         }
259         CopyTargetData(tmpTarget, param);
260         delete tmpTarget;
261         tmpTarget = nullptr;
262         break;
263     }
264 }
265 
GetNextTarget(uint32_t timeout)266 int SyncTaskContext::GetNextTarget(uint32_t timeout)
267 {
268     MoveToNextTarget(timeout);
269     int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
270     if (checkErrCode != E_OK) {
271         SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
272         return checkErrCode;
273     }
274     return E_OK;
275 }
276 
GetSyncId() const277 uint32_t SyncTaskContext::GetSyncId() const
278 {
279     return syncId_;
280 }
281 
282 // Get the current task deviceId.
GetDeviceId() const283 std::string SyncTaskContext::GetDeviceId() const
284 {
285     return deviceId_;
286 }
287 
GetTargetUserId() const288 std::string SyncTaskContext::GetTargetUserId() const
289 {
290     return targetUserId_;
291 }
292 
SetTargetUserId(const std::string & userId)293 void SyncTaskContext::SetTargetUserId(const std::string &userId)
294 {
295     targetUserId_ = userId;
296 }
297 
SetTaskExecStatus(int status)298 void SyncTaskContext::SetTaskExecStatus(int status)
299 {
300     taskExecStatus_ = status;
301 }
302 
GetTaskExecStatus() const303 int SyncTaskContext::GetTaskExecStatus() const
304 {
305     return taskExecStatus_;
306 }
307 
IsAutoSync() const308 bool SyncTaskContext::IsAutoSync() const
309 {
310     return isAutoSync_;
311 }
312 
StartTimer()313 int SyncTaskContext::StartTimer()
314 {
315     std::lock_guard<std::mutex> lockGuard(timerLock_);
316     if (timerId_ > 0) {
317         return -E_UNEXPECTED_DATA;
318     }
319     TimerId timerId = 0;
320     RefObject::IncObjRef(this);
321     TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
322     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
323         [this]() {
324             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
325             if (ret != E_OK) {
326                 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
327             }
328         }, timerId);
329     if (errCode != E_OK) {
330         RefObject::DecObjRef(this);
331         return errCode;
332     }
333     timerId_ = timerId;
334     return errCode;
335 }
336 
StopTimer()337 void SyncTaskContext::StopTimer()
338 {
339     TimerId timerId;
340     {
341         std::lock_guard<std::mutex> lockGuard(timerLock_);
342         timerId = timerId_;
343         if (timerId_ == 0) {
344             return;
345         }
346         timerId_ = 0;
347     }
348     RuntimeContext::GetInstance()->RemoveTimer(timerId);
349 }
350 
ModifyTimer(int milliSeconds)351 int SyncTaskContext::ModifyTimer(int milliSeconds)
352 {
353     std::lock_guard<std::mutex> lockGuard(timerLock_);
354     if (timerId_ == 0) {
355         return -E_UNEXPECTED_DATA;
356     }
357     return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
358 }
359 
SetRetryTime(int retryTime)360 void SyncTaskContext::SetRetryTime(int retryTime)
361 {
362     retryTime_ = retryTime;
363 }
364 
GetRetryTime() const365 int SyncTaskContext::GetRetryTime() const
366 {
367     return retryTime_;
368 }
369 
SetRetryStatus(int isNeedRetry)370 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
371 {
372     isNeedRetry_ = isNeedRetry;
373 }
374 
GetRetryStatus() const375 int SyncTaskContext::GetRetryStatus() const
376 {
377     return isNeedRetry_;
378 }
379 
GetTimerId() const380 TimerId SyncTaskContext::GetTimerId() const
381 {
382     return timerId_;
383 }
384 
GetRequestSessionId() const385 uint32_t SyncTaskContext::GetRequestSessionId() const
386 {
387     return requestSessionId_;
388 }
389 
IncSequenceId()390 void SyncTaskContext::IncSequenceId()
391 {
392     sequenceId_++;
393 }
394 
GetSequenceId() const395 uint32_t SyncTaskContext::GetSequenceId() const
396 {
397     return sequenceId_;
398 }
399 
ReSetSequenceId()400 void SyncTaskContext::ReSetSequenceId()
401 {
402     sequenceId_ = 1;
403 }
404 
IncPacketId()405 void SyncTaskContext::IncPacketId()
406 {
407     packetId_++;
408 }
409 
GetPacketId() const410 uint64_t SyncTaskContext::GetPacketId() const
411 {
412     return packetId_;
413 }
414 
GetTimeoutTime() const415 int SyncTaskContext::GetTimeoutTime() const
416 {
417     return timeout_;
418 }
419 
SetTimeoutCallback(const TimerAction & timeOutCallback)420 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
421 {
422     timeOutCallback_ = timeOutCallback;
423 }
424 
SetTimeOffset(TimeOffset offset)425 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
426 {
427     timeOffset_ = offset;
428 }
429 
GetTimeOffset() const430 TimeOffset SyncTaskContext::GetTimeOffset() const
431 {
432     return timeOffset_;
433 }
434 
StartStateMachine()435 int SyncTaskContext::StartStateMachine()
436 {
437     return stateMachine_->StartSync();
438 }
439 
ReceiveMessageCallback(Message * inMsg)440 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
441 {
442     if (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
443         uint16_t remoteVersion = 0;
444         (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
445         SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
446     }
447     int errCode = E_OK;
448     if (IncUsedCount() == E_OK) {
449         errCode = stateMachine_->ReceiveMessageCallback(inMsg);
450         SafeExit();
451     }
452     return errCode;
453 }
454 
RegOnSyncTask(const std::function<int (void)> & callback)455 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
456 {
457     onSyncTaskAdd_ = callback;
458 }
459 
IncUsedCount()460 int SyncTaskContext::IncUsedCount()
461 {
462     AutoLock lock(this);
463     if (IsKilled()) {
464         LOGI("[SyncTaskContext] IncUsedCount isKilled");
465         return -E_OBJ_IS_KILLED;
466     }
467     usedCount_++;
468     return E_OK;
469 }
470 
SafeExit()471 void SyncTaskContext::SafeExit()
472 {
473     AutoLock lock(this);
474     usedCount_--;
475     if (usedCount_ < 1) {
476         safeKill_.notify_one();
477     }
478 }
479 
GetCurrentLocalTime() const480 Timestamp SyncTaskContext::GetCurrentLocalTime() const
481 {
482     if (timeHelper_ == nullptr) {
483         return TimeHelper::INVALID_TIMESTAMP;
484     }
485     return timeHelper_->GetTime();
486 }
487 
Abort(int status)488 void SyncTaskContext::Abort(int status)
489 {
490     (void)status;
491     Clear();
492 }
493 
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId,bool isDirectEnd)494 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
495 {
496     {
497         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
498         if (synTaskContextSet_.count(context) == 0) {
499             LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
500             return;
501         }
502         // IncObjRef to maker sure context not been killed. after the lock_guard
503         RefObject::IncObjRef(context);
504     }
505 
506     int ret = RuntimeContext::GetInstance()->ScheduleTask([context, errCode, sessionId, isDirectEnd]() {
507         static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
508             isDirectEnd);
509         RefObject::DecObjRef(context);
510         });
511     if (ret != E_OK) {
512         RefObject::DecObjRef(context);
513     }
514 }
515 
SetRemoteSoftwareVersion(uint32_t version)516 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
517 {
518     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
519     if (remoteSoftwareVersion_ == version) {
520         return;
521     }
522     remoteSoftwareVersion_ = version;
523     remoteSoftwareVersionId_++;
524 }
525 
GetRemoteSoftwareVersion() const526 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
527 {
528     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
529     return remoteSoftwareVersion_;
530 }
531 
GetRemoteSoftwareVersionId() const532 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
533 {
534     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
535     return remoteSoftwareVersionId_;
536 }
537 
IsCommNormal() const538 bool SyncTaskContext::IsCommNormal() const
539 {
540     return isCommNormal_;
541 }
542 
CommErrHandlerFuncInner(int errCode,uint32_t sessionId,bool isDirectEnd)543 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
544 {
545     {
546         RefObject::AutoLock lock(this);
547         if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
548             return;
549         }
550 
551         if (errCode == E_OK) {
552             if (IsRetryTask()) {
553                 SetCommFailErrCode(errCode);
554             }
555             // when communicator sent message failed, the state machine will get the error and exit this sync task
556             // it seems unnecessary to change isCommNormal_ value, so just return here
557             return;
558         }
559     }
560     LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
561     if (!isDirectEnd) {
562         SetErrCodeWhenWaitTimeOut(errCode);
563         return;
564     }
565     if (errCode > 0) {
566         SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
567     } else {
568         SetCommFailErrCode(errCode);
569     }
570     stateMachine_->CommErrAbort(sessionId);
571 }
572 
TimeOut(TimerId id)573 int SyncTaskContext::TimeOut(TimerId id)
574 {
575     if (!timeOutCallback_) {
576         return E_OK;
577     }
578     IncObjRef(this);
579     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
580         timeOutCallback_(id);
581         DecObjRef(this);
582     });
583     if (errCode != E_OK) {
584         LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
585         DecObjRef(this);
586     }
587     return E_OK;
588 }
589 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)590 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
591 {
592     retryTime_ = 0;
593     mode_ = target->GetMode();
594     status_ = SyncOperation::OP_SYNCING;
595     isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
596     taskErrCode_ = E_OK;
597     packetId_ = 0;
598     isCommNormal_ = true; // reset comm status here
599     commErrCode_ = E_OK;
600     syncTaskRetryStatus_ = isSyncRetry_;
601     timeout_ = static_cast<int>(taskParam.timeout);
602     negotiationCount_ = 0;
603     target->GetSyncOperation(syncOperation_);
604     ReSetSequenceId();
605 
606     if (syncOperation_ != nullptr) {
607         // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
608         RefObject::IncObjRef(syncOperation_);
609         syncId_ = syncOperation_->GetSyncId();
610         isAutoSync_ = syncOperation_->IsAutoSync();
611         isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
612         if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
613             syncTaskRetryStatus_ = true;
614         }
615         requestSessionId_ = GenerateRequestSessionId();
616         LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
617             mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
618         DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
619     } else {
620         isAutoSync_ = false;
621         LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
622             syncTaskRetryStatus_);
623     }
624 }
625 
KillWait()626 void SyncTaskContext::KillWait()
627 {
628     StopTimer();
629     UnlockObj();
630     stateMachine_->NotifyClosing();
631     stateMachine_->AbortImmediately();
632     LockObj();
633     LOGW("[SyncTaskContext] Try to kill a context, now wait.");
634     bool noDeadLock = WaitLockedUntil(
635         safeKill_,
636         [this]() {
637             if (usedCount_ < 1) {
638                 return true;
639             }
640             return false;
641         },
642         KILL_WAIT_SECONDS);
643     if (!noDeadLock) { // LCOV_EXCL_BR_LINE
644         LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
645     } else {
646         LOGW("[SyncTaskContext] Wait the task exit ok.");
647     }
648     std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
649     synTaskContextSet_.erase(this);
650 }
651 
ClearSyncOperation()652 void SyncTaskContext::ClearSyncOperation()
653 {
654     std::lock_guard<std::mutex> lock(operationLock_);
655     if (syncOperation_ != nullptr) {
656         DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
657         RefObject::DecObjRef(syncOperation_);
658         syncOperation_ = nullptr;
659     }
660 }
661 
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)662 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
663 {
664     AutoLock lock(this);
665     if (!isAutoSync_) {
666         return;
667     }
668     if (syncId_ >= syncId) {
669         return;
670     }
671     int mode = SyncOperation::TransferSyncMode(newTargetMode);
672     if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
673         SetRetryTime(AUTO_RETRY_TIMES);
674         ModifyTimer(timeout_);
675     }
676 }
677 
GetTaskErrCode() const678 int SyncTaskContext::GetTaskErrCode() const
679 {
680     return taskErrCode_;
681 }
682 
SetTaskErrCode(int errCode)683 void SyncTaskContext::SetTaskErrCode(int errCode)
684 {
685     taskErrCode_ = errCode;
686 }
687 
IsSyncTaskNeedRetry() const688 bool SyncTaskContext::IsSyncTaskNeedRetry() const
689 {
690     return syncTaskRetryStatus_;
691 }
692 
SetSyncRetry(bool isRetry)693 void SyncTaskContext::SetSyncRetry(bool isRetry)
694 {
695     isSyncRetry_ = isRetry;
696 }
697 
GetSyncRetryTimes() const698 int SyncTaskContext::GetSyncRetryTimes() const
699 {
700     if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
701         return AUTO_RETRY_TIMES;
702     }
703     return MANUAL_RETRY_TIMES;
704 }
705 
GetSyncRetryTimeout(int retryTime) const706 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
707 {
708     int timeoutTime = GetTimeoutTime();
709     if (IsAutoSync()) {
710         // set the new timeout value with 2 raised to the power of retryTime.
711         return timeoutTime * (1u << retryTime);
712     }
713     return timeoutTime;
714 }
715 
ClearAllSyncTask()716 void SyncTaskContext::ClearAllSyncTask()
717 {
718 }
719 
IsAutoLiftWaterMark() const720 bool SyncTaskContext::IsAutoLiftWaterMark() const
721 {
722     return negotiationCount_ < NEGOTIATION_LIMIT;
723 }
724 
IncNegotiationCount()725 void SyncTaskContext::IncNegotiationCount()
726 {
727     negotiationCount_++;
728 }
729 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)730 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
731 {
732     return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
733 }
734 
IsAutoSubscribe() const735 bool SyncTaskContext::IsAutoSubscribe() const
736 {
737     return isAutoSubscribe_;
738 }
739 
IsCurrentSyncTaskCanBeSkipped() const740 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
741 {
742     return false;
743 }
744 
ResetLastPushTaskStatus()745 void SyncTaskContext::ResetLastPushTaskStatus()
746 {
747 }
748 
SchemaChange()749 void SyncTaskContext::SchemaChange()
750 {
751     if (stateMachine_ != nullptr) {
752         stateMachine_->SchemaChange();
753     }
754 }
755 
Dump(int fd)756 void SyncTaskContext::Dump(int fd)
757 {
758     size_t totalSyncTaskCount = 0u;
759     size_t autoSyncTaskCount = 0u;
760     size_t reponseTaskCount = 0u;
761     {
762         std::lock_guard<std::mutex> lock(targetQueueLock_);
763         totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
764         for (const auto &target : requestTargetQueue_) {
765             if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
766                 autoSyncTaskCount++;
767             }
768         }
769         reponseTaskCount = responseTargetQueue_.size();
770     }
771     DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
772         " response task count = %zu\n",
773         deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
774 }
775 
RunPermissionCheck(uint8_t flag) const776 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
777 {
778     std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
779     std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
780     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
781     std::string subUserId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
782     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
783     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
784         { userId, appId, storeId, deviceId_, subUserId, instanceId }, flag);
785     if (errCode != E_OK) {
786         LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
787             errCode, flag, deviceId_.c_str());
788     }
789     return errCode;
790 }
791 
GetPermissionCheckFlag(bool isAutoSync,int syncMode)792 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
793 {
794     uint8_t flag = 0;
795     int mode = SyncOperation::TransferSyncMode(syncMode);
796     if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
797         flag = CHECK_FLAG_SEND;
798     } else if (mode == SyncModeType::PULL) {
799         flag = CHECK_FLAG_RECEIVE;
800     } else if (mode == SyncModeType::PUSH_AND_PULL) {
801         flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
802     }
803     if (isAutoSync) {
804         flag = flag | CHECK_FLAG_AUTOSYNC;
805     }
806     if (mode != SyncModeType::RESPONSE_PULL) {
807         // it means this sync is started by local
808         flag = flag | CHECK_FLAG_SPONSOR;
809     }
810     return flag;
811 }
812 
AbortMachineIfNeed(uint32_t syncId)813 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
814 {
815     uint32_t sessionId = 0u;
816     {
817         RefObject::AutoLock autoLock(this);
818         if (syncId_ != syncId) {
819             return;
820         }
821         sessionId = requestSessionId_;
822     }
823     stateMachine_->InnerErrorAbort(sessionId);
824 }
825 
GetAndIncSyncOperation() const826 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
827 {
828     std::lock_guard<std::mutex> lock(operationLock_);
829     if (syncOperation_ == nullptr) {
830         return nullptr;
831     }
832     RefObject::IncObjRef(syncOperation_);
833     return syncOperation_;
834 }
835 
GenerateRequestSessionId()836 uint32_t SyncTaskContext::GenerateRequestSessionId()
837 {
838     uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
839     // make sure sessionId is between 0x01 and 0x8fffffff
840     if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
841         sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
842             std::to_string(TimeHelper::GetSysCurrentTime()));
843     }
844     lastRequestSessionId_ = sessionId;
845     return sessionId;
846 }
847 
IsSchemaCompatible() const848 bool SyncTaskContext::IsSchemaCompatible() const
849 {
850     return true;
851 }
852 
SetDbAbility(DbAbility & remoteDbAbility)853 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
854 {
855 }
856 
TimeChange()857 void SyncTaskContext::TimeChange()
858 {
859     if (stateMachine_ == nullptr) {
860         LOGW("[SyncTaskContext] machine is null when time change");
861         return;
862     }
863     stateMachine_->TimeChange();
864 }
865 
GetResponseTaskCount()866 int32_t SyncTaskContext::GetResponseTaskCount()
867 {
868     std::lock_guard<std::mutex> autoLock(targetQueueLock_);
869     return static_cast<int32_t>(responseTargetQueue_.size());
870 }
871 
GetCommErrCode() const872 int SyncTaskContext::GetCommErrCode() const
873 {
874     return commErrCode_;
875 }
876 
SetCommFailErrCode(int errCode)877 void SyncTaskContext::SetCommFailErrCode(int errCode)
878 {
879     commErrCode_ = errCode;
880 }
881 
SetErrCodeWhenWaitTimeOut(int errCode)882 void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
883 {
884     if (errCode > 0) {
885         SetCommFailErrCode(static_cast<int>(SyncOperation::OP_TIMEOUT));
886     } else {
887         SetCommFailErrCode(errCode);
888     }
889 }
890 } // namespace DistributedDB
891