1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_task_context.h"
17
18 #include <algorithm>
19 #include <cmath>
20
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29
30 namespace DistributedDB {
31 std::mutex SyncTaskContext::synTaskContextSetLock_;
32 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
33
34 namespace {
35 constexpr int NEGOTIATION_LIMIT = 2;
36 constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
37 }
38
SyncTaskContext()39 SyncTaskContext::SyncTaskContext()
40 : syncOperation_(nullptr),
41 syncId_(0),
42 mode_(0),
43 isAutoSync_(false),
44 status_(0),
45 taskExecStatus_(0),
46 syncInterface_(nullptr),
47 communicator_(nullptr),
48 stateMachine_(nullptr),
49 requestSessionId_(0),
50 lastRequestSessionId_(0),
51 timeHelper_(nullptr),
52 remoteSoftwareVersion_(0),
53 remoteSoftwareVersionId_(0),
54 isCommNormal_(true),
55 taskErrCode_(E_OK),
56 syncTaskRetryStatus_(false),
57 isSyncRetry_(false),
58 negotiationCount_(0),
59 isAutoSubscribe_(false),
60 isNeedResetAbilitySync_(false)
61 {
62 }
63
~SyncTaskContext()64 SyncTaskContext::~SyncTaskContext()
65 {
66 if (stateMachine_ != nullptr) {
67 delete stateMachine_;
68 stateMachine_ = nullptr;
69 }
70 SyncTaskContext::ClearSyncOperation();
71 ClearSyncTarget();
72 syncInterface_ = nullptr;
73 communicator_ = nullptr;
74 }
75
AddSyncTarget(ISyncTarget * target)76 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
77 {
78 if (target == nullptr) {
79 return -E_INVALID_ARGS;
80 }
81 int targetMode = target->GetMode();
82 auto syncId = static_cast<uint32_t>(target->GetSyncId());
83 {
84 std::lock_guard<std::mutex> lock(targetQueueLock_);
85 if (target->GetTaskType() == ISyncTarget::REQUEST) {
86 requestTargetQueue_.push_back(target);
87 } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
88 responseTargetQueue_.push_back(target);
89 } else {
90 return -E_INVALID_ARGS;
91 }
92 }
93 RefObject::IncObjRef(this);
94 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
95 CancelCurrentSyncRetryIfNeed(targetMode, syncId);
96 RefObject::DecObjRef(this);
97 });
98 if (errCode != E_OK) {
99 RefObject::DecObjRef(this);
100 }
101 if (onSyncTaskAdd_) {
102 RefObject::IncObjRef(this);
103 errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
104 onSyncTaskAdd_();
105 RefObject::DecObjRef(this);
106 });
107 if (errCode != E_OK) {
108 RefObject::DecObjRef(this);
109 }
110 }
111 return E_OK;
112 }
113
SetOperationStatus(int status)114 void SyncTaskContext::SetOperationStatus(int status)
115 {
116 std::lock_guard<std::mutex> lock(operationLock_);
117 if (syncOperation_ == nullptr) {
118 LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
119 return;
120 }
121 int finalStatus = status;
122
123 int operationStatus = syncOperation_->GetStatus(deviceId_);
124 if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
125 if (GetTaskErrCode() == -E_EKEYREVOKED) {
126 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
127 } else {
128 finalStatus = SyncOperation::OP_FINISHED_ALL;
129 }
130 } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
131 if (GetTaskErrCode() == -E_EKEYREVOKED) {
132 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
133 } else {
134 finalStatus = SyncOperation::OP_FINISHED_ALL;
135 }
136 }
137 syncOperation_->SetStatus(deviceId_, finalStatus);
138 if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
139 SaveLastPushTaskExecStatus(finalStatus);
140 }
141 if (syncOperation_->CheckIsAllFinished()) {
142 syncOperation_->Finished();
143 }
144 }
145
SaveLastPushTaskExecStatus(int finalStatus)146 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
147 {
148 (void)finalStatus;
149 }
150
Clear()151 void SyncTaskContext::Clear()
152 {
153 StopTimer();
154 retryTime_ = 0;
155 sequenceId_ = 1;
156 syncId_ = 0;
157 isAutoSync_ = false;
158 requestSessionId_ = 0;
159 isNeedRetry_ = NO_NEED_RETRY;
160 mode_ = SyncModeType::INVALID_MODE;
161 status_ = SyncOperation::OP_WAITING;
162 taskErrCode_ = E_OK;
163 packetId_ = 0;
164 isAutoSubscribe_ = false;
165 }
166
RemoveSyncOperation(int syncId)167 int SyncTaskContext::RemoveSyncOperation(int syncId)
168 {
169 std::lock_guard<std::mutex> lock(targetQueueLock_);
170 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
171 [syncId](const ISyncTarget *target) {
172 if (target == nullptr) {
173 return false;
174 }
175 return target->GetSyncId() == syncId;
176 });
177 if (iter != requestTargetQueue_.end()) {
178 if (*iter != nullptr) {
179 delete *iter;
180 *iter = nullptr;
181 }
182 requestTargetQueue_.erase(iter);
183 return E_OK;
184 }
185 return -E_INVALID_ARGS;
186 }
187
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190 std::lock_guard<std::mutex> lock(targetQueueLock_);
191 for (auto &requestTarget : requestTargetQueue_) {
192 if (requestTarget != nullptr) {
193 delete requestTarget;
194 requestTarget = nullptr;
195 }
196 }
197 requestTargetQueue_.clear();
198
199 for (auto &responseTarget : responseTargetQueue_) {
200 if (responseTarget != nullptr) {
201 delete responseTarget;
202 responseTarget = nullptr;
203 }
204 }
205 responseTargetQueue_.clear();
206 }
207
IsTargetQueueEmpty() const208 bool SyncTaskContext::IsTargetQueueEmpty() const
209 {
210 std::lock_guard<std::mutex> lock(targetQueueLock_);
211 return requestTargetQueue_.empty() && responseTargetQueue_.empty();
212 }
213
GetOperationStatus() const214 int SyncTaskContext::GetOperationStatus() const
215 {
216 std::lock_guard<std::mutex> lock(operationLock_);
217 if (syncOperation_ == nullptr) {
218 return SyncOperation::OP_FINISHED_ALL;
219 }
220 return syncOperation_->GetStatus(deviceId_);
221 }
222
SetMode(int mode)223 void SyncTaskContext::SetMode(int mode)
224 {
225 mode_ = mode;
226 }
227
GetMode() const228 int SyncTaskContext::GetMode() const
229 {
230 return mode_;
231 }
232
MoveToNextTarget()233 void SyncTaskContext::MoveToNextTarget()
234 {
235 ClearSyncOperation();
236 TaskParam param;
237 // call other system api without lock
238 param.timeout = communicator_->GetTimeout(deviceId_);
239 std::lock_guard<std::mutex> lock(targetQueueLock_);
240 while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
241 ISyncTarget *tmpTarget = nullptr;
242 if (!requestTargetQueue_.empty()) {
243 tmpTarget = requestTargetQueue_.front();
244 requestTargetQueue_.pop_front();
245 } else {
246 tmpTarget = responseTargetQueue_.front();
247 responseTargetQueue_.pop_front();
248 }
249 if (tmpTarget == nullptr) {
250 LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
251 continue;
252 }
253 SyncOperation *tmpOperation = nullptr;
254 tmpTarget->GetSyncOperation(tmpOperation);
255 if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
256 // if killed skip this syncOperation_.
257 delete tmpTarget;
258 tmpTarget = nullptr;
259 continue;
260 }
261 CopyTargetData(tmpTarget, param);
262 delete tmpTarget;
263 tmpTarget = nullptr;
264 break;
265 }
266 }
267
GetNextTarget()268 int SyncTaskContext::GetNextTarget()
269 {
270 MoveToNextTarget();
271 int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
272 if (checkErrCode != E_OK) {
273 SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
274 return checkErrCode;
275 }
276 return E_OK;
277 }
278
GetSyncId() const279 uint32_t SyncTaskContext::GetSyncId() const
280 {
281 return syncId_;
282 }
283
284 // Get the current task deviceId.
GetDeviceId() const285 std::string SyncTaskContext::GetDeviceId() const
286 {
287 return deviceId_;
288 }
289
SetTaskExecStatus(int status)290 void SyncTaskContext::SetTaskExecStatus(int status)
291 {
292 taskExecStatus_ = status;
293 }
294
GetTaskExecStatus() const295 int SyncTaskContext::GetTaskExecStatus() const
296 {
297 return taskExecStatus_;
298 }
299
IsAutoSync() const300 bool SyncTaskContext::IsAutoSync() const
301 {
302 return isAutoSync_;
303 }
304
StartTimer()305 int SyncTaskContext::StartTimer()
306 {
307 std::lock_guard<std::mutex> lockGuard(timerLock_);
308 if (timerId_ > 0) {
309 return -E_UNEXPECTED_DATA;
310 }
311 TimerId timerId = 0;
312 RefObject::IncObjRef(this);
313 TimerAction timeOutCallback = std::bind(&SyncTaskContext::TimeOut, this, std::placeholders::_1);
314 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
315 [this]() {
316 int ret = RuntimeContext::GetInstance()->ScheduleTask([this](){ RefObject::DecObjRef(this); });
317 if (ret != E_OK) {
318 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
319 }
320 }, timerId);
321 if (errCode != E_OK) {
322 RefObject::DecObjRef(this);
323 return errCode;
324 }
325 timerId_ = timerId;
326 return errCode;
327 }
328
StopTimer()329 void SyncTaskContext::StopTimer()
330 {
331 TimerId timerId;
332 {
333 std::lock_guard<std::mutex> lockGuard(timerLock_);
334 timerId = timerId_;
335 if (timerId_ == 0) {
336 return;
337 }
338 timerId_ = 0;
339 }
340 RuntimeContext::GetInstance()->RemoveTimer(timerId);
341 }
342
ModifyTimer(int milliSeconds)343 int SyncTaskContext::ModifyTimer(int milliSeconds)
344 {
345 std::lock_guard<std::mutex> lockGuard(timerLock_);
346 if (timerId_ == 0) {
347 return -E_UNEXPECTED_DATA;
348 }
349 return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
350 }
351
SetRetryTime(int retryTime)352 void SyncTaskContext::SetRetryTime(int retryTime)
353 {
354 retryTime_ = retryTime;
355 }
356
GetRetryTime() const357 int SyncTaskContext::GetRetryTime() const
358 {
359 return retryTime_;
360 }
361
SetRetryStatus(int isNeedRetry)362 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
363 {
364 isNeedRetry_ = isNeedRetry;
365 }
366
GetRetryStatus() const367 int SyncTaskContext::GetRetryStatus() const
368 {
369 return isNeedRetry_;
370 }
371
GetTimerId() const372 TimerId SyncTaskContext::GetTimerId() const
373 {
374 return timerId_;
375 }
376
GetRequestSessionId() const377 uint32_t SyncTaskContext::GetRequestSessionId() const
378 {
379 return requestSessionId_;
380 }
381
IncSequenceId()382 void SyncTaskContext::IncSequenceId()
383 {
384 sequenceId_++;
385 }
386
GetSequenceId() const387 uint32_t SyncTaskContext::GetSequenceId() const
388 {
389 return sequenceId_;
390 }
391
ReSetSequenceId()392 void SyncTaskContext::ReSetSequenceId()
393 {
394 sequenceId_ = 1;
395 }
396
IncPacketId()397 void SyncTaskContext::IncPacketId()
398 {
399 packetId_++;
400 }
401
GetPacketId() const402 uint64_t SyncTaskContext::GetPacketId() const
403 {
404 return packetId_;
405 }
406
GetTimeoutTime() const407 int SyncTaskContext::GetTimeoutTime() const
408 {
409 return timeout_;
410 }
411
SetTimeoutCallback(const TimerAction & timeOutCallback)412 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
413 {
414 timeOutCallback_ = timeOutCallback;
415 }
416
SetTimeOffset(TimeOffset offset)417 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
418 {
419 timeOffset_ = offset;
420 }
421
GetTimeOffset() const422 TimeOffset SyncTaskContext::GetTimeOffset() const
423 {
424 return timeOffset_;
425 }
426
StartStateMachine()427 int SyncTaskContext::StartStateMachine()
428 {
429 return stateMachine_->StartSync();
430 }
431
ReceiveMessageCallback(Message * inMsg)432 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
433 {
434 int errCode = E_OK;
435 if (IncUsedCount() == E_OK) {
436 errCode = stateMachine_->ReceiveMessageCallback(inMsg);
437 SafeExit();
438 }
439 return errCode;
440 }
441
RegOnSyncTask(const std::function<int (void)> & callback)442 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
443 {
444 onSyncTaskAdd_ = callback;
445 }
446
IncUsedCount()447 int SyncTaskContext::IncUsedCount()
448 {
449 AutoLock lock(this);
450 if (IsKilled()) {
451 LOGI("[SyncTaskContext] IncUsedCount isKilled");
452 return -E_OBJ_IS_KILLED;
453 }
454 usedCount_++;
455 return E_OK;
456 }
457
SafeExit()458 void SyncTaskContext::SafeExit()
459 {
460 AutoLock lock(this);
461 usedCount_--;
462 if (usedCount_ < 1) {
463 safeKill_.notify_one();
464 }
465 }
466
GetCurrentLocalTime() const467 Timestamp SyncTaskContext::GetCurrentLocalTime() const
468 {
469 if (timeHelper_ == nullptr) {
470 return TimeHelper::INVALID_TIMESTAMP;
471 }
472 return timeHelper_->GetTime();
473 }
474
Abort(int status)475 void SyncTaskContext::Abort(int status)
476 {
477 (void)status;
478 Clear();
479 }
480
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId)481 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
482 {
483 {
484 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
485 if (synTaskContextSet_.count(context) == 0) {
486 LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
487 return;
488 }
489
490 // IncObjRef to maker sure context not been killed. after the lock_guard
491 RefObject::IncObjRef(context);
492 }
493
494 static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
495 RefObject::DecObjRef(context);
496 }
497
SetRemoteSoftwareVersion(uint32_t version)498 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
499 {
500 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
501 remoteSoftwareVersion_ = version;
502 remoteSoftwareVersionId_++;
503 }
504
GetRemoteSoftwareVersion() const505 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
506 {
507 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
508 return remoteSoftwareVersion_;
509 }
510
GetRemoteSoftwareVersionId() const511 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
512 {
513 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
514 return remoteSoftwareVersionId_;
515 }
516
IsCommNormal() const517 bool SyncTaskContext::IsCommNormal() const
518 {
519 return isCommNormal_;
520 }
521
CommErrHandlerFuncInner(int errCode,uint32_t sessionId)522 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
523 {
524 {
525 RefObject::AutoLock lock(this);
526 if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
527 return;
528 }
529
530 if (errCode == E_OK) {
531 // when communicator sent message failed, the state machine will get the error and exit this sync task
532 // it seems unnecessary to change isCommNormal_ value, so just return here
533 return;
534 }
535 }
536 LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
537 stateMachine_->CommErrAbort(sessionId);
538 }
539
TimeOut(TimerId id)540 int SyncTaskContext::TimeOut(TimerId id)
541 {
542 if (!timeOutCallback_) {
543 return E_OK;
544 }
545 IncObjRef(this);
546 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
547 timeOutCallback_(id);
548 DecObjRef(this);
549 });
550 if (errCode != E_OK) {
551 LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
552 DecObjRef(this);
553 }
554 return E_OK;
555 }
556
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)557 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
558 {
559 retryTime_ = 0;
560 mode_ = target->GetMode();
561 status_ = SyncOperation::OP_SYNCING;
562 isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
563 taskErrCode_ = E_OK;
564 packetId_ = 0;
565 isCommNormal_ = true; // reset comm status here
566 syncTaskRetryStatus_ = isSyncRetry_;
567 timeout_ = static_cast<int>(taskParam.timeout);
568 negotiationCount_ = 0;
569 target->GetSyncOperation(syncOperation_);
570 ReSetSequenceId();
571
572 if (syncOperation_ != nullptr) {
573 // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
574 RefObject::IncObjRef(syncOperation_);
575 syncId_ = syncOperation_->GetSyncId();
576 isAutoSync_ = syncOperation_->IsAutoSync();
577 isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
578 if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
579 syncTaskRetryStatus_ = true;
580 }
581 requestSessionId_ = GenerateRequestSessionId();
582 LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
583 mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
584 DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
585 } else {
586 isAutoSync_ = false;
587 LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
588 syncTaskRetryStatus_);
589 }
590 }
591
KillWait()592 void SyncTaskContext::KillWait()
593 {
594 StopTimer();
595 UnlockObj();
596 stateMachine_->NotifyClosing();
597 stateMachine_->AbortImmediately();
598 LockObj();
599 LOGW("[SyncTaskContext] Try to kill a context, now wait.");
600 bool noDeadLock = WaitLockedUntil(
601 safeKill_,
602 [this]() {
603 if (usedCount_ < 1) {
604 return true;
605 }
606 return false;
607 },
608 KILL_WAIT_SECONDS);
609 if (!noDeadLock) {
610 LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
611 } else {
612 LOGW("[SyncTaskContext] Wait the task exit ok.");
613 }
614 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
615 synTaskContextSet_.erase(this);
616 }
617
ClearSyncOperation()618 void SyncTaskContext::ClearSyncOperation()
619 {
620 std::lock_guard<std::mutex> lock(operationLock_);
621 if (syncOperation_ != nullptr) {
622 DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
623 RefObject::DecObjRef(syncOperation_);
624 syncOperation_ = nullptr;
625 }
626 }
627
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)628 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
629 {
630 AutoLock lock(this);
631 if (!isAutoSync_) {
632 return;
633 }
634 if (syncId_ >= syncId) {
635 return;
636 }
637 int mode = SyncOperation::TransferSyncMode(newTargetMode);
638 if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
639 SetRetryTime(AUTO_RETRY_TIMES);
640 ModifyTimer(timeout_);
641 }
642 }
643
GetTaskErrCode() const644 int SyncTaskContext::GetTaskErrCode() const
645 {
646 return taskErrCode_;
647 }
648
SetTaskErrCode(int errCode)649 void SyncTaskContext::SetTaskErrCode(int errCode)
650 {
651 taskErrCode_ = errCode;
652 }
653
IsSyncTaskNeedRetry() const654 bool SyncTaskContext::IsSyncTaskNeedRetry() const
655 {
656 return syncTaskRetryStatus_;
657 }
658
SetSyncRetry(bool isRetry)659 void SyncTaskContext::SetSyncRetry(bool isRetry)
660 {
661 isSyncRetry_ = isRetry;
662 }
663
GetSyncRetryTimes() const664 int SyncTaskContext::GetSyncRetryTimes() const
665 {
666 if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
667 return AUTO_RETRY_TIMES;
668 }
669 return MANUAL_RETRY_TIMES;
670 }
671
GetSyncRetryTimeout(int retryTime) const672 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
673 {
674 int timeoutTime = GetTimeoutTime();
675 if (IsAutoSync()) {
676 // set the new timeout value with 2 raised to the power of retryTime.
677 return timeoutTime * (1u << retryTime);
678 }
679 return timeoutTime;
680 }
681
ClearAllSyncTask()682 void SyncTaskContext::ClearAllSyncTask()
683 {
684 }
685
IsAutoLiftWaterMark() const686 bool SyncTaskContext::IsAutoLiftWaterMark() const
687 {
688 return negotiationCount_ < NEGOTIATION_LIMIT;
689 }
690
IncNegotiationCount()691 void SyncTaskContext::IncNegotiationCount()
692 {
693 negotiationCount_++;
694 }
695
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)696 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
697 {
698 return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
699 }
700
IsAutoSubscribe() const701 bool SyncTaskContext::IsAutoSubscribe() const
702 {
703 return isAutoSubscribe_;
704 }
705
GetIsNeedResetAbilitySync() const706 bool SyncTaskContext::GetIsNeedResetAbilitySync() const
707 {
708 return isNeedResetAbilitySync_;
709 }
710
SetIsNeedResetAbilitySync(bool isNeedReset)711 void SyncTaskContext::SetIsNeedResetAbilitySync(bool isNeedReset)
712 {
713 isNeedResetAbilitySync_ = isNeedReset;
714 }
715
IsCurrentSyncTaskCanBeSkipped() const716 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
717 {
718 return false;
719 }
720
ResetLastPushTaskStatus()721 void SyncTaskContext::ResetLastPushTaskStatus()
722 {
723 }
724
SchemaChange()725 void SyncTaskContext::SchemaChange()
726 {
727 SetIsNeedResetAbilitySync(true);
728 }
729
Dump(int fd)730 void SyncTaskContext::Dump(int fd)
731 {
732 size_t totalSyncTaskCount = 0u;
733 size_t autoSyncTaskCount = 0u;
734 size_t reponseTaskCount = 0u;
735 {
736 std::lock_guard<std::mutex> lock(targetQueueLock_);
737 totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
738 for (const auto &target : requestTargetQueue_) {
739 if (target->IsAutoSync()) {
740 autoSyncTaskCount++;
741 }
742 }
743 reponseTaskCount = responseTargetQueue_.size();
744 }
745 DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
746 " response task count = %zu\n",
747 deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
748 }
749
RunPermissionCheck(uint8_t flag) const750 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
751 {
752 std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
753 std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
754 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
755 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
756 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
757 { userId, appId, storeId, deviceId_, instanceId }, flag);
758 if (errCode != E_OK) {
759 LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
760 errCode, flag, deviceId_.c_str());
761 }
762 return errCode;
763 }
764
GetPermissionCheckFlag(bool isAutoSync,int syncMode)765 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
766 {
767 uint8_t flag = 0;
768 int mode = SyncOperation::TransferSyncMode(syncMode);
769 if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
770 flag = CHECK_FLAG_SEND;
771 } else if (mode == SyncModeType::PULL) {
772 flag = CHECK_FLAG_RECEIVE;
773 } else if (mode == SyncModeType::PUSH_AND_PULL) {
774 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
775 }
776 if (isAutoSync) {
777 flag = flag | CHECK_FLAG_AUTOSYNC;
778 }
779 if (mode != SyncModeType::RESPONSE_PULL) {
780 // it means this sync is started by local
781 flag = flag | CHECK_FLAG_SPONSOR;
782 }
783 return flag;
784 }
785
AbortMachineIfNeed(uint32_t syncId)786 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
787 {
788 uint32_t sessionId = 0u;
789 {
790 RefObject::AutoLock autoLock(this);
791 if (syncId_ != syncId) {
792 return;
793 }
794 sessionId = requestSessionId_;
795 }
796 stateMachine_->InnerErrorAbort(sessionId);
797 }
798
GetAndIncSyncOperation() const799 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
800 {
801 std::lock_guard<std::mutex> lock(operationLock_);
802 if (syncOperation_ == nullptr) {
803 return nullptr;
804 }
805 RefObject::IncObjRef(syncOperation_);
806 return syncOperation_;
807 }
808
GenerateRequestSessionId()809 uint32_t SyncTaskContext::GenerateRequestSessionId()
810 {
811 uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
812 // make sure sessionId is between 0x01 and 0x8fffffff
813 if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
814 sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
815 std::to_string(TimeHelper::GetSysCurrentTime()));
816 }
817 lastRequestSessionId_ = sessionId;
818 return sessionId;
819 }
820 } // namespace DistributedDB
821