• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_task_context.h"
17 
18 #include <algorithm>
19 #include <cmath>
20 
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30 
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34 
35 namespace {
36     constexpr int NEGOTIATION_LIMIT = 2;
37     constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39 
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41     : syncOperation_(nullptr),
42       syncId_(0),
43       mode_(0),
44       isAutoSync_(false),
45       status_(0),
46       taskExecStatus_(0),
47       syncInterface_(nullptr),
48       communicator_(nullptr),
49       stateMachine_(nullptr),
50       requestSessionId_(0),
51       lastRequestSessionId_(0),
52       timeHelper_(nullptr),
53       remoteSoftwareVersion_(0),
54       remoteSoftwareVersionId_(0),
55       isCommNormal_(true),
56       taskErrCode_(E_OK),
57       syncTaskRetryStatus_(false),
58       isSyncRetry_(false),
59       negotiationCount_(0),
60       isAutoSubscribe_(false)
61 {
62 }
63 
~SyncTaskContext()64 SyncTaskContext::~SyncTaskContext()
65 {
66     if (stateMachine_ != nullptr) {
67         delete stateMachine_;
68         stateMachine_ = nullptr;
69     }
70     SyncTaskContext::ClearSyncOperation();
71     ClearSyncTarget();
72     syncInterface_ = nullptr;
73     communicator_ = nullptr;
74 }
75 
AddSyncTarget(ISyncTarget * target)76 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
77 {
78     if (target == nullptr) {
79         return -E_INVALID_ARGS;
80     }
81     int targetMode = target->GetMode();
82     auto syncId = static_cast<uint32_t>(target->GetSyncId());
83     {
84         std::lock_guard<std::mutex> lock(targetQueueLock_);
85         if (target->GetTaskType() == ISyncTarget::REQUEST) {
86             requestTargetQueue_.push_back(target);
87         } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
88             responseTargetQueue_.push_back(target);
89         } else {
90             return -E_INVALID_ARGS;
91         }
92     }
93     RefObject::IncObjRef(this);
94     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
95         CancelCurrentSyncRetryIfNeed(targetMode, syncId);
96         RefObject::DecObjRef(this);
97     });
98     if (errCode != E_OK) {
99         RefObject::DecObjRef(this);
100     }
101     if (onSyncTaskAdd_) {
102         RefObject::IncObjRef(this);
103         errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
104             onSyncTaskAdd_();
105             RefObject::DecObjRef(this);
106         });
107         if (errCode != E_OK) {
108             RefObject::DecObjRef(this);
109         }
110     }
111     return E_OK;
112 }
113 
SetOperationStatus(int status)114 void SyncTaskContext::SetOperationStatus(int status)
115 {
116     std::lock_guard<std::mutex> lock(operationLock_);
117     if (syncOperation_ == nullptr) {
118         LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
119         return;
120     }
121     int finalStatus = status;
122 
123     int operationStatus = syncOperation_->GetStatus(deviceId_);
124     if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
125         if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
126             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
127         } else {
128             finalStatus = SyncOperation::OP_FINISHED_ALL;
129         }
130     } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
131         if (GetTaskErrCode() == -E_EKEYREVOKED) {
132             finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
133         } else {
134             finalStatus = SyncOperation::OP_FINISHED_ALL;
135         }
136     }
137     syncOperation_->SetStatus(deviceId_, finalStatus);
138     if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
139         SaveLastPushTaskExecStatus(finalStatus);
140     }
141     if (syncOperation_->CheckIsAllFinished()) {
142         syncOperation_->Finished();
143     }
144 }
145 
SaveLastPushTaskExecStatus(int finalStatus)146 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
147 {
148     (void)finalStatus;
149 }
150 
Clear()151 void SyncTaskContext::Clear()
152 {
153     StopTimer();
154     retryTime_ = 0;
155     sequenceId_ = 1;
156     syncId_ = 0;
157     isAutoSync_ = false;
158     requestSessionId_ = 0;
159     isNeedRetry_ = NO_NEED_RETRY;
160     mode_ = SyncModeType::INVALID_MODE;
161     status_ = SyncOperation::OP_WAITING;
162     taskErrCode_ = E_OK;
163     packetId_ = 0;
164     isAutoSubscribe_ = false;
165 }
166 
RemoveSyncOperation(int syncId)167 int SyncTaskContext::RemoveSyncOperation(int syncId)
168 {
169     std::lock_guard<std::mutex> lock(targetQueueLock_);
170     auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
171         [syncId](const ISyncTarget *target) {
172             if (target == nullptr) {
173                 return false;
174             }
175             return target->GetSyncId() == syncId;
176         });
177     if (iter != requestTargetQueue_.end()) {
178         if (*iter != nullptr) {
179             delete *iter;
180             *iter = nullptr;
181         }
182         requestTargetQueue_.erase(iter);
183         return E_OK;
184     }
185     return -E_INVALID_ARGS;
186 }
187 
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190     std::lock_guard<std::mutex> lock(targetQueueLock_);
191     for (auto &requestTarget : requestTargetQueue_) {
192         if (requestTarget != nullptr) {
193             delete requestTarget;
194             requestTarget = nullptr;
195         }
196     }
197     requestTargetQueue_.clear();
198 
199     for (auto &responseTarget : responseTargetQueue_) {
200         if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
201             delete responseTarget;
202             responseTarget = nullptr;
203         }
204     }
205     responseTargetQueue_.clear();
206 }
207 
IsTargetQueueEmpty() const208 bool SyncTaskContext::IsTargetQueueEmpty() const
209 {
210     std::lock_guard<std::mutex> lock(targetQueueLock_);
211     return requestTargetQueue_.empty() && responseTargetQueue_.empty();
212 }
213 
GetOperationStatus() const214 int SyncTaskContext::GetOperationStatus() const
215 {
216     std::lock_guard<std::mutex> lock(operationLock_);
217     if (syncOperation_ == nullptr) {
218         return SyncOperation::OP_FINISHED_ALL;
219     }
220     return syncOperation_->GetStatus(deviceId_);
221 }
222 
SetMode(int mode)223 void SyncTaskContext::SetMode(int mode)
224 {
225     mode_ = mode;
226 }
227 
GetMode() const228 int SyncTaskContext::GetMode() const
229 {
230     return mode_;
231 }
232 
MoveToNextTarget()233 void SyncTaskContext::MoveToNextTarget()
234 {
235     ClearSyncOperation();
236     TaskParam param;
237     // call other system api without lock
238     param.timeout = communicator_->GetTimeout(deviceId_);
239     std::lock_guard<std::mutex> lock(targetQueueLock_);
240     while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
241         ISyncTarget *tmpTarget = nullptr;
242         if (!requestTargetQueue_.empty()) {
243             tmpTarget = requestTargetQueue_.front();
244             requestTargetQueue_.pop_front();
245         } else {
246             tmpTarget = responseTargetQueue_.front();
247             responseTargetQueue_.pop_front();
248         }
249         if (tmpTarget == nullptr) {
250             LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
251             continue;
252         }
253         SyncOperation *tmpOperation = nullptr;
254         tmpTarget->GetSyncOperation(tmpOperation);
255         if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
256             // if killed skip this syncOperation_.
257             delete tmpTarget;
258             tmpTarget = nullptr;
259             continue;
260         }
261         CopyTargetData(tmpTarget, param);
262         delete tmpTarget;
263         tmpTarget = nullptr;
264         break;
265     }
266 }
267 
GetNextTarget()268 int SyncTaskContext::GetNextTarget()
269 {
270     MoveToNextTarget();
271     int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
272     if (checkErrCode != E_OK) {
273         SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
274         return checkErrCode;
275     }
276     return E_OK;
277 }
278 
GetSyncId() const279 uint32_t SyncTaskContext::GetSyncId() const
280 {
281     return syncId_;
282 }
283 
284 // Get the current task deviceId.
GetDeviceId() const285 std::string SyncTaskContext::GetDeviceId() const
286 {
287     return deviceId_;
288 }
289 
SetTaskExecStatus(int status)290 void SyncTaskContext::SetTaskExecStatus(int status)
291 {
292     taskExecStatus_ = status;
293 }
294 
GetTaskExecStatus() const295 int SyncTaskContext::GetTaskExecStatus() const
296 {
297     return taskExecStatus_;
298 }
299 
IsAutoSync() const300 bool SyncTaskContext::IsAutoSync() const
301 {
302     return isAutoSync_;
303 }
304 
StartTimer()305 int SyncTaskContext::StartTimer()
306 {
307     std::lock_guard<std::mutex> lockGuard(timerLock_);
308     if (timerId_ > 0) {
309         return -E_UNEXPECTED_DATA;
310     }
311     TimerId timerId = 0;
312     RefObject::IncObjRef(this);
313     TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
314     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
315         [this]() {
316             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
317             if (ret != E_OK) {
318                 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
319             }
320         }, timerId);
321     if (errCode != E_OK) {
322         RefObject::DecObjRef(this);
323         return errCode;
324     }
325     timerId_ = timerId;
326     return errCode;
327 }
328 
StopTimer()329 void SyncTaskContext::StopTimer()
330 {
331     TimerId timerId;
332     {
333         std::lock_guard<std::mutex> lockGuard(timerLock_);
334         timerId = timerId_;
335         if (timerId_ == 0) {
336             return;
337         }
338         timerId_ = 0;
339     }
340     RuntimeContext::GetInstance()->RemoveTimer(timerId);
341 }
342 
ModifyTimer(int milliSeconds)343 int SyncTaskContext::ModifyTimer(int milliSeconds)
344 {
345     std::lock_guard<std::mutex> lockGuard(timerLock_);
346     if (timerId_ == 0) {
347         return -E_UNEXPECTED_DATA;
348     }
349     return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
350 }
351 
SetRetryTime(int retryTime)352 void SyncTaskContext::SetRetryTime(int retryTime)
353 {
354     retryTime_ = retryTime;
355 }
356 
GetRetryTime() const357 int SyncTaskContext::GetRetryTime() const
358 {
359     return retryTime_;
360 }
361 
SetRetryStatus(int isNeedRetry)362 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
363 {
364     isNeedRetry_ = isNeedRetry;
365 }
366 
GetRetryStatus() const367 int SyncTaskContext::GetRetryStatus() const
368 {
369     return isNeedRetry_;
370 }
371 
GetTimerId() const372 TimerId SyncTaskContext::GetTimerId() const
373 {
374     return timerId_;
375 }
376 
GetRequestSessionId() const377 uint32_t SyncTaskContext::GetRequestSessionId() const
378 {
379     return requestSessionId_;
380 }
381 
IncSequenceId()382 void SyncTaskContext::IncSequenceId()
383 {
384     sequenceId_++;
385 }
386 
GetSequenceId() const387 uint32_t SyncTaskContext::GetSequenceId() const
388 {
389     return sequenceId_;
390 }
391 
ReSetSequenceId()392 void SyncTaskContext::ReSetSequenceId()
393 {
394     sequenceId_ = 1;
395 }
396 
IncPacketId()397 void SyncTaskContext::IncPacketId()
398 {
399     packetId_++;
400 }
401 
GetPacketId() const402 uint64_t SyncTaskContext::GetPacketId() const
403 {
404     return packetId_;
405 }
406 
GetTimeoutTime() const407 int SyncTaskContext::GetTimeoutTime() const
408 {
409     return timeout_;
410 }
411 
SetTimeoutCallback(const TimerAction & timeOutCallback)412 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
413 {
414     timeOutCallback_ = timeOutCallback;
415 }
416 
SetTimeOffset(TimeOffset offset)417 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
418 {
419     timeOffset_ = offset;
420 }
421 
GetTimeOffset() const422 TimeOffset SyncTaskContext::GetTimeOffset() const
423 {
424     return timeOffset_;
425 }
426 
StartStateMachine()427 int SyncTaskContext::StartStateMachine()
428 {
429     return stateMachine_->StartSync();
430 }
431 
ReceiveMessageCallback(Message * inMsg)432 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
433 {
434     if (GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE && inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
435         uint16_t remoteVersion = 0;
436         (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
437         SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
438     }
439     int errCode = E_OK;
440     if (IncUsedCount() == E_OK) {
441         errCode = stateMachine_->ReceiveMessageCallback(inMsg);
442         SafeExit();
443     }
444     return errCode;
445 }
446 
RegOnSyncTask(const std::function<int (void)> & callback)447 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
448 {
449     onSyncTaskAdd_ = callback;
450 }
451 
IncUsedCount()452 int SyncTaskContext::IncUsedCount()
453 {
454     AutoLock lock(this);
455     if (IsKilled()) {
456         LOGI("[SyncTaskContext] IncUsedCount isKilled");
457         return -E_OBJ_IS_KILLED;
458     }
459     usedCount_++;
460     return E_OK;
461 }
462 
SafeExit()463 void SyncTaskContext::SafeExit()
464 {
465     AutoLock lock(this);
466     usedCount_--;
467     if (usedCount_ < 1) {
468         safeKill_.notify_one();
469     }
470 }
471 
GetCurrentLocalTime() const472 Timestamp SyncTaskContext::GetCurrentLocalTime() const
473 {
474     if (timeHelper_ == nullptr) {
475         return TimeHelper::INVALID_TIMESTAMP;
476     }
477     return timeHelper_->GetTime();
478 }
479 
Abort(int status)480 void SyncTaskContext::Abort(int status)
481 {
482     (void)status;
483     Clear();
484 }
485 
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId)486 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
487 {
488     {
489         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
490         if (synTaskContextSet_.count(context) == 0) {
491             LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
492             return;
493         }
494         // IncObjRef to maker sure context not been killed. after the lock_guard
495         RefObject::IncObjRef(context);
496     }
497 
498     static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
499     RefObject::DecObjRef(context);
500 }
501 
SetRemoteSoftwareVersion(uint32_t version)502 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
503 {
504     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
505     remoteSoftwareVersion_ = version;
506     remoteSoftwareVersionId_++;
507 }
508 
GetRemoteSoftwareVersion() const509 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
510 {
511     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
512     return remoteSoftwareVersion_;
513 }
514 
GetRemoteSoftwareVersionId() const515 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
516 {
517     std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
518     return remoteSoftwareVersionId_;
519 }
520 
IsCommNormal() const521 bool SyncTaskContext::IsCommNormal() const
522 {
523     return isCommNormal_;
524 }
525 
CommErrHandlerFuncInner(int errCode,uint32_t sessionId)526 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
527 {
528     {
529         RefObject::AutoLock lock(this);
530         if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
531             return;
532         }
533 
534         if (errCode == E_OK) {
535             // when communicator sent message failed, the state machine will get the error and exit this sync task
536             // it seems unnecessary to change isCommNormal_ value, so just return here
537             return;
538         }
539     }
540     LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
541     stateMachine_->CommErrAbort(sessionId);
542 }
543 
TimeOut(TimerId id)544 int SyncTaskContext::TimeOut(TimerId id)
545 {
546     if (!timeOutCallback_) {
547         return E_OK;
548     }
549     IncObjRef(this);
550     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
551         timeOutCallback_(id);
552         DecObjRef(this);
553     });
554     if (errCode != E_OK) {
555         LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
556         DecObjRef(this);
557     }
558     return E_OK;
559 }
560 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)561 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
562 {
563     retryTime_ = 0;
564     mode_ = target->GetMode();
565     status_ = SyncOperation::OP_SYNCING;
566     isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
567     taskErrCode_ = E_OK;
568     packetId_ = 0;
569     isCommNormal_ = true; // reset comm status here
570     syncTaskRetryStatus_ = isSyncRetry_;
571     timeout_ = static_cast<int>(taskParam.timeout);
572     negotiationCount_ = 0;
573     target->GetSyncOperation(syncOperation_);
574     ReSetSequenceId();
575 
576     if (syncOperation_ != nullptr) {
577         // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
578         RefObject::IncObjRef(syncOperation_);
579         syncId_ = syncOperation_->GetSyncId();
580         isAutoSync_ = syncOperation_->IsAutoSync();
581         isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
582         if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
583             syncTaskRetryStatus_ = true;
584         }
585         requestSessionId_ = GenerateRequestSessionId();
586         LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
587             mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
588         DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
589     } else {
590         isAutoSync_ = false;
591         LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
592             syncTaskRetryStatus_);
593     }
594 }
595 
KillWait()596 void SyncTaskContext::KillWait()
597 {
598     StopTimer();
599     UnlockObj();
600     stateMachine_->NotifyClosing();
601     stateMachine_->AbortImmediately();
602     LockObj();
603     LOGW("[SyncTaskContext] Try to kill a context, now wait.");
604     bool noDeadLock = WaitLockedUntil(
605         safeKill_,
606         [this]() {
607             if (usedCount_ < 1) {
608                 return true;
609             }
610             return false;
611         },
612         KILL_WAIT_SECONDS);
613     if (!noDeadLock) { // LCOV_EXCL_BR_LINE
614         LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
615     } else {
616         LOGW("[SyncTaskContext] Wait the task exit ok.");
617     }
618     std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
619     synTaskContextSet_.erase(this);
620 }
621 
ClearSyncOperation()622 void SyncTaskContext::ClearSyncOperation()
623 {
624     std::lock_guard<std::mutex> lock(operationLock_);
625     if (syncOperation_ != nullptr) {
626         DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
627         RefObject::DecObjRef(syncOperation_);
628         syncOperation_ = nullptr;
629     }
630 }
631 
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)632 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
633 {
634     AutoLock lock(this);
635     if (!isAutoSync_) {
636         return;
637     }
638     if (syncId_ >= syncId) {
639         return;
640     }
641     int mode = SyncOperation::TransferSyncMode(newTargetMode);
642     if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
643         SetRetryTime(AUTO_RETRY_TIMES);
644         ModifyTimer(timeout_);
645     }
646 }
647 
GetTaskErrCode() const648 int SyncTaskContext::GetTaskErrCode() const
649 {
650     return taskErrCode_;
651 }
652 
SetTaskErrCode(int errCode)653 void SyncTaskContext::SetTaskErrCode(int errCode)
654 {
655     taskErrCode_ = errCode;
656 }
657 
IsSyncTaskNeedRetry() const658 bool SyncTaskContext::IsSyncTaskNeedRetry() const
659 {
660     return syncTaskRetryStatus_;
661 }
662 
SetSyncRetry(bool isRetry)663 void SyncTaskContext::SetSyncRetry(bool isRetry)
664 {
665     isSyncRetry_ = isRetry;
666 }
667 
GetSyncRetryTimes() const668 int SyncTaskContext::GetSyncRetryTimes() const
669 {
670     if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
671         return AUTO_RETRY_TIMES;
672     }
673     return MANUAL_RETRY_TIMES;
674 }
675 
GetSyncRetryTimeout(int retryTime) const676 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
677 {
678     int timeoutTime = GetTimeoutTime();
679     if (IsAutoSync()) {
680         // set the new timeout value with 2 raised to the power of retryTime.
681         return timeoutTime * (1u << retryTime);
682     }
683     return timeoutTime;
684 }
685 
ClearAllSyncTask()686 void SyncTaskContext::ClearAllSyncTask()
687 {
688 }
689 
IsAutoLiftWaterMark() const690 bool SyncTaskContext::IsAutoLiftWaterMark() const
691 {
692     return negotiationCount_ < NEGOTIATION_LIMIT;
693 }
694 
IncNegotiationCount()695 void SyncTaskContext::IncNegotiationCount()
696 {
697     negotiationCount_++;
698 }
699 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)700 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
701 {
702     return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
703 }
704 
IsAutoSubscribe() const705 bool SyncTaskContext::IsAutoSubscribe() const
706 {
707     return isAutoSubscribe_;
708 }
709 
IsCurrentSyncTaskCanBeSkipped() const710 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
711 {
712     return false;
713 }
714 
ResetLastPushTaskStatus()715 void SyncTaskContext::ResetLastPushTaskStatus()
716 {
717 }
718 
SchemaChange()719 void SyncTaskContext::SchemaChange()
720 {
721     if (stateMachine_ != nullptr) {
722         stateMachine_->SchemaChange();
723     }
724 }
725 
Dump(int fd)726 void SyncTaskContext::Dump(int fd)
727 {
728     size_t totalSyncTaskCount = 0u;
729     size_t autoSyncTaskCount = 0u;
730     size_t reponseTaskCount = 0u;
731     {
732         std::lock_guard<std::mutex> lock(targetQueueLock_);
733         totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
734         for (const auto &target : requestTargetQueue_) {
735             if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
736                 autoSyncTaskCount++;
737             }
738         }
739         reponseTaskCount = responseTargetQueue_.size();
740     }
741     DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
742         " response task count = %zu\n",
743         deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
744 }
745 
RunPermissionCheck(uint8_t flag) const746 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
747 {
748     std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
749     std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
750     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
751     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
752     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
753         { userId, appId, storeId, deviceId_, instanceId }, flag);
754     if (errCode != E_OK) {
755         LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
756             errCode, flag, deviceId_.c_str());
757     }
758     return errCode;
759 }
760 
GetPermissionCheckFlag(bool isAutoSync,int syncMode)761 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
762 {
763     uint8_t flag = 0;
764     int mode = SyncOperation::TransferSyncMode(syncMode);
765     if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
766         flag = CHECK_FLAG_SEND;
767     } else if (mode == SyncModeType::PULL) {
768         flag = CHECK_FLAG_RECEIVE;
769     } else if (mode == SyncModeType::PUSH_AND_PULL) {
770         flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
771     }
772     if (isAutoSync) {
773         flag = flag | CHECK_FLAG_AUTOSYNC;
774     }
775     if (mode != SyncModeType::RESPONSE_PULL) {
776         // it means this sync is started by local
777         flag = flag | CHECK_FLAG_SPONSOR;
778     }
779     return flag;
780 }
781 
AbortMachineIfNeed(uint32_t syncId)782 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
783 {
784     uint32_t sessionId = 0u;
785     {
786         RefObject::AutoLock autoLock(this);
787         if (syncId_ != syncId) {
788             return;
789         }
790         sessionId = requestSessionId_;
791     }
792     stateMachine_->InnerErrorAbort(sessionId);
793 }
794 
GetAndIncSyncOperation() const795 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
796 {
797     std::lock_guard<std::mutex> lock(operationLock_);
798     if (syncOperation_ == nullptr) {
799         return nullptr;
800     }
801     RefObject::IncObjRef(syncOperation_);
802     return syncOperation_;
803 }
804 
GenerateRequestSessionId()805 uint32_t SyncTaskContext::GenerateRequestSessionId()
806 {
807     uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
808     // make sure sessionId is between 0x01 and 0x8fffffff
809     if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
810         sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
811             std::to_string(TimeHelper::GetSysCurrentTime()));
812     }
813     lastRequestSessionId_ = sessionId;
814     return sessionId;
815 }
816 
IsSchemaCompatible() const817 bool SyncTaskContext::IsSchemaCompatible() const
818 {
819     return true;
820 }
821 
SetDbAbility(DbAbility & remoteDbAbility)822 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
823 {
824 }
825 
TimeChange()826 void SyncTaskContext::TimeChange()
827 {
828     if (stateMachine_ == nullptr) {
829         LOGW("[SyncTaskContext] machine is null when time change");
830         return;
831     }
832     stateMachine_->TimeChange();
833 }
834 
GetResponseTaskCount()835 int32_t SyncTaskContext::GetResponseTaskCount()
836 {
837     std::lock_guard<std::mutex> autoLock(targetQueueLock_);
838     return static_cast<int32_t>(responseTargetQueue_.size());
839 }
840 } // namespace DistributedDB
841