1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_task_context.h"
17
18 #include <algorithm>
19 #include <cmath>
20
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29
30 namespace DistributedDB {
// Guards synTaskContextSet_; it is taken in CommErrHandlerFunc() and KillWait().
std::mutex SyncTaskContext::synTaskContextSetLock_;
// Set of context pointers that CommErrHandlerFunc() consults before using a context.
// KillWait() erases the context from this set.
std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;

namespace {
// Bound compared against negotiationCount_ in IsAutoLiftWaterMark().
const int NEGOTIATION_LIMIT = 2;
}
37
SyncTaskContext()38 SyncTaskContext::SyncTaskContext()
39 : syncOperation_(nullptr),
40 syncId_(0),
41 mode_(0),
42 isAutoSync_(false),
43 status_(0),
44 taskExecStatus_(0),
45 syncInterface_(nullptr),
46 communicator_(nullptr),
47 stateMachine_(nullptr),
48 requestSessionId_(0),
49 lastRequestSessionId_(0),
50 timeHelper_(nullptr),
51 remoteSoftwareVersion_(0),
52 remoteSoftwareVersionId_(0),
53 isCommNormal_(true),
54 taskErrCode_(E_OK),
55 syncTaskRetryStatus_(false),
56 isSyncRetry_(false),
57 negotiationCount_(0),
58 isAutoSubscribe_(false),
59 isNeedResetAbilitySync_(false)
60 {
61 }
62
~SyncTaskContext()63 SyncTaskContext::~SyncTaskContext()
64 {
65 if (stateMachine_ != nullptr) {
66 delete stateMachine_;
67 stateMachine_ = nullptr;
68 }
69 ClearSyncOperation();
70 ClearSyncTarget();
71 syncInterface_ = nullptr;
72 communicator_ = nullptr;
73 }
74
AddSyncTarget(ISyncTarget * target)75 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
76 {
77 if (target == nullptr) {
78 return -E_INVALID_ARGS;
79 }
80 int targetMode = target->GetMode();
81 {
82 std::lock_guard<std::mutex> lock(targetQueueLock_);
83 if (target->GetTaskType() == ISyncTarget::REQUEST) {
84 requestTargetQueue_.push_back(target);
85 } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
86 responseTargetQueue_.push_back(target);
87 } else {
88 return -E_INVALID_ARGS;
89 }
90 }
91 CancelCurrentSyncRetryIfNeed(targetMode);
92 if (taskExecStatus_ == RUNNING) {
93 return E_OK;
94 }
95 if (onSyncTaskAdd_) {
96 RefObject::IncObjRef(this);
97 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
98 onSyncTaskAdd_();
99 RefObject::DecObjRef(this);
100 });
101 if (errCode != E_OK) {
102 RefObject::DecObjRef(this);
103 }
104 }
105 return E_OK;
106 }
107
SetOperationStatus(int status)108 void SyncTaskContext::SetOperationStatus(int status)
109 {
110 std::lock_guard<std::mutex> lock(operationLock_);
111 if (syncOperation_ == nullptr) {
112 LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
113 return;
114 }
115 int finalStatus = status;
116
117 int operationStatus = syncOperation_->GetStatus(deviceId_);
118 if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
119 if (GetTaskErrCode() == -E_EKEYREVOKED) {
120 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
121 } else {
122 finalStatus = SyncOperation::OP_FINISHED_ALL;
123 }
124 } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
125 if (GetTaskErrCode() == -E_EKEYREVOKED) {
126 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
127 } else {
128 finalStatus = SyncOperation::OP_FINISHED_ALL;
129 }
130 }
131 syncOperation_->SetStatus(deviceId_, finalStatus);
132 if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
133 SaveLastPushTaskExecStatus(finalStatus);
134 }
135 if (syncOperation_->CheckIsAllFinished()) {
136 syncOperation_->Finished();
137 }
138 }
139
SaveLastPushTaskExecStatus(int finalStatus)140 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
141 {
142 (void)finalStatus;
143 }
144
Clear()145 void SyncTaskContext::Clear()
146 {
147 StopTimer();
148 retryTime_ = 0;
149 sequenceId_ = 1;
150 syncId_ = 0;
151 isAutoSync_ = false;
152 requestSessionId_ = 0;
153 isNeedRetry_ = NO_NEED_RETRY;
154 mode_ = SyncModeType::INVALID_MODE;
155 status_ = SyncOperation::OP_WAITING;
156 taskErrCode_ = E_OK;
157 packetId_ = 0;
158 isAutoSubscribe_ = false;
159 }
160
RemoveSyncOperation(int syncId)161 int SyncTaskContext::RemoveSyncOperation(int syncId)
162 {
163 std::lock_guard<std::mutex> lock(targetQueueLock_);
164 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
165 [syncId](const ISyncTarget *target) {
166 if (target == nullptr) {
167 return false;
168 }
169 return target->GetSyncId() == syncId;
170 });
171 if (iter != requestTargetQueue_.end()) {
172 if (*iter != nullptr) {
173 delete *iter;
174 *iter = nullptr;
175 }
176 requestTargetQueue_.erase(iter);
177 return E_OK;
178 }
179 return -E_INVALID_ARGS;
180 }
181
ClearSyncTarget()182 void SyncTaskContext::ClearSyncTarget()
183 {
184 std::lock_guard<std::mutex> lock(targetQueueLock_);
185 for (auto &requestTarget : requestTargetQueue_) {
186 if (requestTarget != nullptr) {
187 delete requestTarget;
188 requestTarget = nullptr;
189 }
190 }
191 requestTargetQueue_.clear();
192
193 for (auto &responseTarget : responseTargetQueue_) {
194 if (responseTarget != nullptr) {
195 delete responseTarget;
196 responseTarget = nullptr;
197 }
198 }
199 responseTargetQueue_.clear();
200 }
201
IsTargetQueueEmpty() const202 bool SyncTaskContext::IsTargetQueueEmpty() const
203 {
204 std::lock_guard<std::mutex> lock(targetQueueLock_);
205 return requestTargetQueue_.empty() && responseTargetQueue_.empty();
206 }
207
GetOperationStatus() const208 int SyncTaskContext::GetOperationStatus() const
209 {
210 std::lock_guard<std::mutex> lock(operationLock_);
211 if (syncOperation_ == nullptr) {
212 return SyncOperation::OP_FINISHED_ALL;
213 }
214 return syncOperation_->GetStatus(deviceId_);
215 }
216
SetMode(int mode)217 void SyncTaskContext::SetMode(int mode)
218 {
219 mode_ = mode;
220 }
221
GetMode() const222 int SyncTaskContext::GetMode() const
223 {
224 return mode_;
225 }
226
MoveToNextTarget()227 void SyncTaskContext::MoveToNextTarget()
228 {
229 ClearSyncOperation();
230 TaskParam param;
231 // call other system api without lock
232 param.timeout = communicator_->GetTimeout(deviceId_);
233 std::lock_guard<std::mutex> lock(targetQueueLock_);
234 while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
235 ISyncTarget *tmpTarget = nullptr;
236 if (!requestTargetQueue_.empty()) {
237 tmpTarget = requestTargetQueue_.front();
238 requestTargetQueue_.pop_front();
239 } else {
240 tmpTarget = responseTargetQueue_.front();
241 responseTargetQueue_.pop_front();
242 }
243 if (tmpTarget == nullptr) {
244 LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
245 continue;
246 }
247 SyncOperation *tmpOperation = nullptr;
248 tmpTarget->GetSyncOperation(tmpOperation);
249 if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
250 // if killed skip this syncOperation_.
251 delete tmpTarget;
252 tmpTarget = nullptr;
253 continue;
254 }
255 CopyTargetData(tmpTarget, param);
256 delete tmpTarget;
257 tmpTarget = nullptr;
258 break;
259 }
260 }
261
GetNextTarget(bool isNeedSetFinished)262 int SyncTaskContext::GetNextTarget(bool isNeedSetFinished)
263 {
264 MoveToNextTarget();
265 int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
266 if (checkErrCode != E_OK) {
267 SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
268 if (isNeedSetFinished) {
269 SetTaskExecStatus(ISyncTaskContext::FINISHED);
270 }
271 return checkErrCode;
272 }
273 return E_OK;
274 }
275
GetSyncId() const276 uint32_t SyncTaskContext::GetSyncId() const
277 {
278 return syncId_;
279 }
280
281 // Get the current task deviceId.
GetDeviceId() const282 std::string SyncTaskContext::GetDeviceId() const
283 {
284 return deviceId_;
285 }
286
SetTaskExecStatus(int status)287 void SyncTaskContext::SetTaskExecStatus(int status)
288 {
289 taskExecStatus_ = status;
290 }
291
GetTaskExecStatus() const292 int SyncTaskContext::GetTaskExecStatus() const
293 {
294 return taskExecStatus_;
295 }
296
IsAutoSync() const297 bool SyncTaskContext::IsAutoSync() const
298 {
299 return isAutoSync_;
300 }
301
StartTimer()302 int SyncTaskContext::StartTimer()
303 {
304 std::lock_guard<std::mutex> lockGuard(timerLock_);
305 if (timerId_ > 0) {
306 return -E_UNEXPECTED_DATA;
307 }
308 TimerId timerId = 0;
309 RefObject::IncObjRef(this);
310 TimerAction timeOutCallback = std::bind(&SyncTaskContext::TimeOut, this, std::placeholders::_1);
311 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
312 [this]() {
313 int ret = RuntimeContext::GetInstance()->ScheduleTask([this](){ RefObject::DecObjRef(this); });
314 if (ret != E_OK) {
315 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
316 }
317 }, timerId);
318 if (errCode != E_OK) {
319 RefObject::DecObjRef(this);
320 return errCode;
321 }
322 timerId_ = timerId;
323 return errCode;
324 }
325
StopTimer()326 void SyncTaskContext::StopTimer()
327 {
328 TimerId timerId;
329 {
330 std::lock_guard<std::mutex> lockGuard(timerLock_);
331 timerId = timerId_;
332 if (timerId_ == 0) {
333 return;
334 }
335 timerId_ = 0;
336 }
337 RuntimeContext::GetInstance()->RemoveTimer(timerId);
338 }
339
ModifyTimer(int milliSeconds)340 int SyncTaskContext::ModifyTimer(int milliSeconds)
341 {
342 std::lock_guard<std::mutex> lockGuard(timerLock_);
343 if (timerId_ == 0) {
344 return -E_UNEXPECTED_DATA;
345 }
346 return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
347 }
348
SetRetryTime(int retryTime)349 void SyncTaskContext::SetRetryTime(int retryTime)
350 {
351 retryTime_ = retryTime;
352 }
353
GetRetryTime() const354 int SyncTaskContext::GetRetryTime() const
355 {
356 return retryTime_;
357 }
358
SetRetryStatus(int isNeedRetry)359 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
360 {
361 isNeedRetry_ = isNeedRetry;
362 }
363
GetRetryStatus() const364 int SyncTaskContext::GetRetryStatus() const
365 {
366 return isNeedRetry_;
367 }
368
GetTimerId() const369 TimerId SyncTaskContext::GetTimerId() const
370 {
371 return timerId_;
372 }
373
GetRequestSessionId() const374 uint32_t SyncTaskContext::GetRequestSessionId() const
375 {
376 return requestSessionId_;
377 }
378
IncSequenceId()379 void SyncTaskContext::IncSequenceId()
380 {
381 sequenceId_++;
382 }
383
GetSequenceId() const384 uint32_t SyncTaskContext::GetSequenceId() const
385 {
386 return sequenceId_;
387 }
388
ReSetSequenceId()389 void SyncTaskContext::ReSetSequenceId()
390 {
391 sequenceId_ = 1;
392 }
393
IncPacketId()394 void SyncTaskContext::IncPacketId()
395 {
396 packetId_++;
397 }
398
GetPacketId() const399 uint64_t SyncTaskContext::GetPacketId() const
400 {
401 return packetId_;
402 }
403
GetTimeoutTime() const404 int SyncTaskContext::GetTimeoutTime() const
405 {
406 return timeout_;
407 }
408
SetTimeoutCallback(const TimerAction & timeOutCallback)409 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
410 {
411 timeOutCallback_ = timeOutCallback;
412 }
413
SetTimeOffset(TimeOffset offset)414 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
415 {
416 timeOffset_ = offset;
417 }
418
GetTimeOffset() const419 TimeOffset SyncTaskContext::GetTimeOffset() const
420 {
421 return timeOffset_;
422 }
423
StartStateMachine()424 int SyncTaskContext::StartStateMachine()
425 {
426 return stateMachine_->StartSync();
427 }
428
ReceiveMessageCallback(Message * inMsg)429 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
430 {
431 int errCode = E_OK;
432 if (IncUsedCount() == E_OK) {
433 errCode = stateMachine_->ReceiveMessageCallback(inMsg);
434 SafeExit();
435 }
436 return errCode;
437 }
438
RegOnSyncTask(const std::function<int (void)> & callback)439 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
440 {
441 onSyncTaskAdd_ = callback;
442 }
443
IncUsedCount()444 int SyncTaskContext::IncUsedCount()
445 {
446 AutoLock lock(this);
447 if (IsKilled()) {
448 LOGI("[SyncTaskContext] IncUsedCount isKilled");
449 return -E_OBJ_IS_KILLED;
450 }
451 usedCount_++;
452 return E_OK;
453 }
454
SafeExit()455 void SyncTaskContext::SafeExit()
456 {
457 AutoLock lock(this);
458 usedCount_--;
459 if (usedCount_ < 1) {
460 safeKill_.notify_one();
461 }
462 }
463
GetCurrentLocalTime() const464 Timestamp SyncTaskContext::GetCurrentLocalTime() const
465 {
466 if (timeHelper_ == nullptr) {
467 return TimeHelper::INVALID_TIMESTAMP;
468 }
469 return timeHelper_->GetTime();
470 }
471
Abort(int status)472 void SyncTaskContext::Abort(int status)
473 {
474 (void)status;
475 Clear();
476 }
477
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId)478 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
479 {
480 {
481 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
482 if (synTaskContextSet_.count(context) == 0) {
483 LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
484 return;
485 }
486
487 // IncObjRef to maker sure context not been killed. after the lock_guard
488 RefObject::IncObjRef(context);
489 }
490
491 static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
492 RefObject::DecObjRef(context);
493 }
494
SetRemoteSoftwareVersion(uint32_t version)495 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
496 {
497 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
498 remoteSoftwareVersion_ = version;
499 remoteSoftwareVersionId_++;
500 }
501
GetRemoteSoftwareVersion() const502 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
503 {
504 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
505 return remoteSoftwareVersion_;
506 }
507
GetRemoteSoftwareVersionId() const508 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
509 {
510 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
511 return remoteSoftwareVersionId_;
512 }
513
IsCommNormal() const514 bool SyncTaskContext::IsCommNormal() const
515 {
516 return isCommNormal_;
517 }
518
CommErrHandlerFuncInner(int errCode,uint32_t sessionId)519 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
520 {
521 {
522 RefObject::AutoLock lock(this);
523 if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
524 return;
525 }
526
527 if (errCode == E_OK) {
528 // when communicator sent message failed, the state machine will get the error and exit this sync task
529 // it seems unnecessary to change isCommNormal_ value, so just return here
530 return;
531 }
532 }
533 LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
534 stateMachine_->CommErrAbort(sessionId);
535 }
536
TimeOut(TimerId id)537 int SyncTaskContext::TimeOut(TimerId id)
538 {
539 if (!timeOutCallback_) {
540 return E_OK;
541 }
542 int errCode = IncUsedCount();
543 if (errCode != E_OK) {
544 LOGW("[SyncTaskContext][TimeOut] IncUsedCount failed! errCode=", errCode);
545 // if return is not E_OK, the timer will be removed
546 // we removed timer when context call StopTimer
547 return E_OK;
548 }
549 IncObjRef(this);
550 errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
551 timeOutCallback_(id);
552 SafeExit();
553 DecObjRef(this);
554 });
555 if (errCode != E_OK) {
556 LOGW("[SyncTaskContext][Timeout] Trigger Timeout Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
557 SafeExit();
558 DecObjRef(this);
559 }
560 return E_OK;
561 }
562
// Loads the state of `target` into this context so it becomes the current task: mode, timeout
// (from taskParam), retry flags and counters are reset, and the target's sync operation (if any)
// becomes syncOperation_ with an extra reference. A new request session id is derived from the
// device id, the sync id and the current system time.
// `target` is dereferenced without a null check; the caller in this file passes a non-null target.
void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
{
    mode_ = target->GetMode();
    status_ = SyncOperation::OP_SYNCING;
    isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
    taskErrCode_ = E_OK;
    packetId_ = 0;
    isCommNormal_ = true; // reset comm status here
    // Default retry behaviour comes from SetSyncRetry(); it may be forced to true below.
    syncTaskRetryStatus_ = isSyncRetry_;
    timeout_ = static_cast<int>(taskParam.timeout);
    negotiationCount_ = 0;
    target->GetSyncOperation(syncOperation_);
    ReSetSequenceId();

    if (syncOperation_ != nullptr) {
        // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
        RefObject::IncObjRef(syncOperation_);
        syncId_ = syncOperation_->GetSyncId();
        isAutoSync_ = syncOperation_->IsAutoSync();
        isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
        // Auto sync and (un)subscribe-query tasks always retry.
        if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
            syncTaskRetryStatus_ = true;
        }
        requestSessionId_ = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
            std::to_string(TimeHelper::GetSysCurrentTime()));
        // Make sure two consecutive tasks never share a session id.
        if (lastRequestSessionId_ == requestSessionId_) {
            // Hash32Func max is 0x7fffffff and UINT32_MAX is 0xffffffff
            requestSessionId_++;
            LOGI("last sessionId is equal to this sessionId!");
        }
        // NOTE(review): syncId_ is logged with %d although GetSyncId() returns uint32_t — confirm.
        LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
            mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
        lastRequestSessionId_ = requestSessionId_;
        // The matching FinishAsyncTrace() call is in ClearSyncOperation().
        DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
    } else {
        // No sync operation attached to the target: treated as a response task.
        isAutoSync_ = false;
        LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
            syncTaskRetryStatus_);
    }
}
603
// Shuts the context down: stops the timer, notifies and aborts the state machine, waits up to
// KILL_WAIT_SECONDS for usedCount_ to drop below 1, then removes this context from
// synTaskContextSet_.
// NOTE(review): the UnlockObj()/LockObj() pair suggests the caller holds the object lock on
// entry — confirm against the caller.
void SyncTaskContext::KillWait()
{
    StopTimer();
    stateMachine_->NotifyClosing();
    // The object lock is released only for the duration of AbortImmediately().
    UnlockObj();
    stateMachine_->AbortImmediately();
    LockObj();
    LOGW("[SyncTaskContext] Try to kill a context, now wait.");
    // SafeExit() notifies safeKill_ when the used count drops below 1.
    bool noDeadLock = WaitLockedUntil(
        safeKill_,
        [this]() {
            if (usedCount_ < 1) {
                return true;
            }
            return false;
        },
        KILL_WAIT_SECONDS);
    if (!noDeadLock) {
        LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
    } else {
        LOGW("[SyncTaskContext] Wait the task exit ok.");
    }
    // After this erase, CommErrHandlerFunc() ignores calls made for this context.
    std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
    synTaskContextSet_.erase(this);
}
629
ClearSyncOperation()630 void SyncTaskContext::ClearSyncOperation()
631 {
632 std::lock_guard<std::mutex> lock(operationLock_);
633 if (syncOperation_ != nullptr) {
634 DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
635 RefObject::DecObjRef(syncOperation_);
636 syncOperation_ = nullptr;
637 }
638 }
639
CancelCurrentSyncRetryIfNeed(int newTargetMode)640 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode)
641 {
642 AutoLock(this);
643 if (!isAutoSync_) {
644 return;
645 }
646 int mode = SyncOperation::TransferSyncMode(newTargetMode);
647 if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
648 SetRetryTime(AUTO_RETRY_TIMES);
649 ModifyTimer(timeout_);
650 }
651 }
652
GetTaskErrCode() const653 int SyncTaskContext::GetTaskErrCode() const
654 {
655 return taskErrCode_;
656 }
657
SetTaskErrCode(int errCode)658 void SyncTaskContext::SetTaskErrCode(int errCode)
659 {
660 taskErrCode_ = errCode;
661 }
662
IsSyncTaskNeedRetry() const663 bool SyncTaskContext::IsSyncTaskNeedRetry() const
664 {
665 return syncTaskRetryStatus_;
666 }
667
SetSyncRetry(bool isRetry)668 void SyncTaskContext::SetSyncRetry(bool isRetry)
669 {
670 isSyncRetry_ = isRetry;
671 }
672
GetSyncRetryTimes() const673 int SyncTaskContext::GetSyncRetryTimes() const
674 {
675 if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
676 return AUTO_RETRY_TIMES;
677 }
678 return MANUAL_RETRY_TIMES;
679 }
680
GetSyncRetryTimeout(int retryTime) const681 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
682 {
683 int timeoutTime = GetTimeoutTime();
684 if (IsAutoSync()) {
685 // set the new timeout value with 2 raised to the power of retryTime.
686 return timeoutTime * static_cast<int>(pow(2, retryTime));
687 }
688 return timeoutTime;
689 }
690
ClearAllSyncTask()691 void SyncTaskContext::ClearAllSyncTask()
692 {
693 }
694
IsAutoLiftWaterMark() const695 bool SyncTaskContext::IsAutoLiftWaterMark() const
696 {
697 return negotiationCount_ < NEGOTIATION_LIMIT;
698 }
699
IncNegotiationCount()700 void SyncTaskContext::IncNegotiationCount()
701 {
702 negotiationCount_++;
703 }
704
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)705 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
706 {
707 return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
708 }
709
IsAutoSubscribe() const710 bool SyncTaskContext::IsAutoSubscribe() const
711 {
712 return isAutoSubscribe_;
713 }
714
GetIsNeedResetAbilitySync() const715 bool SyncTaskContext::GetIsNeedResetAbilitySync() const
716 {
717 return isNeedResetAbilitySync_;
718 }
719
SetIsNeedResetAbilitySync(bool isNeedReset)720 void SyncTaskContext::SetIsNeedResetAbilitySync(bool isNeedReset)
721 {
722 isNeedResetAbilitySync_ = isNeedReset;
723 }
724
IsCurrentSyncTaskCanBeSkipped() const725 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
726 {
727 return false;
728 }
729
ResetLastPushTaskStatus()730 void SyncTaskContext::ResetLastPushTaskStatus()
731 {
732 }
733
SchemaChange()734 void SyncTaskContext::SchemaChange()
735 {
736 SetIsNeedResetAbilitySync(true);
737 }
738
Dump(int fd)739 void SyncTaskContext::Dump(int fd)
740 {
741 size_t totalSyncTaskCount = 0u;
742 size_t autoSyncTaskCount = 0u;
743 size_t reponseTaskCount = 0u;
744 {
745 std::lock_guard<std::mutex> lock(targetQueueLock_);
746 totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
747 for (const auto &target : requestTargetQueue_) {
748 if (target->IsAutoSync()) {
749 autoSyncTaskCount++;
750 }
751 }
752 reponseTaskCount = responseTargetQueue_.size();
753 }
754 DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
755 " response task count = %zu\n",
756 deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
757 }
758
RunPermissionCheck(uint8_t flag) const759 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
760 {
761 std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
762 std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
763 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
764 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
765 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
766 { userId, appId, storeId, deviceId_, instanceId }, flag);
767 if (errCode != E_OK) {
768 LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
769 errCode, flag, deviceId_.c_str());
770 }
771 return errCode;
772 }
773
GetPermissionCheckFlag(bool isAutoSync,int syncMode)774 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
775 {
776 uint8_t flag = 0;
777 int mode = SyncOperation::TransferSyncMode(syncMode);
778 if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
779 flag = CHECK_FLAG_SEND;
780 } else if (mode == SyncModeType::PULL) {
781 flag = CHECK_FLAG_RECEIVE;
782 } else if (mode == SyncModeType::PUSH_AND_PULL) {
783 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
784 }
785 if (isAutoSync) {
786 flag = flag | CHECK_FLAG_AUTOSYNC;
787 }
788 if (mode != SyncModeType::RESPONSE_PULL) {
789 // it means this sync is started by local
790 flag = flag | CHECK_FLAG_SPONSOR;
791 }
792 return flag;
793 }
794
AbortMachineIfNeed(uint32_t syncId)795 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
796 {
797 uint32_t sessionId = 0u;
798 {
799 RefObject::AutoLock autoLock(this);
800 if (syncId_ != syncId) {
801 return;
802 }
803 sessionId = requestSessionId_;
804 }
805 stateMachine_->InnerErrorAbort(sessionId);
806 }
807
GetAndIncSyncOperation() const808 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
809 {
810 std::lock_guard<std::mutex> lock(operationLock_);
811 if (syncOperation_ == nullptr) {
812 return nullptr;
813 }
814 RefObject::IncObjRef(syncOperation_);
815 return syncOperation_;
816 }
817 } // namespace DistributedDB
818