• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_task_context.h"
17 
18 #include <algorithm>
19 #include <cmath>
20 
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30 
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34 
namespace {
    // IsAutoLiftWaterMark() reports true only while negotiationCount_ is below this value.
    constexpr int NEGOTIATION_LIMIT = 2;
    // Largest session id reached by incrementing the previous one; beyond it a fresh id is derived.
    constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
}
39 
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41     : syncOperation_(nullptr),
42       syncId_(0),
43       mode_(0),
44       isAutoSync_(false),
45       status_(0),
46       taskExecStatus_(0),
47       syncInterface_(nullptr),
48       communicator_(nullptr),
49       stateMachine_(nullptr),
50       requestSessionId_(0),
51       lastRequestSessionId_(0),
52       timeHelper_(nullptr),
53       remoteSoftwareVersion_(0),
54       remoteSoftwareVersionId_(0),
55       isCommNormal_(true),
56       taskErrCode_(E_OK),
57       commErrCode_(E_OK),
58       syncTaskRetryStatus_(false),
59       isSyncRetry_(false),
60       negotiationCount_(0),
61       isAutoSubscribe_(false)
62 {
63 }
64 
~SyncTaskContext()65 SyncTaskContext::~SyncTaskContext()
66 {
67     if (stateMachine_ != nullptr) {
68         delete stateMachine_;
69         stateMachine_ = nullptr;
70     }
71     SyncTaskContext::ClearSyncOperation();
72     ClearSyncTarget();
73     syncInterface_ = nullptr;
74     communicator_ = nullptr;
75 }
76 
AddSyncTarget(ISyncTarget * target)77 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
78 {
79     if (target == nullptr) {
80         return -E_INVALID_ARGS;
81     }
82     int targetMode = target->GetMode();
83     auto syncId = static_cast<uint32_t>(target->GetSyncId());
84     {
85         std::lock_guard<std::mutex> lock(targetQueueLock_);
86         if (target->GetTaskType() == ISyncTarget::REQUEST) {
87             requestTargetQueue_.push_back(target);
88         } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
89             responseTargetQueue_.push_back(target);
90         } else {
91             return -E_INVALID_ARGS;
92         }
93     }
94     RefObject::IncObjRef(this);
95     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
96         CancelCurrentSyncRetryIfNeed(targetMode, syncId);
97         RefObject::DecObjRef(this);
98     });
99     if (errCode != E_OK) {
100         RefObject::DecObjRef(this);
101     }
102     if (onSyncTaskAdd_) {
103         RefObject::IncObjRef(this);
104         errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
105             onSyncTaskAdd_();
106             RefObject::DecObjRef(this);
107         });
108         if (errCode != E_OK) {
109             RefObject::DecObjRef(this);
110         }
111     }
112     return E_OK;
113 }
114 
SetOperationStatus(int status)115 void SyncTaskContext::SetOperationStatus(int status)
116 {
117     std::lock_guard<std::mutex> lock(operationLock_);
118     if (syncOperation_ == nullptr) {
119         LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
120         return;
121     }
122     int finalStatus = status;
123 
124     int operationStatus = syncOperation_->GetStatus(deviceId_);
125     if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
126         if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
127             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
128         } else {
129             finalStatus = SyncOperation::OP_FINISHED_ALL;
130         }
131     } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
132         if (GetTaskErrCode() == -E_EKEYREVOKED) {
133             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
134         } else {
135             finalStatus = SyncOperation::OP_FINISHED_ALL;
136         }
137     }
138     syncOperation_->SetStatus(deviceId_, finalStatus);
139     if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
140         SaveLastPushTaskExecStatus(finalStatus);
141     }
142     if (syncOperation_->CheckIsAllFinished()) {
143         syncOperation_->Finished();
144     }
145 }
146 
SaveLastPushTaskExecStatus(int finalStatus)147 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
148 {
149     (void)finalStatus;
150 }
151 
Clear()152 void SyncTaskContext::Clear()
153 {
154     StopTimer();
155     retryTime_ = 0;
156     sequenceId_ = 1;
157     syncId_ = 0;
158     isAutoSync_ = false;
159     requestSessionId_ = 0;
160     isNeedRetry_ = NO_NEED_RETRY;
161     mode_ = SyncModeType::INVALID_MODE;
162     status_ = SyncOperation::OP_WAITING;
163     taskErrCode_ = E_OK;
164     packetId_ = 0;
165     isAutoSubscribe_ = false;
166 }
167 
RemoveSyncOperation(int syncId)168 int SyncTaskContext::RemoveSyncOperation(int syncId)
169 {
170     std::lock_guard<std::mutex> lock(targetQueueLock_);
171     auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
172         [syncId](const ISyncTarget *target) {
173             if (target == nullptr) {
174                 return false;
175             }
176             return target->GetSyncId() == syncId;
177         });
178     if (iter != requestTargetQueue_.end()) {
179         if (*iter != nullptr) {
180             delete *iter;
181         }
182         requestTargetQueue_.erase(iter);
183         return E_OK;
184     }
185     return -E_INVALID_ARGS;
186 }
187 
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190     std::lock_guard<std::mutex> lock(targetQueueLock_);
191     for (auto &requestTarget : requestTargetQueue_) {
192         if (requestTarget != nullptr) {
193             delete requestTarget;
194         }
195     }
196     requestTargetQueue_.clear();
197 
198     for (auto &responseTarget : responseTargetQueue_) {
199         if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
200             delete responseTarget;
201         }
202     }
203     responseTargetQueue_.clear();
204 }
205 
IsTargetQueueEmpty() const206 bool SyncTaskContext::IsTargetQueueEmpty() const
207 {
208     std::lock_guard<std::mutex> lock(targetQueueLock_);
209     return requestTargetQueue_.empty() && responseTargetQueue_.empty();
210 }
211 
GetOperationStatus() const212 int SyncTaskContext::GetOperationStatus() const
213 {
214     std::lock_guard<std::mutex> lock(operationLock_);
215     if (syncOperation_ == nullptr) {
216         return SyncOperation::OP_FINISHED_ALL;
217     }
218     return syncOperation_->GetStatus(deviceId_);
219 }
220 
SetMode(int mode)221 void SyncTaskContext::SetMode(int mode)
222 {
223     mode_ = mode;
224 }
225 
GetMode() const226 int SyncTaskContext::GetMode() const
227 {
228     return mode_;
229 }
230 
MoveToNextTarget(uint32_t timeout)231 void SyncTaskContext::MoveToNextTarget(uint32_t timeout)
232 {
233     ClearSyncOperation();
234     TaskParam param;
235     // call other system api without lock
236     param.timeout = timeout;
237     std::lock_guard<std::mutex> lock(targetQueueLock_);
238     while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
239         ISyncTarget *tmpTarget = nullptr;
240         if (!requestTargetQueue_.empty()) {
241             tmpTarget = requestTargetQueue_.front();
242             requestTargetQueue_.pop_front();
243         } else {
244             tmpTarget = responseTargetQueue_.front();
245             responseTargetQueue_.pop_front();
246         }
247         if (tmpTarget == nullptr) {
248             LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
249             continue;
250         }
251         SyncOperation *tmpOperation = nullptr;
252         tmpTarget->GetSyncOperation(tmpOperation);
253         if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
254             // if killed skip this syncOperation_.
255             delete tmpTarget;
256             tmpTarget = nullptr;
257             continue;
258         }
259         CopyTargetData(tmpTarget, param);
260         delete tmpTarget;
261         tmpTarget = nullptr;
262         break;
263     }
264 }
265 
GetNextTarget(uint32_t timeout)266 int SyncTaskContext::GetNextTarget(uint32_t timeout)
267 {
268     MoveToNextTarget(timeout);
269     int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
270     if (checkErrCode != E_OK) {
271         SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
272         return checkErrCode;
273     }
274     return E_OK;
275 }
276 
GetSyncId() const277 uint32_t SyncTaskContext::GetSyncId() const
278 {
279     return syncId_;
280 }
281 
282 // Get the current task deviceId.
GetDeviceId() const283 std::string SyncTaskContext::GetDeviceId() const
284 {
285     return deviceId_;
286 }
287 
SetTaskExecStatus(int status)288 void SyncTaskContext::SetTaskExecStatus(int status)
289 {
290     taskExecStatus_ = status;
291 }
292 
GetTaskExecStatus() const293 int SyncTaskContext::GetTaskExecStatus() const
294 {
295     return taskExecStatus_;
296 }
297 
IsAutoSync() const298 bool SyncTaskContext::IsAutoSync() const
299 {
300     return isAutoSync_;
301 }
302 
StartTimer()303 int SyncTaskContext::StartTimer()
304 {
305     std::lock_guard<std::mutex> lockGuard(timerLock_);
306     if (timerId_ > 0) {
307         return -E_UNEXPECTED_DATA;
308     }
309     TimerId timerId = 0;
310     RefObject::IncObjRef(this);
311     TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
312     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
313         [this]() {
314             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
315             if (ret != E_OK) {
316                 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
317             }
318         }, timerId);
319     if (errCode != E_OK) {
320         RefObject::DecObjRef(this);
321         return errCode;
322     }
323     timerId_ = timerId;
324     return errCode;
325 }
326 
StopTimer()327 void SyncTaskContext::StopTimer()
328 {
329     TimerId timerId;
330     {
331         std::lock_guard<std::mutex> lockGuard(timerLock_);
332         timerId = timerId_;
333         if (timerId_ == 0) {
334             return;
335         }
336         timerId_ = 0;
337     }
338     RuntimeContext::GetInstance()->RemoveTimer(timerId);
339 }
340 
ModifyTimer(int milliSeconds)341 int SyncTaskContext::ModifyTimer(int milliSeconds)
342 {
343     std::lock_guard<std::mutex> lockGuard(timerLock_);
344     if (timerId_ == 0) {
345         return -E_UNEXPECTED_DATA;
346     }
347     return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
348 }
349 
SetRetryTime(int retryTime)350 void SyncTaskContext::SetRetryTime(int retryTime)
351 {
352     retryTime_ = retryTime;
353 }
354 
GetRetryTime() const355 int SyncTaskContext::GetRetryTime() const
356 {
357     return retryTime_;
358 }
359 
SetRetryStatus(int isNeedRetry)360 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
361 {
362     isNeedRetry_ = isNeedRetry;
363 }
364 
GetRetryStatus() const365 int SyncTaskContext::GetRetryStatus() const
366 {
367     return isNeedRetry_;
368 }
369 
GetTimerId() const370 TimerId SyncTaskContext::GetTimerId() const
371 {
372     return timerId_;
373 }
374 
GetRequestSessionId() const375 uint32_t SyncTaskContext::GetRequestSessionId() const
376 {
377     return requestSessionId_;
378 }
379 
IncSequenceId()380 void SyncTaskContext::IncSequenceId()
381 {
382     sequenceId_++;
383 }
384 
GetSequenceId() const385 uint32_t SyncTaskContext::GetSequenceId() const
386 {
387     return sequenceId_;
388 }
389 
ReSetSequenceId()390 void SyncTaskContext::ReSetSequenceId()
391 {
392     sequenceId_ = 1;
393 }
394 
IncPacketId()395 void SyncTaskContext::IncPacketId()
396 {
397     packetId_++;
398 }
399 
GetPacketId() const400 uint64_t SyncTaskContext::GetPacketId() const
401 {
402     return packetId_;
403 }
404 
GetTimeoutTime() const405 int SyncTaskContext::GetTimeoutTime() const
406 {
407     return timeout_;
408 }
409 
SetTimeoutCallback(const TimerAction & timeOutCallback)410 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
411 {
412     timeOutCallback_ = timeOutCallback;
413 }
414 
SetTimeOffset(TimeOffset offset)415 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
416 {
417     timeOffset_ = offset;
418 }
419 
GetTimeOffset() const420 TimeOffset SyncTaskContext::GetTimeOffset() const
421 {
422     return timeOffset_;
423 }
424 
StartStateMachine()425 int SyncTaskContext::StartStateMachine()
426 {
427     return stateMachine_->StartSync();
428 }
429 
ReceiveMessageCallback(Message * inMsg)430 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
431 {
432     if (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
433         uint16_t remoteVersion = 0;
434         (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
435         SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
436     }
437     int errCode = E_OK;
438     if (IncUsedCount() == E_OK) {
439         errCode = stateMachine_->ReceiveMessageCallback(inMsg);
440         SafeExit();
441     }
442     return errCode;
443 }
444 
RegOnSyncTask(const std::function<int (void)> & callback)445 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
446 {
447     onSyncTaskAdd_ = callback;
448 }
449 
IncUsedCount()450 int SyncTaskContext::IncUsedCount()
451 {
452     AutoLock lock(this);
453     if (IsKilled()) {
454         LOGI("[SyncTaskContext] IncUsedCount isKilled");
455         return -E_OBJ_IS_KILLED;
456     }
457     usedCount_++;
458     return E_OK;
459 }
460 
SafeExit()461 void SyncTaskContext::SafeExit()
462 {
463     AutoLock lock(this);
464     usedCount_--;
465     if (usedCount_ < 1) {
466         safeKill_.notify_one();
467     }
468 }
469 
GetCurrentLocalTime() const470 Timestamp SyncTaskContext::GetCurrentLocalTime() const
471 {
472     if (timeHelper_ == nullptr) {
473         return TimeHelper::INVALID_TIMESTAMP;
474     }
475     return timeHelper_->GetTime();
476 }
477 
Abort(int status)478 void SyncTaskContext::Abort(int status)
479 {
480     (void)status;
481     Clear();
482 }
483 
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId,bool isDirectEnd)484 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
485 {
486     {
487         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
488         if (synTaskContextSet_.count(context) == 0) {
489             LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
490             return;
491         }
492         // IncObjRef to maker sure context not been killed. after the lock_guard
493         RefObject::IncObjRef(context);
494     }
495 
496     static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
497         isDirectEnd);
498     RefObject::DecObjRef(context);
499 }
500 
SetRemoteSoftwareVersion(uint32_t version)501 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
502 {
503     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
504     if (remoteSoftwareVersion_ == version) {
505         return;
506     }
507     remoteSoftwareVersion_ = version;
508     remoteSoftwareVersionId_++;
509 }
510 
GetRemoteSoftwareVersion() const511 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
512 {
513     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
514     return remoteSoftwareVersion_;
515 }
516 
GetRemoteSoftwareVersionId() const517 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
518 {
519     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
520     return remoteSoftwareVersionId_;
521 }
522 
IsCommNormal() const523 bool SyncTaskContext::IsCommNormal() const
524 {
525     return isCommNormal_;
526 }
527 
CommErrHandlerFuncInner(int errCode,uint32_t sessionId,bool isDirectEnd)528 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
529 {
530     {
531         RefObject::AutoLock lock(this);
532         if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
533             return;
534         }
535 
536         if (errCode == E_OK) {
537             SetCommFailErrCode(errCode);
538             // when communicator sent message failed, the state machine will get the error and exit this sync task
539             // it seems unnecessary to change isCommNormal_ value, so just return here
540             return;
541         }
542     }
543     LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
544     if (!isDirectEnd) {
545         SetErrCodeWhenWaitTimeOut(errCode);
546         return;
547     }
548     if (errCode > 0) {
549         SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
550     } else {
551         SetCommFailErrCode(errCode);
552     }
553     stateMachine_->CommErrAbort(sessionId);
554 }
555 
TimeOut(TimerId id)556 int SyncTaskContext::TimeOut(TimerId id)
557 {
558     if (!timeOutCallback_) {
559         return E_OK;
560     }
561     IncObjRef(this);
562     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
563         timeOutCallback_(id);
564         DecObjRef(this);
565     });
566     if (errCode != E_OK) {
567         LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
568         DecObjRef(this);
569     }
570     return E_OK;
571 }
572 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)573 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
574 {
575     retryTime_ = 0;
576     mode_ = target->GetMode();
577     status_ = SyncOperation::OP_SYNCING;
578     isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
579     taskErrCode_ = E_OK;
580     packetId_ = 0;
581     isCommNormal_ = true; // reset comm status here
582     commErrCode_ = E_OK;
583     syncTaskRetryStatus_ = isSyncRetry_;
584     timeout_ = static_cast<int>(taskParam.timeout);
585     negotiationCount_ = 0;
586     target->GetSyncOperation(syncOperation_);
587     ReSetSequenceId();
588 
589     if (syncOperation_ != nullptr) {
590         // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
591         RefObject::IncObjRef(syncOperation_);
592         syncId_ = syncOperation_->GetSyncId();
593         isAutoSync_ = syncOperation_->IsAutoSync();
594         isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
595         if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
596             syncTaskRetryStatus_ = true;
597         }
598         requestSessionId_ = GenerateRequestSessionId();
599         LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
600             mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
601         DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
602     } else {
603         isAutoSync_ = false;
604         LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
605             syncTaskRetryStatus_);
606     }
607 }
608 
KillWait()609 void SyncTaskContext::KillWait()
610 {
611     StopTimer();
612     UnlockObj();
613     stateMachine_->NotifyClosing();
614     stateMachine_->AbortImmediately();
615     LockObj();
616     LOGW("[SyncTaskContext] Try to kill a context, now wait.");
617     bool noDeadLock = WaitLockedUntil(
618         safeKill_,
619         [this]() {
620             if (usedCount_ < 1) {
621                 return true;
622             }
623             return false;
624         },
625         KILL_WAIT_SECONDS);
626     if (!noDeadLock) { // LCOV_EXCL_BR_LINE
627         LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
628     } else {
629         LOGW("[SyncTaskContext] Wait the task exit ok.");
630     }
631     std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
632     synTaskContextSet_.erase(this);
633 }
634 
ClearSyncOperation()635 void SyncTaskContext::ClearSyncOperation()
636 {
637     std::lock_guard<std::mutex> lock(operationLock_);
638     if (syncOperation_ != nullptr) {
639         DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
640         RefObject::DecObjRef(syncOperation_);
641         syncOperation_ = nullptr;
642     }
643 }
644 
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)645 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
646 {
647     AutoLock lock(this);
648     if (!isAutoSync_) {
649         return;
650     }
651     if (syncId_ >= syncId) {
652         return;
653     }
654     int mode = SyncOperation::TransferSyncMode(newTargetMode);
655     if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
656         SetRetryTime(AUTO_RETRY_TIMES);
657         ModifyTimer(timeout_);
658     }
659 }
660 
GetTaskErrCode() const661 int SyncTaskContext::GetTaskErrCode() const
662 {
663     return taskErrCode_;
664 }
665 
SetTaskErrCode(int errCode)666 void SyncTaskContext::SetTaskErrCode(int errCode)
667 {
668     taskErrCode_ = errCode;
669 }
670 
IsSyncTaskNeedRetry() const671 bool SyncTaskContext::IsSyncTaskNeedRetry() const
672 {
673     return syncTaskRetryStatus_;
674 }
675 
SetSyncRetry(bool isRetry)676 void SyncTaskContext::SetSyncRetry(bool isRetry)
677 {
678     isSyncRetry_ = isRetry;
679 }
680 
GetSyncRetryTimes() const681 int SyncTaskContext::GetSyncRetryTimes() const
682 {
683     if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
684         return AUTO_RETRY_TIMES;
685     }
686     return MANUAL_RETRY_TIMES;
687 }
688 
GetSyncRetryTimeout(int retryTime) const689 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
690 {
691     int timeoutTime = GetTimeoutTime();
692     if (IsAutoSync()) {
693         // set the new timeout value with 2 raised to the power of retryTime.
694         return timeoutTime * (1u << retryTime);
695     }
696     return timeoutTime;
697 }
698 
ClearAllSyncTask()699 void SyncTaskContext::ClearAllSyncTask()
700 {
701 }
702 
IsAutoLiftWaterMark() const703 bool SyncTaskContext::IsAutoLiftWaterMark() const
704 {
705     return negotiationCount_ < NEGOTIATION_LIMIT;
706 }
707 
IncNegotiationCount()708 void SyncTaskContext::IncNegotiationCount()
709 {
710     negotiationCount_++;
711 }
712 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)713 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
714 {
715     return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
716 }
717 
IsAutoSubscribe() const718 bool SyncTaskContext::IsAutoSubscribe() const
719 {
720     return isAutoSubscribe_;
721 }
722 
IsCurrentSyncTaskCanBeSkipped() const723 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
724 {
725     return false;
726 }
727 
ResetLastPushTaskStatus()728 void SyncTaskContext::ResetLastPushTaskStatus()
729 {
730 }
731 
SchemaChange()732 void SyncTaskContext::SchemaChange()
733 {
734     if (stateMachine_ != nullptr) {
735         stateMachine_->SchemaChange();
736     }
737 }
738 
Dump(int fd)739 void SyncTaskContext::Dump(int fd)
740 {
741     size_t totalSyncTaskCount = 0u;
742     size_t autoSyncTaskCount = 0u;
743     size_t reponseTaskCount = 0u;
744     {
745         std::lock_guard<std::mutex> lock(targetQueueLock_);
746         totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
747         for (const auto &target : requestTargetQueue_) {
748             if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
749                 autoSyncTaskCount++;
750             }
751         }
752         reponseTaskCount = responseTargetQueue_.size();
753     }
754     DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
755         " response task count = %zu\n",
756         deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
757 }
758 
RunPermissionCheck(uint8_t flag) const759 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
760 {
761     std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
762     std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
763     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
764     std::string subUserId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
765     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
766     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
767         { userId, appId, storeId, deviceId_, subUserId, instanceId }, flag);
768     if (errCode != E_OK) {
769         LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
770             errCode, flag, deviceId_.c_str());
771     }
772     return errCode;
773 }
774 
GetPermissionCheckFlag(bool isAutoSync,int syncMode)775 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
776 {
777     uint8_t flag = 0;
778     int mode = SyncOperation::TransferSyncMode(syncMode);
779     if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
780         flag = CHECK_FLAG_SEND;
781     } else if (mode == SyncModeType::PULL) {
782         flag = CHECK_FLAG_RECEIVE;
783     } else if (mode == SyncModeType::PUSH_AND_PULL) {
784         flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
785     }
786     if (isAutoSync) {
787         flag = flag | CHECK_FLAG_AUTOSYNC;
788     }
789     if (mode != SyncModeType::RESPONSE_PULL) {
790         // it means this sync is started by local
791         flag = flag | CHECK_FLAG_SPONSOR;
792     }
793     return flag;
794 }
795 
AbortMachineIfNeed(uint32_t syncId)796 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
797 {
798     uint32_t sessionId = 0u;
799     {
800         RefObject::AutoLock autoLock(this);
801         if (syncId_ != syncId) {
802             return;
803         }
804         sessionId = requestSessionId_;
805     }
806     stateMachine_->InnerErrorAbort(sessionId);
807 }
808 
GetAndIncSyncOperation() const809 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
810 {
811     std::lock_guard<std::mutex> lock(operationLock_);
812     if (syncOperation_ == nullptr) {
813         return nullptr;
814     }
815     RefObject::IncObjRef(syncOperation_);
816     return syncOperation_;
817 }
818 
GenerateRequestSessionId()819 uint32_t SyncTaskContext::GenerateRequestSessionId()
820 {
821     uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
822     // make sure sessionId is between 0x01 and 0x8fffffff
823     if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
824         sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
825             std::to_string(TimeHelper::GetSysCurrentTime()));
826     }
827     lastRequestSessionId_ = sessionId;
828     return sessionId;
829 }
830 
IsSchemaCompatible() const831 bool SyncTaskContext::IsSchemaCompatible() const
832 {
833     return true;
834 }
835 
SetDbAbility(DbAbility & remoteDbAbility)836 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
837 {
838 }
839 
TimeChange()840 void SyncTaskContext::TimeChange()
841 {
842     if (stateMachine_ == nullptr) {
843         LOGW("[SyncTaskContext] machine is null when time change");
844         return;
845     }
846     stateMachine_->TimeChange();
847 }
848 
GetResponseTaskCount()849 int32_t SyncTaskContext::GetResponseTaskCount()
850 {
851     std::lock_guard<std::mutex> autoLock(targetQueueLock_);
852     return static_cast<int32_t>(responseTargetQueue_.size());
853 }
854 
GetCommErrCode() const855 int SyncTaskContext::GetCommErrCode() const
856 {
857     return commErrCode_;
858 }
859 
SetCommFailErrCode(int errCode)860 void SyncTaskContext::SetCommFailErrCode(int errCode)
861 {
862     commErrCode_ = errCode;
863 }
864 
SetErrCodeWhenWaitTimeOut(int errCode)865 void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
866 {
867     if (errCode > 0) {
868         SetCommFailErrCode(static_cast<int>(SyncOperation::OP_TIMEOUT));
869     } else {
870         SetCommFailErrCode(errCode);
871     }
872 }
873 } // namespace DistributedDB
874