1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_task_context.h"
17
18 #include <algorithm>
19 #include <cmath>
20
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34
35 namespace {
36 constexpr int NEGOTIATION_LIMIT = 2;
37 constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41 : syncOperation_(nullptr),
42 syncId_(0),
43 mode_(0),
44 isAutoSync_(false),
45 status_(0),
46 taskExecStatus_(0),
47 syncInterface_(nullptr),
48 communicator_(nullptr),
49 stateMachine_(nullptr),
50 requestSessionId_(0),
51 lastRequestSessionId_(0),
52 timeHelper_(nullptr),
53 remoteSoftwareVersion_(0),
54 remoteSoftwareVersionId_(0),
55 isCommNormal_(true),
56 taskErrCode_(E_OK),
57 commErrCode_(E_OK),
58 syncTaskRetryStatus_(false),
59 isSyncRetry_(false),
60 negotiationCount_(0),
61 isAutoSubscribe_(false)
62 {
63 }
64
~SyncTaskContext()65 SyncTaskContext::~SyncTaskContext()
66 {
67 if (stateMachine_ != nullptr) {
68 delete stateMachine_;
69 stateMachine_ = nullptr;
70 }
71 SyncTaskContext::ClearSyncOperation();
72 ClearSyncTarget();
73 syncInterface_ = nullptr;
74 communicator_ = nullptr;
75 }
76
AddSyncTarget(ISyncTarget * target)77 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
78 {
79 if (target == nullptr) {
80 return -E_INVALID_ARGS;
81 }
82 int targetMode = target->GetMode();
83 auto syncId = static_cast<uint32_t>(target->GetSyncId());
84 {
85 std::lock_guard<std::mutex> lock(targetQueueLock_);
86 if (target->GetTaskType() == ISyncTarget::REQUEST) {
87 requestTargetQueue_.push_back(target);
88 } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
89 responseTargetQueue_.push_back(target);
90 } else {
91 return -E_INVALID_ARGS;
92 }
93 }
94 RefObject::IncObjRef(this);
95 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
96 CancelCurrentSyncRetryIfNeed(targetMode, syncId);
97 RefObject::DecObjRef(this);
98 });
99 if (errCode != E_OK) {
100 RefObject::DecObjRef(this);
101 }
102 if (onSyncTaskAdd_) {
103 RefObject::IncObjRef(this);
104 errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
105 onSyncTaskAdd_();
106 RefObject::DecObjRef(this);
107 });
108 if (errCode != E_OK) {
109 RefObject::DecObjRef(this);
110 }
111 }
112 return E_OK;
113 }
114
SetOperationStatus(int status)115 void SyncTaskContext::SetOperationStatus(int status)
116 {
117 std::lock_guard<std::mutex> lock(operationLock_);
118 if (syncOperation_ == nullptr) {
119 LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
120 return;
121 }
122 int finalStatus = status;
123
124 int operationStatus = syncOperation_->GetStatus(deviceId_);
125 if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
126 if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
127 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
128 } else {
129 finalStatus = SyncOperation::OP_FINISHED_ALL;
130 }
131 } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
132 if (GetTaskErrCode() == -E_EKEYREVOKED) {
133 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
134 } else {
135 finalStatus = SyncOperation::OP_FINISHED_ALL;
136 }
137 }
138 syncOperation_->SetStatus(deviceId_, finalStatus);
139 if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
140 SaveLastPushTaskExecStatus(finalStatus);
141 }
142 if (syncOperation_->CheckIsAllFinished()) {
143 syncOperation_->Finished();
144 }
145 }
146
SaveLastPushTaskExecStatus(int finalStatus)147 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
148 {
149 (void)finalStatus;
150 }
151
Clear()152 void SyncTaskContext::Clear()
153 {
154 StopTimer();
155 retryTime_ = 0;
156 sequenceId_ = 1;
157 syncId_ = 0;
158 isAutoSync_ = false;
159 requestSessionId_ = 0;
160 isNeedRetry_ = NO_NEED_RETRY;
161 mode_ = SyncModeType::INVALID_MODE;
162 status_ = SyncOperation::OP_WAITING;
163 taskErrCode_ = E_OK;
164 packetId_ = 0;
165 isAutoSubscribe_ = false;
166 }
167
RemoveSyncOperation(int syncId)168 int SyncTaskContext::RemoveSyncOperation(int syncId)
169 {
170 std::lock_guard<std::mutex> lock(targetQueueLock_);
171 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
172 [syncId](const ISyncTarget *target) {
173 if (target == nullptr) {
174 return false;
175 }
176 return target->GetSyncId() == syncId;
177 });
178 if (iter != requestTargetQueue_.end()) {
179 if (*iter != nullptr) {
180 delete *iter;
181 }
182 requestTargetQueue_.erase(iter);
183 return E_OK;
184 }
185 return -E_INVALID_ARGS;
186 }
187
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190 std::lock_guard<std::mutex> lock(targetQueueLock_);
191 for (auto &requestTarget : requestTargetQueue_) {
192 if (requestTarget != nullptr) {
193 delete requestTarget;
194 }
195 }
196 requestTargetQueue_.clear();
197
198 for (auto &responseTarget : responseTargetQueue_) {
199 if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
200 delete responseTarget;
201 }
202 }
203 responseTargetQueue_.clear();
204 }
205
IsTargetQueueEmpty() const206 bool SyncTaskContext::IsTargetQueueEmpty() const
207 {
208 std::lock_guard<std::mutex> lock(targetQueueLock_);
209 return requestTargetQueue_.empty() && responseTargetQueue_.empty();
210 }
211
GetOperationStatus() const212 int SyncTaskContext::GetOperationStatus() const
213 {
214 std::lock_guard<std::mutex> lock(operationLock_);
215 if (syncOperation_ == nullptr) {
216 return SyncOperation::OP_FINISHED_ALL;
217 }
218 return syncOperation_->GetStatus(deviceId_);
219 }
220
SetMode(int mode)221 void SyncTaskContext::SetMode(int mode)
222 {
223 mode_ = mode;
224 }
225
GetMode() const226 int SyncTaskContext::GetMode() const
227 {
228 return mode_;
229 }
230
MoveToNextTarget(uint32_t timeout)231 void SyncTaskContext::MoveToNextTarget(uint32_t timeout)
232 {
233 ClearSyncOperation();
234 TaskParam param;
235 // call other system api without lock
236 param.timeout = timeout;
237 std::lock_guard<std::mutex> lock(targetQueueLock_);
238 while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
239 ISyncTarget *tmpTarget = nullptr;
240 if (!requestTargetQueue_.empty()) {
241 tmpTarget = requestTargetQueue_.front();
242 requestTargetQueue_.pop_front();
243 } else {
244 tmpTarget = responseTargetQueue_.front();
245 responseTargetQueue_.pop_front();
246 }
247 if (tmpTarget == nullptr) {
248 LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
249 continue;
250 }
251 SyncOperation *tmpOperation = nullptr;
252 tmpTarget->GetSyncOperation(tmpOperation);
253 if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
254 // if killed skip this syncOperation_.
255 delete tmpTarget;
256 tmpTarget = nullptr;
257 continue;
258 }
259 CopyTargetData(tmpTarget, param);
260 delete tmpTarget;
261 tmpTarget = nullptr;
262 break;
263 }
264 }
265
GetNextTarget(uint32_t timeout)266 int SyncTaskContext::GetNextTarget(uint32_t timeout)
267 {
268 MoveToNextTarget(timeout);
269 int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
270 if (checkErrCode != E_OK) {
271 SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
272 return checkErrCode;
273 }
274 return E_OK;
275 }
276
GetSyncId() const277 uint32_t SyncTaskContext::GetSyncId() const
278 {
279 return syncId_;
280 }
281
282 // Get the current task deviceId.
GetDeviceId() const283 std::string SyncTaskContext::GetDeviceId() const
284 {
285 return deviceId_;
286 }
287
SetTaskExecStatus(int status)288 void SyncTaskContext::SetTaskExecStatus(int status)
289 {
290 taskExecStatus_ = status;
291 }
292
GetTaskExecStatus() const293 int SyncTaskContext::GetTaskExecStatus() const
294 {
295 return taskExecStatus_;
296 }
297
IsAutoSync() const298 bool SyncTaskContext::IsAutoSync() const
299 {
300 return isAutoSync_;
301 }
302
StartTimer()303 int SyncTaskContext::StartTimer()
304 {
305 std::lock_guard<std::mutex> lockGuard(timerLock_);
306 if (timerId_ > 0) {
307 return -E_UNEXPECTED_DATA;
308 }
309 TimerId timerId = 0;
310 RefObject::IncObjRef(this);
311 TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
312 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
313 [this]() {
314 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
315 if (ret != E_OK) {
316 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
317 }
318 }, timerId);
319 if (errCode != E_OK) {
320 RefObject::DecObjRef(this);
321 return errCode;
322 }
323 timerId_ = timerId;
324 return errCode;
325 }
326
StopTimer()327 void SyncTaskContext::StopTimer()
328 {
329 TimerId timerId;
330 {
331 std::lock_guard<std::mutex> lockGuard(timerLock_);
332 timerId = timerId_;
333 if (timerId_ == 0) {
334 return;
335 }
336 timerId_ = 0;
337 }
338 RuntimeContext::GetInstance()->RemoveTimer(timerId);
339 }
340
ModifyTimer(int milliSeconds)341 int SyncTaskContext::ModifyTimer(int milliSeconds)
342 {
343 std::lock_guard<std::mutex> lockGuard(timerLock_);
344 if (timerId_ == 0) {
345 return -E_UNEXPECTED_DATA;
346 }
347 return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
348 }
349
SetRetryTime(int retryTime)350 void SyncTaskContext::SetRetryTime(int retryTime)
351 {
352 retryTime_ = retryTime;
353 }
354
GetRetryTime() const355 int SyncTaskContext::GetRetryTime() const
356 {
357 return retryTime_;
358 }
359
SetRetryStatus(int isNeedRetry)360 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
361 {
362 isNeedRetry_ = isNeedRetry;
363 }
364
GetRetryStatus() const365 int SyncTaskContext::GetRetryStatus() const
366 {
367 return isNeedRetry_;
368 }
369
GetTimerId() const370 TimerId SyncTaskContext::GetTimerId() const
371 {
372 return timerId_;
373 }
374
GetRequestSessionId() const375 uint32_t SyncTaskContext::GetRequestSessionId() const
376 {
377 return requestSessionId_;
378 }
379
IncSequenceId()380 void SyncTaskContext::IncSequenceId()
381 {
382 sequenceId_++;
383 }
384
GetSequenceId() const385 uint32_t SyncTaskContext::GetSequenceId() const
386 {
387 return sequenceId_;
388 }
389
ReSetSequenceId()390 void SyncTaskContext::ReSetSequenceId()
391 {
392 sequenceId_ = 1;
393 }
394
IncPacketId()395 void SyncTaskContext::IncPacketId()
396 {
397 packetId_++;
398 }
399
GetPacketId() const400 uint64_t SyncTaskContext::GetPacketId() const
401 {
402 return packetId_;
403 }
404
GetTimeoutTime() const405 int SyncTaskContext::GetTimeoutTime() const
406 {
407 return timeout_;
408 }
409
SetTimeoutCallback(const TimerAction & timeOutCallback)410 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
411 {
412 timeOutCallback_ = timeOutCallback;
413 }
414
SetTimeOffset(TimeOffset offset)415 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
416 {
417 timeOffset_ = offset;
418 }
419
GetTimeOffset() const420 TimeOffset SyncTaskContext::GetTimeOffset() const
421 {
422 return timeOffset_;
423 }
424
StartStateMachine()425 int SyncTaskContext::StartStateMachine()
426 {
427 return stateMachine_->StartSync();
428 }
429
ReceiveMessageCallback(Message * inMsg)430 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
431 {
432 if (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
433 uint16_t remoteVersion = 0;
434 (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
435 SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
436 }
437 int errCode = E_OK;
438 if (IncUsedCount() == E_OK) {
439 errCode = stateMachine_->ReceiveMessageCallback(inMsg);
440 SafeExit();
441 }
442 return errCode;
443 }
444
RegOnSyncTask(const std::function<int (void)> & callback)445 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
446 {
447 onSyncTaskAdd_ = callback;
448 }
449
IncUsedCount()450 int SyncTaskContext::IncUsedCount()
451 {
452 AutoLock lock(this);
453 if (IsKilled()) {
454 LOGI("[SyncTaskContext] IncUsedCount isKilled");
455 return -E_OBJ_IS_KILLED;
456 }
457 usedCount_++;
458 return E_OK;
459 }
460
SafeExit()461 void SyncTaskContext::SafeExit()
462 {
463 AutoLock lock(this);
464 usedCount_--;
465 if (usedCount_ < 1) {
466 safeKill_.notify_one();
467 }
468 }
469
GetCurrentLocalTime() const470 Timestamp SyncTaskContext::GetCurrentLocalTime() const
471 {
472 if (timeHelper_ == nullptr) {
473 return TimeHelper::INVALID_TIMESTAMP;
474 }
475 return timeHelper_->GetTime();
476 }
477
Abort(int status)478 void SyncTaskContext::Abort(int status)
479 {
480 (void)status;
481 Clear();
482 }
483
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId,bool isDirectEnd)484 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
485 {
486 {
487 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
488 if (synTaskContextSet_.count(context) == 0) {
489 LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
490 return;
491 }
492 // IncObjRef to maker sure context not been killed. after the lock_guard
493 RefObject::IncObjRef(context);
494 }
495
496 static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
497 isDirectEnd);
498 RefObject::DecObjRef(context);
499 }
500
SetRemoteSoftwareVersion(uint32_t version)501 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
502 {
503 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
504 if (remoteSoftwareVersion_ == version) {
505 return;
506 }
507 remoteSoftwareVersion_ = version;
508 remoteSoftwareVersionId_++;
509 }
510
GetRemoteSoftwareVersion() const511 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
512 {
513 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
514 return remoteSoftwareVersion_;
515 }
516
GetRemoteSoftwareVersionId() const517 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
518 {
519 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
520 return remoteSoftwareVersionId_;
521 }
522
IsCommNormal() const523 bool SyncTaskContext::IsCommNormal() const
524 {
525 return isCommNormal_;
526 }
527
CommErrHandlerFuncInner(int errCode,uint32_t sessionId,bool isDirectEnd)528 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
529 {
530 {
531 RefObject::AutoLock lock(this);
532 if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
533 return;
534 }
535
536 if (errCode == E_OK) {
537 SetCommFailErrCode(errCode);
538 // when communicator sent message failed, the state machine will get the error and exit this sync task
539 // it seems unnecessary to change isCommNormal_ value, so just return here
540 return;
541 }
542 }
543 LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
544 if (!isDirectEnd) {
545 SetErrCodeWhenWaitTimeOut(errCode);
546 return;
547 }
548 if (errCode > 0) {
549 SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
550 } else {
551 SetCommFailErrCode(errCode);
552 }
553 stateMachine_->CommErrAbort(sessionId);
554 }
555
TimeOut(TimerId id)556 int SyncTaskContext::TimeOut(TimerId id)
557 {
558 if (!timeOutCallback_) {
559 return E_OK;
560 }
561 IncObjRef(this);
562 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
563 timeOutCallback_(id);
564 DecObjRef(this);
565 });
566 if (errCode != E_OK) {
567 LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
568 DecObjRef(this);
569 }
570 return E_OK;
571 }
572
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)573 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
574 {
575 retryTime_ = 0;
576 mode_ = target->GetMode();
577 status_ = SyncOperation::OP_SYNCING;
578 isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
579 taskErrCode_ = E_OK;
580 packetId_ = 0;
581 isCommNormal_ = true; // reset comm status here
582 commErrCode_ = E_OK;
583 syncTaskRetryStatus_ = isSyncRetry_;
584 timeout_ = static_cast<int>(taskParam.timeout);
585 negotiationCount_ = 0;
586 target->GetSyncOperation(syncOperation_);
587 ReSetSequenceId();
588
589 if (syncOperation_ != nullptr) {
590 // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
591 RefObject::IncObjRef(syncOperation_);
592 syncId_ = syncOperation_->GetSyncId();
593 isAutoSync_ = syncOperation_->IsAutoSync();
594 isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
595 if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
596 syncTaskRetryStatus_ = true;
597 }
598 requestSessionId_ = GenerateRequestSessionId();
599 LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
600 mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
601 DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
602 } else {
603 isAutoSync_ = false;
604 LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
605 syncTaskRetryStatus_);
606 }
607 }
608
KillWait()609 void SyncTaskContext::KillWait()
610 {
611 StopTimer();
612 UnlockObj();
613 stateMachine_->NotifyClosing();
614 stateMachine_->AbortImmediately();
615 LockObj();
616 LOGW("[SyncTaskContext] Try to kill a context, now wait.");
617 bool noDeadLock = WaitLockedUntil(
618 safeKill_,
619 [this]() {
620 if (usedCount_ < 1) {
621 return true;
622 }
623 return false;
624 },
625 KILL_WAIT_SECONDS);
626 if (!noDeadLock) { // LCOV_EXCL_BR_LINE
627 LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
628 } else {
629 LOGW("[SyncTaskContext] Wait the task exit ok.");
630 }
631 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
632 synTaskContextSet_.erase(this);
633 }
634
ClearSyncOperation()635 void SyncTaskContext::ClearSyncOperation()
636 {
637 std::lock_guard<std::mutex> lock(operationLock_);
638 if (syncOperation_ != nullptr) {
639 DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
640 RefObject::DecObjRef(syncOperation_);
641 syncOperation_ = nullptr;
642 }
643 }
644
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)645 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
646 {
647 AutoLock lock(this);
648 if (!isAutoSync_) {
649 return;
650 }
651 if (syncId_ >= syncId) {
652 return;
653 }
654 int mode = SyncOperation::TransferSyncMode(newTargetMode);
655 if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
656 SetRetryTime(AUTO_RETRY_TIMES);
657 ModifyTimer(timeout_);
658 }
659 }
660
GetTaskErrCode() const661 int SyncTaskContext::GetTaskErrCode() const
662 {
663 return taskErrCode_;
664 }
665
SetTaskErrCode(int errCode)666 void SyncTaskContext::SetTaskErrCode(int errCode)
667 {
668 taskErrCode_ = errCode;
669 }
670
IsSyncTaskNeedRetry() const671 bool SyncTaskContext::IsSyncTaskNeedRetry() const
672 {
673 return syncTaskRetryStatus_;
674 }
675
SetSyncRetry(bool isRetry)676 void SyncTaskContext::SetSyncRetry(bool isRetry)
677 {
678 isSyncRetry_ = isRetry;
679 }
680
GetSyncRetryTimes() const681 int SyncTaskContext::GetSyncRetryTimes() const
682 {
683 if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
684 return AUTO_RETRY_TIMES;
685 }
686 return MANUAL_RETRY_TIMES;
687 }
688
GetSyncRetryTimeout(int retryTime) const689 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
690 {
691 int timeoutTime = GetTimeoutTime();
692 if (IsAutoSync()) {
693 // set the new timeout value with 2 raised to the power of retryTime.
694 return timeoutTime * (1u << retryTime);
695 }
696 return timeoutTime;
697 }
698
ClearAllSyncTask()699 void SyncTaskContext::ClearAllSyncTask()
700 {
701 }
702
IsAutoLiftWaterMark() const703 bool SyncTaskContext::IsAutoLiftWaterMark() const
704 {
705 return negotiationCount_ < NEGOTIATION_LIMIT;
706 }
707
IncNegotiationCount()708 void SyncTaskContext::IncNegotiationCount()
709 {
710 negotiationCount_++;
711 }
712
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)713 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
714 {
715 return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
716 }
717
IsAutoSubscribe() const718 bool SyncTaskContext::IsAutoSubscribe() const
719 {
720 return isAutoSubscribe_;
721 }
722
IsCurrentSyncTaskCanBeSkipped() const723 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
724 {
725 return false;
726 }
727
ResetLastPushTaskStatus()728 void SyncTaskContext::ResetLastPushTaskStatus()
729 {
730 }
731
SchemaChange()732 void SyncTaskContext::SchemaChange()
733 {
734 if (stateMachine_ != nullptr) {
735 stateMachine_->SchemaChange();
736 }
737 }
738
Dump(int fd)739 void SyncTaskContext::Dump(int fd)
740 {
741 size_t totalSyncTaskCount = 0u;
742 size_t autoSyncTaskCount = 0u;
743 size_t reponseTaskCount = 0u;
744 {
745 std::lock_guard<std::mutex> lock(targetQueueLock_);
746 totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
747 for (const auto &target : requestTargetQueue_) {
748 if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
749 autoSyncTaskCount++;
750 }
751 }
752 reponseTaskCount = responseTargetQueue_.size();
753 }
754 DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
755 " response task count = %zu\n",
756 deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
757 }
758
RunPermissionCheck(uint8_t flag) const759 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
760 {
761 std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
762 std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
763 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
764 std::string subUserId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
765 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
766 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
767 { userId, appId, storeId, deviceId_, subUserId, instanceId }, flag);
768 if (errCode != E_OK) {
769 LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
770 errCode, flag, deviceId_.c_str());
771 }
772 return errCode;
773 }
774
GetPermissionCheckFlag(bool isAutoSync,int syncMode)775 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
776 {
777 uint8_t flag = 0;
778 int mode = SyncOperation::TransferSyncMode(syncMode);
779 if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
780 flag = CHECK_FLAG_SEND;
781 } else if (mode == SyncModeType::PULL) {
782 flag = CHECK_FLAG_RECEIVE;
783 } else if (mode == SyncModeType::PUSH_AND_PULL) {
784 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
785 }
786 if (isAutoSync) {
787 flag = flag | CHECK_FLAG_AUTOSYNC;
788 }
789 if (mode != SyncModeType::RESPONSE_PULL) {
790 // it means this sync is started by local
791 flag = flag | CHECK_FLAG_SPONSOR;
792 }
793 return flag;
794 }
795
AbortMachineIfNeed(uint32_t syncId)796 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
797 {
798 uint32_t sessionId = 0u;
799 {
800 RefObject::AutoLock autoLock(this);
801 if (syncId_ != syncId) {
802 return;
803 }
804 sessionId = requestSessionId_;
805 }
806 stateMachine_->InnerErrorAbort(sessionId);
807 }
808
GetAndIncSyncOperation() const809 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
810 {
811 std::lock_guard<std::mutex> lock(operationLock_);
812 if (syncOperation_ == nullptr) {
813 return nullptr;
814 }
815 RefObject::IncObjRef(syncOperation_);
816 return syncOperation_;
817 }
818
GenerateRequestSessionId()819 uint32_t SyncTaskContext::GenerateRequestSessionId()
820 {
821 uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
822 // make sure sessionId is between 0x01 and 0x8fffffff
823 if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
824 sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
825 std::to_string(TimeHelper::GetSysCurrentTime()));
826 }
827 lastRequestSessionId_ = sessionId;
828 return sessionId;
829 }
830
IsSchemaCompatible() const831 bool SyncTaskContext::IsSchemaCompatible() const
832 {
833 return true;
834 }
835
SetDbAbility(DbAbility & remoteDbAbility)836 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
837 {
838 }
839
TimeChange()840 void SyncTaskContext::TimeChange()
841 {
842 if (stateMachine_ == nullptr) {
843 LOGW("[SyncTaskContext] machine is null when time change");
844 return;
845 }
846 stateMachine_->TimeChange();
847 }
848
GetResponseTaskCount()849 int32_t SyncTaskContext::GetResponseTaskCount()
850 {
851 std::lock_guard<std::mutex> autoLock(targetQueueLock_);
852 return static_cast<int32_t>(responseTargetQueue_.size());
853 }
854
GetCommErrCode() const855 int SyncTaskContext::GetCommErrCode() const
856 {
857 return commErrCode_;
858 }
859
SetCommFailErrCode(int errCode)860 void SyncTaskContext::SetCommFailErrCode(int errCode)
861 {
862 commErrCode_ = errCode;
863 }
864
SetErrCodeWhenWaitTimeOut(int errCode)865 void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
866 {
867 if (errCode > 0) {
868 SetCommFailErrCode(static_cast<int>(SyncOperation::OP_TIMEOUT));
869 } else {
870 SetCommFailErrCode(errCode);
871 }
872 }
873 } // namespace DistributedDB
874