• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_task_context.h"
17 
18 #include <algorithm>
19 #include <cmath>
20 
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30 
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34 
35 namespace {
36     constexpr int NEGOTIATION_LIMIT = 2;
37     constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39 
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41     : syncOperation_(nullptr),
42       syncId_(0),
43       mode_(0),
44       isAutoSync_(false),
45       status_(0),
46       taskExecStatus_(0),
47       syncInterface_(nullptr),
48       communicator_(nullptr),
49       stateMachine_(nullptr),
50       requestSessionId_(0),
51       lastRequestSessionId_(0),
52       timeHelper_(nullptr),
53       remoteSoftwareVersion_(0),
54       remoteSoftwareVersionId_(0),
55       isCommNormal_(true),
56       taskErrCode_(E_OK),
57       commErrCode_(E_OK),
58       syncTaskRetryStatus_(false),
59       isSyncRetry_(false),
60       negotiationCount_(0),
61       isAutoSubscribe_(false)
62 {
63 }
64 
~SyncTaskContext()65 SyncTaskContext::~SyncTaskContext()
66 {
67     if (stateMachine_ != nullptr) {
68         delete stateMachine_;
69         stateMachine_ = nullptr;
70     }
71     SyncTaskContext::ClearSyncOperation();
72     ClearSyncTarget();
73     syncInterface_ = nullptr;
74     communicator_ = nullptr;
75 }
76 
AddSyncTarget(ISyncTarget * target)77 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
78 {
79     if (target == nullptr) {
80         return -E_INVALID_ARGS;
81     }
82     int targetMode = target->GetMode();
83     auto syncId = static_cast<uint32_t>(target->GetSyncId());
84     {
85         std::lock_guard<std::mutex> lock(targetQueueLock_);
86         if (target->GetTaskType() == ISyncTarget::REQUEST) {
87             requestTargetQueue_.push_back(target);
88         } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
89             responseTargetQueue_.push_back(target);
90         } else {
91             return -E_INVALID_ARGS;
92         }
93     }
94     RefObject::IncObjRef(this);
95     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
96         CancelCurrentSyncRetryIfNeed(targetMode, syncId);
97         RefObject::DecObjRef(this);
98     });
99     if (errCode != E_OK) {
100         RefObject::DecObjRef(this);
101     }
102     if (onSyncTaskAdd_) {
103         RefObject::IncObjRef(this);
104         errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
105             onSyncTaskAdd_();
106             RefObject::DecObjRef(this);
107         });
108         if (errCode != E_OK) {
109             RefObject::DecObjRef(this);
110         }
111     }
112     return E_OK;
113 }
114 
SetOperationStatus(int status)115 void SyncTaskContext::SetOperationStatus(int status)
116 {
117     std::lock_guard<std::mutex> lock(operationLock_);
118     if (syncOperation_ == nullptr) {
119         LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
120         return;
121     }
122     int finalStatus = status;
123 
124     int operationStatus = syncOperation_->GetStatus(deviceId_);
125     if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
126         if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
127             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
128         } else {
129             finalStatus = SyncOperation::OP_FINISHED_ALL;
130         }
131     } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
132         if (GetTaskErrCode() == -E_EKEYREVOKED) {
133             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
134         } else {
135             finalStatus = SyncOperation::OP_FINISHED_ALL;
136         }
137     }
138     syncOperation_->SetStatus(deviceId_, finalStatus);
139     if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
140         SaveLastPushTaskExecStatus(finalStatus);
141     }
142     if (syncOperation_->CheckIsAllFinished()) {
143         syncOperation_->Finished();
144     }
145 }
146 
SaveLastPushTaskExecStatus(int finalStatus)147 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
148 {
149     (void)finalStatus;
150 }
151 
Clear()152 void SyncTaskContext::Clear()
153 {
154     StopTimer();
155     retryTime_ = 0;
156     sequenceId_ = 1;
157     syncId_ = 0;
158     isAutoSync_ = false;
159     requestSessionId_ = 0;
160     isNeedRetry_ = NO_NEED_RETRY;
161     mode_ = SyncModeType::INVALID_MODE;
162     status_ = SyncOperation::OP_WAITING;
163     taskErrCode_ = E_OK;
164     packetId_ = 0;
165     isAutoSubscribe_ = false;
166 }
167 
RemoveSyncOperation(int syncId)168 int SyncTaskContext::RemoveSyncOperation(int syncId)
169 {
170     std::lock_guard<std::mutex> lock(targetQueueLock_);
171     auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
172         [syncId](const ISyncTarget *target) {
173             if (target == nullptr) {
174                 return false;
175             }
176             return target->GetSyncId() == syncId;
177         });
178     if (iter != requestTargetQueue_.end()) {
179         if (*iter != nullptr) {
180             delete *iter;
181             *iter = nullptr;
182         }
183         requestTargetQueue_.erase(iter);
184         return E_OK;
185     }
186     return -E_INVALID_ARGS;
187 }
188 
ClearSyncTarget()189 void SyncTaskContext::ClearSyncTarget()
190 {
191     std::lock_guard<std::mutex> lock(targetQueueLock_);
192     for (auto &requestTarget : requestTargetQueue_) {
193         if (requestTarget != nullptr) {
194             delete requestTarget;
195             requestTarget = nullptr;
196         }
197     }
198     requestTargetQueue_.clear();
199 
200     for (auto &responseTarget : responseTargetQueue_) {
201         if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
202             delete responseTarget;
203             responseTarget = nullptr;
204         }
205     }
206     responseTargetQueue_.clear();
207 }
208 
IsTargetQueueEmpty() const209 bool SyncTaskContext::IsTargetQueueEmpty() const
210 {
211     std::lock_guard<std::mutex> lock(targetQueueLock_);
212     return requestTargetQueue_.empty() && responseTargetQueue_.empty();
213 }
214 
GetOperationStatus() const215 int SyncTaskContext::GetOperationStatus() const
216 {
217     std::lock_guard<std::mutex> lock(operationLock_);
218     if (syncOperation_ == nullptr) {
219         return SyncOperation::OP_FINISHED_ALL;
220     }
221     return syncOperation_->GetStatus(deviceId_);
222 }
223 
SetMode(int mode)224 void SyncTaskContext::SetMode(int mode)
225 {
226     mode_ = mode;
227 }
228 
GetMode() const229 int SyncTaskContext::GetMode() const
230 {
231     return mode_;
232 }
233 
MoveToNextTarget()234 void SyncTaskContext::MoveToNextTarget()
235 {
236     ClearSyncOperation();
237     TaskParam param;
238     // call other system api without lock
239     param.timeout = communicator_->GetTimeout(deviceId_);
240     std::lock_guard<std::mutex> lock(targetQueueLock_);
241     while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
242         ISyncTarget *tmpTarget = nullptr;
243         if (!requestTargetQueue_.empty()) {
244             tmpTarget = requestTargetQueue_.front();
245             requestTargetQueue_.pop_front();
246         } else {
247             tmpTarget = responseTargetQueue_.front();
248             responseTargetQueue_.pop_front();
249         }
250         if (tmpTarget == nullptr) {
251             LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
252             continue;
253         }
254         SyncOperation *tmpOperation = nullptr;
255         tmpTarget->GetSyncOperation(tmpOperation);
256         if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
257             // if killed skip this syncOperation_.
258             delete tmpTarget;
259             tmpTarget = nullptr;
260             continue;
261         }
262         CopyTargetData(tmpTarget, param);
263         delete tmpTarget;
264         tmpTarget = nullptr;
265         break;
266     }
267 }
268 
GetNextTarget()269 int SyncTaskContext::GetNextTarget()
270 {
271     MoveToNextTarget();
272     int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
273     if (checkErrCode != E_OK) {
274         SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
275         return checkErrCode;
276     }
277     return E_OK;
278 }
279 
GetSyncId() const280 uint32_t SyncTaskContext::GetSyncId() const
281 {
282     return syncId_;
283 }
284 
285 // Get the current task deviceId.
GetDeviceId() const286 std::string SyncTaskContext::GetDeviceId() const
287 {
288     return deviceId_;
289 }
290 
SetTaskExecStatus(int status)291 void SyncTaskContext::SetTaskExecStatus(int status)
292 {
293     taskExecStatus_ = status;
294 }
295 
GetTaskExecStatus() const296 int SyncTaskContext::GetTaskExecStatus() const
297 {
298     return taskExecStatus_;
299 }
300 
IsAutoSync() const301 bool SyncTaskContext::IsAutoSync() const
302 {
303     return isAutoSync_;
304 }
305 
StartTimer()306 int SyncTaskContext::StartTimer()
307 {
308     std::lock_guard<std::mutex> lockGuard(timerLock_);
309     if (timerId_ > 0) {
310         return -E_UNEXPECTED_DATA;
311     }
312     TimerId timerId = 0;
313     RefObject::IncObjRef(this);
314     TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
315     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
316         [this]() {
317             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
318             if (ret != E_OK) {
319                 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
320             }
321         }, timerId);
322     if (errCode != E_OK) {
323         RefObject::DecObjRef(this);
324         return errCode;
325     }
326     timerId_ = timerId;
327     return errCode;
328 }
329 
StopTimer()330 void SyncTaskContext::StopTimer()
331 {
332     TimerId timerId;
333     {
334         std::lock_guard<std::mutex> lockGuard(timerLock_);
335         timerId = timerId_;
336         if (timerId_ == 0) {
337             return;
338         }
339         timerId_ = 0;
340     }
341     RuntimeContext::GetInstance()->RemoveTimer(timerId);
342 }
343 
ModifyTimer(int milliSeconds)344 int SyncTaskContext::ModifyTimer(int milliSeconds)
345 {
346     std::lock_guard<std::mutex> lockGuard(timerLock_);
347     if (timerId_ == 0) {
348         return -E_UNEXPECTED_DATA;
349     }
350     return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
351 }
352 
SetRetryTime(int retryTime)353 void SyncTaskContext::SetRetryTime(int retryTime)
354 {
355     retryTime_ = retryTime;
356 }
357 
GetRetryTime() const358 int SyncTaskContext::GetRetryTime() const
359 {
360     return retryTime_;
361 }
362 
SetRetryStatus(int isNeedRetry)363 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
364 {
365     isNeedRetry_ = isNeedRetry;
366 }
367 
GetRetryStatus() const368 int SyncTaskContext::GetRetryStatus() const
369 {
370     return isNeedRetry_;
371 }
372 
GetTimerId() const373 TimerId SyncTaskContext::GetTimerId() const
374 {
375     return timerId_;
376 }
377 
GetRequestSessionId() const378 uint32_t SyncTaskContext::GetRequestSessionId() const
379 {
380     return requestSessionId_;
381 }
382 
IncSequenceId()383 void SyncTaskContext::IncSequenceId()
384 {
385     sequenceId_++;
386 }
387 
GetSequenceId() const388 uint32_t SyncTaskContext::GetSequenceId() const
389 {
390     return sequenceId_;
391 }
392 
ReSetSequenceId()393 void SyncTaskContext::ReSetSequenceId()
394 {
395     sequenceId_ = 1;
396 }
397 
IncPacketId()398 void SyncTaskContext::IncPacketId()
399 {
400     packetId_++;
401 }
402 
GetPacketId() const403 uint64_t SyncTaskContext::GetPacketId() const
404 {
405     return packetId_;
406 }
407 
GetTimeoutTime() const408 int SyncTaskContext::GetTimeoutTime() const
409 {
410     return timeout_;
411 }
412 
SetTimeoutCallback(const TimerAction & timeOutCallback)413 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
414 {
415     timeOutCallback_ = timeOutCallback;
416 }
417 
SetTimeOffset(TimeOffset offset)418 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
419 {
420     timeOffset_ = offset;
421 }
422 
GetTimeOffset() const423 TimeOffset SyncTaskContext::GetTimeOffset() const
424 {
425     return timeOffset_;
426 }
427 
StartStateMachine()428 int SyncTaskContext::StartStateMachine()
429 {
430     return stateMachine_->StartSync();
431 }
432 
ReceiveMessageCallback(Message * inMsg)433 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
434 {
435     if (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
436         uint16_t remoteVersion = 0;
437         (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
438         SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
439     }
440     int errCode = E_OK;
441     if (IncUsedCount() == E_OK) {
442         errCode = stateMachine_->ReceiveMessageCallback(inMsg);
443         SafeExit();
444     }
445     return errCode;
446 }
447 
RegOnSyncTask(const std::function<int (void)> & callback)448 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
449 {
450     onSyncTaskAdd_ = callback;
451 }
452 
IncUsedCount()453 int SyncTaskContext::IncUsedCount()
454 {
455     AutoLock lock(this);
456     if (IsKilled()) {
457         LOGI("[SyncTaskContext] IncUsedCount isKilled");
458         return -E_OBJ_IS_KILLED;
459     }
460     usedCount_++;
461     return E_OK;
462 }
463 
SafeExit()464 void SyncTaskContext::SafeExit()
465 {
466     AutoLock lock(this);
467     usedCount_--;
468     if (usedCount_ < 1) {
469         safeKill_.notify_one();
470     }
471 }
472 
GetCurrentLocalTime() const473 Timestamp SyncTaskContext::GetCurrentLocalTime() const
474 {
475     if (timeHelper_ == nullptr) {
476         return TimeHelper::INVALID_TIMESTAMP;
477     }
478     return timeHelper_->GetTime();
479 }
480 
Abort(int status)481 void SyncTaskContext::Abort(int status)
482 {
483     (void)status;
484     Clear();
485 }
486 
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId,bool isDirectEnd)487 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
488 {
489     {
490         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
491         if (synTaskContextSet_.count(context) == 0) {
492             LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
493             return;
494         }
495         // IncObjRef to maker sure context not been killed. after the lock_guard
496         RefObject::IncObjRef(context);
497     }
498 
499     static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
500         isDirectEnd);
501     RefObject::DecObjRef(context);
502 }
503 
SetRemoteSoftwareVersion(uint32_t version)504 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
505 {
506     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
507     if (remoteSoftwareVersion_ == version) {
508         return;
509     }
510     remoteSoftwareVersion_ = version;
511     remoteSoftwareVersionId_++;
512 }
513 
GetRemoteSoftwareVersion() const514 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
515 {
516     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
517     return remoteSoftwareVersion_;
518 }
519 
GetRemoteSoftwareVersionId() const520 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
521 {
522     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
523     return remoteSoftwareVersionId_;
524 }
525 
IsCommNormal() const526 bool SyncTaskContext::IsCommNormal() const
527 {
528     return isCommNormal_;
529 }
530 
CommErrHandlerFuncInner(int errCode,uint32_t sessionId,bool isDirectEnd)531 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
532 {
533     {
534         RefObject::AutoLock lock(this);
535         if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
536             return;
537         }
538 
539         if (errCode == E_OK) {
540             SetCommFailErrCode(errCode);
541             // when communicator sent message failed, the state machine will get the error and exit this sync task
542             // it seems unnecessary to change isCommNormal_ value, so just return here
543             return;
544         }
545     }
546     LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
547     if (!isDirectEnd) {
548         SetErrCodeWhenWaitTimeOut(errCode);
549         return;
550     }
551     if (errCode > 0) {
552         SetCommFailErrCode(static_cast<int>(COMM_FAILURE));
553     } else {
554         SetCommFailErrCode(errCode);
555     }
556     stateMachine_->CommErrAbort(sessionId);
557 }
558 
TimeOut(TimerId id)559 int SyncTaskContext::TimeOut(TimerId id)
560 {
561     if (!timeOutCallback_) {
562         return E_OK;
563     }
564     IncObjRef(this);
565     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
566         timeOutCallback_(id);
567         DecObjRef(this);
568     });
569     if (errCode != E_OK) {
570         LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
571         DecObjRef(this);
572     }
573     return E_OK;
574 }
575 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)576 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
577 {
578     retryTime_ = 0;
579     mode_ = target->GetMode();
580     status_ = SyncOperation::OP_SYNCING;
581     isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
582     taskErrCode_ = E_OK;
583     packetId_ = 0;
584     isCommNormal_ = true; // reset comm status here
585     commErrCode_ = E_OK;
586     syncTaskRetryStatus_ = isSyncRetry_;
587     timeout_ = static_cast<int>(taskParam.timeout);
588     negotiationCount_ = 0;
589     target->GetSyncOperation(syncOperation_);
590     ReSetSequenceId();
591 
592     if (syncOperation_ != nullptr) {
593         // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
594         RefObject::IncObjRef(syncOperation_);
595         syncId_ = syncOperation_->GetSyncId();
596         isAutoSync_ = syncOperation_->IsAutoSync();
597         isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
598         if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
599             syncTaskRetryStatus_ = true;
600         }
601         requestSessionId_ = GenerateRequestSessionId();
602         LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
603             mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
604         DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
605     } else {
606         isAutoSync_ = false;
607         LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
608             syncTaskRetryStatus_);
609     }
610 }
611 
KillWait()612 void SyncTaskContext::KillWait()
613 {
614     StopTimer();
615     UnlockObj();
616     stateMachine_->NotifyClosing();
617     stateMachine_->AbortImmediately();
618     LockObj();
619     LOGW("[SyncTaskContext] Try to kill a context, now wait.");
620     bool noDeadLock = WaitLockedUntil(
621         safeKill_,
622         [this]() {
623             if (usedCount_ < 1) {
624                 return true;
625             }
626             return false;
627         },
628         KILL_WAIT_SECONDS);
629     if (!noDeadLock) { // LCOV_EXCL_BR_LINE
630         LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
631     } else {
632         LOGW("[SyncTaskContext] Wait the task exit ok.");
633     }
634     std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
635     synTaskContextSet_.erase(this);
636 }
637 
ClearSyncOperation()638 void SyncTaskContext::ClearSyncOperation()
639 {
640     std::lock_guard<std::mutex> lock(operationLock_);
641     if (syncOperation_ != nullptr) {
642         DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
643         RefObject::DecObjRef(syncOperation_);
644         syncOperation_ = nullptr;
645     }
646 }
647 
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)648 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
649 {
650     AutoLock lock(this);
651     if (!isAutoSync_) {
652         return;
653     }
654     if (syncId_ >= syncId) {
655         return;
656     }
657     int mode = SyncOperation::TransferSyncMode(newTargetMode);
658     if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
659         SetRetryTime(AUTO_RETRY_TIMES);
660         ModifyTimer(timeout_);
661     }
662 }
663 
GetTaskErrCode() const664 int SyncTaskContext::GetTaskErrCode() const
665 {
666     return taskErrCode_;
667 }
668 
SetTaskErrCode(int errCode)669 void SyncTaskContext::SetTaskErrCode(int errCode)
670 {
671     taskErrCode_ = errCode;
672 }
673 
IsSyncTaskNeedRetry() const674 bool SyncTaskContext::IsSyncTaskNeedRetry() const
675 {
676     return syncTaskRetryStatus_;
677 }
678 
SetSyncRetry(bool isRetry)679 void SyncTaskContext::SetSyncRetry(bool isRetry)
680 {
681     isSyncRetry_ = isRetry;
682 }
683 
GetSyncRetryTimes() const684 int SyncTaskContext::GetSyncRetryTimes() const
685 {
686     if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
687         return AUTO_RETRY_TIMES;
688     }
689     return MANUAL_RETRY_TIMES;
690 }
691 
GetSyncRetryTimeout(int retryTime) const692 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
693 {
694     int timeoutTime = GetTimeoutTime();
695     if (IsAutoSync()) {
696         // set the new timeout value with 2 raised to the power of retryTime.
697         return timeoutTime * (1u << retryTime);
698     }
699     return timeoutTime;
700 }
701 
ClearAllSyncTask()702 void SyncTaskContext::ClearAllSyncTask()
703 {
704 }
705 
IsAutoLiftWaterMark() const706 bool SyncTaskContext::IsAutoLiftWaterMark() const
707 {
708     return negotiationCount_ < NEGOTIATION_LIMIT;
709 }
710 
IncNegotiationCount()711 void SyncTaskContext::IncNegotiationCount()
712 {
713     negotiationCount_++;
714 }
715 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)716 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
717 {
718     return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
719 }
720 
IsAutoSubscribe() const721 bool SyncTaskContext::IsAutoSubscribe() const
722 {
723     return isAutoSubscribe_;
724 }
725 
IsCurrentSyncTaskCanBeSkipped() const726 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
727 {
728     return false;
729 }
730 
ResetLastPushTaskStatus()731 void SyncTaskContext::ResetLastPushTaskStatus()
732 {
733 }
734 
SchemaChange()735 void SyncTaskContext::SchemaChange()
736 {
737     if (stateMachine_ != nullptr) {
738         stateMachine_->SchemaChange();
739     }
740 }
741 
Dump(int fd)742 void SyncTaskContext::Dump(int fd)
743 {
744     size_t totalSyncTaskCount = 0u;
745     size_t autoSyncTaskCount = 0u;
746     size_t reponseTaskCount = 0u;
747     {
748         std::lock_guard<std::mutex> lock(targetQueueLock_);
749         totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
750         for (const auto &target : requestTargetQueue_) {
751             if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
752                 autoSyncTaskCount++;
753             }
754         }
755         reponseTaskCount = responseTargetQueue_.size();
756     }
757     DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
758         " response task count = %zu\n",
759         deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
760 }
761 
RunPermissionCheck(uint8_t flag) const762 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
763 {
764     std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
765     std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
766     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
767     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
768     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
769         { userId, appId, storeId, deviceId_, instanceId }, flag);
770     if (errCode != E_OK) {
771         LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
772             errCode, flag, deviceId_.c_str());
773     }
774     return errCode;
775 }
776 
GetPermissionCheckFlag(bool isAutoSync,int syncMode)777 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
778 {
779     uint8_t flag = 0;
780     int mode = SyncOperation::TransferSyncMode(syncMode);
781     if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
782         flag = CHECK_FLAG_SEND;
783     } else if (mode == SyncModeType::PULL) {
784         flag = CHECK_FLAG_RECEIVE;
785     } else if (mode == SyncModeType::PUSH_AND_PULL) {
786         flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
787     }
788     if (isAutoSync) {
789         flag = flag | CHECK_FLAG_AUTOSYNC;
790     }
791     if (mode != SyncModeType::RESPONSE_PULL) {
792         // it means this sync is started by local
793         flag = flag | CHECK_FLAG_SPONSOR;
794     }
795     return flag;
796 }
797 
AbortMachineIfNeed(uint32_t syncId)798 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
799 {
800     uint32_t sessionId = 0u;
801     {
802         RefObject::AutoLock autoLock(this);
803         if (syncId_ != syncId) {
804             return;
805         }
806         sessionId = requestSessionId_;
807     }
808     stateMachine_->InnerErrorAbort(sessionId);
809 }
810 
GetAndIncSyncOperation() const811 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
812 {
813     std::lock_guard<std::mutex> lock(operationLock_);
814     if (syncOperation_ == nullptr) {
815         return nullptr;
816     }
817     RefObject::IncObjRef(syncOperation_);
818     return syncOperation_;
819 }
820 
GenerateRequestSessionId()821 uint32_t SyncTaskContext::GenerateRequestSessionId()
822 {
823     uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
824     // make sure sessionId is between 0x01 and 0x8fffffff
825     if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
826         sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
827             std::to_string(TimeHelper::GetSysCurrentTime()));
828     }
829     lastRequestSessionId_ = sessionId;
830     return sessionId;
831 }
832 
IsSchemaCompatible() const833 bool SyncTaskContext::IsSchemaCompatible() const
834 {
835     return true;
836 }
837 
SetDbAbility(DbAbility & remoteDbAbility)838 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
839 {
840 }
841 
TimeChange()842 void SyncTaskContext::TimeChange()
843 {
844     if (stateMachine_ == nullptr) {
845         LOGW("[SyncTaskContext] machine is null when time change");
846         return;
847     }
848     stateMachine_->TimeChange();
849 }
850 
GetResponseTaskCount()851 int32_t SyncTaskContext::GetResponseTaskCount()
852 {
853     std::lock_guard<std::mutex> autoLock(targetQueueLock_);
854     return static_cast<int32_t>(responseTargetQueue_.size());
855 }
856 
GetCommErrCode() const857 int SyncTaskContext::GetCommErrCode() const
858 {
859     return commErrCode_;
860 }
861 
SetCommFailErrCode(int errCode)862 void SyncTaskContext::SetCommFailErrCode(int errCode)
863 {
864     commErrCode_ = errCode;
865 }
866 
SetErrCodeWhenWaitTimeOut(int errCode)867 void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
868 {
869     if (errCode > 0) {
870         SetCommFailErrCode(static_cast<int>(TIME_OUT));
871     } else {
872         SetCommFailErrCode(errCode);
873     }
874 }
875 } // namespace DistributedDB
876