1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_task_context.h"
17
18 #include <algorithm>
19 #include <cmath>
20
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34
35 namespace {
36 constexpr int NEGOTIATION_LIMIT = 2;
37 constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41 : syncOperation_(nullptr),
42 syncId_(0),
43 mode_(0),
44 isAutoSync_(false),
45 status_(0),
46 taskExecStatus_(0),
47 syncInterface_(nullptr),
48 communicator_(nullptr),
49 stateMachine_(nullptr),
50 requestSessionId_(0),
51 lastRequestSessionId_(0),
52 timeHelper_(nullptr),
53 remoteSoftwareVersion_(0),
54 remoteSoftwareVersionId_(0),
55 isCommNormal_(true),
56 taskErrCode_(E_OK),
57 commErrCode_(E_OK),
58 syncTaskRetryStatus_(false),
59 isSyncRetry_(false),
60 negotiationCount_(0),
61 isAutoSubscribe_(false)
62 {
63 }
64
~SyncTaskContext()65 SyncTaskContext::~SyncTaskContext()
66 {
67 if (stateMachine_ != nullptr) {
68 delete stateMachine_;
69 stateMachine_ = nullptr;
70 }
71 SyncTaskContext::ClearSyncOperation();
72 ClearSyncTarget();
73 syncInterface_ = nullptr;
74 communicator_ = nullptr;
75 }
76
AddSyncTarget(ISyncTarget * target)77 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
78 {
79 if (target == nullptr) {
80 return -E_INVALID_ARGS;
81 }
82 int targetMode = target->GetMode();
83 auto syncId = static_cast<uint32_t>(target->GetSyncId());
84 {
85 std::lock_guard<std::mutex> lock(targetQueueLock_);
86 if (target->GetTaskType() == ISyncTarget::REQUEST) {
87 requestTargetQueue_.push_back(target);
88 } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
89 responseTargetQueue_.push_back(target);
90 } else {
91 return -E_INVALID_ARGS;
92 }
93 }
94 RefObject::IncObjRef(this);
95 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
96 CancelCurrentSyncRetryIfNeed(targetMode, syncId);
97 RefObject::DecObjRef(this);
98 });
99 if (errCode != E_OK) {
100 RefObject::DecObjRef(this);
101 }
102 if (onSyncTaskAdd_) {
103 RefObject::IncObjRef(this);
104 errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
105 onSyncTaskAdd_();
106 RefObject::DecObjRef(this);
107 });
108 if (errCode != E_OK) {
109 RefObject::DecObjRef(this);
110 }
111 }
112 return E_OK;
113 }
114
SetOperationStatus(int status)115 void SyncTaskContext::SetOperationStatus(int status)
116 {
117 std::lock_guard<std::mutex> lock(operationLock_);
118 if (syncOperation_ == nullptr) {
119 LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
120 return;
121 }
122 int finalStatus = status;
123
124 int operationStatus = syncOperation_->GetStatus(deviceId_);
125 if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
126 if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
127 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
128 } else {
129 finalStatus = SyncOperation::OP_FINISHED_ALL;
130 }
131 } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
132 if (GetTaskErrCode() == -E_EKEYREVOKED) {
133 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
134 } else {
135 finalStatus = SyncOperation::OP_FINISHED_ALL;
136 }
137 }
138 syncOperation_->SetStatus(deviceId_, finalStatus);
139 if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
140 SaveLastPushTaskExecStatus(finalStatus);
141 }
142 if (syncOperation_->CheckIsAllFinished()) {
143 syncOperation_->Finished();
144 }
145 }
146
SaveLastPushTaskExecStatus(int finalStatus)147 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
148 {
149 (void)finalStatus;
150 }
151
Clear()152 void SyncTaskContext::Clear()
153 {
154 StopTimer();
155 retryTime_ = 0;
156 sequenceId_ = 1;
157 syncId_ = 0;
158 isAutoSync_ = false;
159 requestSessionId_ = 0;
160 isNeedRetry_ = NO_NEED_RETRY;
161 mode_ = SyncModeType::INVALID_MODE;
162 status_ = SyncOperation::OP_WAITING;
163 taskErrCode_ = E_OK;
164 packetId_ = 0;
165 isAutoSubscribe_ = false;
166 }
167
RemoveSyncOperation(int syncId)168 int SyncTaskContext::RemoveSyncOperation(int syncId)
169 {
170 std::lock_guard<std::mutex> lock(targetQueueLock_);
171 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
172 [syncId](const ISyncTarget *target) {
173 if (target == nullptr) {
174 return false;
175 }
176 return target->GetSyncId() == syncId;
177 });
178 if (iter != requestTargetQueue_.end()) {
179 if (*iter != nullptr) {
180 delete *iter;
181 *iter = nullptr;
182 }
183 requestTargetQueue_.erase(iter);
184 return E_OK;
185 }
186 return -E_INVALID_ARGS;
187 }
188
ClearSyncTarget()189 void SyncTaskContext::ClearSyncTarget()
190 {
191 std::lock_guard<std::mutex> lock(targetQueueLock_);
192 for (auto &requestTarget : requestTargetQueue_) {
193 if (requestTarget != nullptr) {
194 delete requestTarget;
195 requestTarget = nullptr;
196 }
197 }
198 requestTargetQueue_.clear();
199
200 for (auto &responseTarget : responseTargetQueue_) {
201 if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
202 delete responseTarget;
203 responseTarget = nullptr;
204 }
205 }
206 responseTargetQueue_.clear();
207 }
208
IsTargetQueueEmpty() const209 bool SyncTaskContext::IsTargetQueueEmpty() const
210 {
211 std::lock_guard<std::mutex> lock(targetQueueLock_);
212 return requestTargetQueue_.empty() && responseTargetQueue_.empty();
213 }
214
GetOperationStatus() const215 int SyncTaskContext::GetOperationStatus() const
216 {
217 std::lock_guard<std::mutex> lock(operationLock_);
218 if (syncOperation_ == nullptr) {
219 return SyncOperation::OP_FINISHED_ALL;
220 }
221 return syncOperation_->GetStatus(deviceId_);
222 }
223
SetMode(int mode)224 void SyncTaskContext::SetMode(int mode)
225 {
226 mode_ = mode;
227 }
228
GetMode() const229 int SyncTaskContext::GetMode() const
230 {
231 return mode_;
232 }
233
MoveToNextTarget()234 void SyncTaskContext::MoveToNextTarget()
235 {
236 ClearSyncOperation();
237 TaskParam param;
238 // call other system api without lock
239 param.timeout = communicator_->GetTimeout(deviceId_);
240 std::lock_guard<std::mutex> lock(targetQueueLock_);
241 while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
242 ISyncTarget *tmpTarget = nullptr;
243 if (!requestTargetQueue_.empty()) {
244 tmpTarget = requestTargetQueue_.front();
245 requestTargetQueue_.pop_front();
246 } else {
247 tmpTarget = responseTargetQueue_.front();
248 responseTargetQueue_.pop_front();
249 }
250 if (tmpTarget == nullptr) {
251 LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
252 continue;
253 }
254 SyncOperation *tmpOperation = nullptr;
255 tmpTarget->GetSyncOperation(tmpOperation);
256 if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
257 // if killed skip this syncOperation_.
258 delete tmpTarget;
259 tmpTarget = nullptr;
260 continue;
261 }
262 CopyTargetData(tmpTarget, param);
263 delete tmpTarget;
264 tmpTarget = nullptr;
265 break;
266 }
267 }
268
GetNextTarget()269 int SyncTaskContext::GetNextTarget()
270 {
271 MoveToNextTarget();
272 int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
273 if (checkErrCode != E_OK) {
274 SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
275 return checkErrCode;
276 }
277 return E_OK;
278 }
279
GetSyncId() const280 uint32_t SyncTaskContext::GetSyncId() const
281 {
282 return syncId_;
283 }
284
285 // Get the current task deviceId.
GetDeviceId() const286 std::string SyncTaskContext::GetDeviceId() const
287 {
288 return deviceId_;
289 }
290
SetTaskExecStatus(int status)291 void SyncTaskContext::SetTaskExecStatus(int status)
292 {
293 taskExecStatus_ = status;
294 }
295
GetTaskExecStatus() const296 int SyncTaskContext::GetTaskExecStatus() const
297 {
298 return taskExecStatus_;
299 }
300
IsAutoSync() const301 bool SyncTaskContext::IsAutoSync() const
302 {
303 return isAutoSync_;
304 }
305
StartTimer()306 int SyncTaskContext::StartTimer()
307 {
308 std::lock_guard<std::mutex> lockGuard(timerLock_);
309 if (timerId_ > 0) {
310 return -E_UNEXPECTED_DATA;
311 }
312 TimerId timerId = 0;
313 RefObject::IncObjRef(this);
314 TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
315 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
316 [this]() {
317 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
318 if (ret != E_OK) {
319 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
320 }
321 }, timerId);
322 if (errCode != E_OK) {
323 RefObject::DecObjRef(this);
324 return errCode;
325 }
326 timerId_ = timerId;
327 return errCode;
328 }
329
StopTimer()330 void SyncTaskContext::StopTimer()
331 {
332 TimerId timerId;
333 {
334 std::lock_guard<std::mutex> lockGuard(timerLock_);
335 timerId = timerId_;
336 if (timerId_ == 0) {
337 return;
338 }
339 timerId_ = 0;
340 }
341 RuntimeContext::GetInstance()->RemoveTimer(timerId);
342 }
343
ModifyTimer(int milliSeconds)344 int SyncTaskContext::ModifyTimer(int milliSeconds)
345 {
346 std::lock_guard<std::mutex> lockGuard(timerLock_);
347 if (timerId_ == 0) {
348 return -E_UNEXPECTED_DATA;
349 }
350 return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
351 }
352
SetRetryTime(int retryTime)353 void SyncTaskContext::SetRetryTime(int retryTime)
354 {
355 retryTime_ = retryTime;
356 }
357
GetRetryTime() const358 int SyncTaskContext::GetRetryTime() const
359 {
360 return retryTime_;
361 }
362
SetRetryStatus(int isNeedRetry)363 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
364 {
365 isNeedRetry_ = isNeedRetry;
366 }
367
GetRetryStatus() const368 int SyncTaskContext::GetRetryStatus() const
369 {
370 return isNeedRetry_;
371 }
372
GetTimerId() const373 TimerId SyncTaskContext::GetTimerId() const
374 {
375 return timerId_;
376 }
377
GetRequestSessionId() const378 uint32_t SyncTaskContext::GetRequestSessionId() const
379 {
380 return requestSessionId_;
381 }
382
IncSequenceId()383 void SyncTaskContext::IncSequenceId()
384 {
385 sequenceId_++;
386 }
387
GetSequenceId() const388 uint32_t SyncTaskContext::GetSequenceId() const
389 {
390 return sequenceId_;
391 }
392
ReSetSequenceId()393 void SyncTaskContext::ReSetSequenceId()
394 {
395 sequenceId_ = 1;
396 }
397
IncPacketId()398 void SyncTaskContext::IncPacketId()
399 {
400 packetId_++;
401 }
402
GetPacketId() const403 uint64_t SyncTaskContext::GetPacketId() const
404 {
405 return packetId_;
406 }
407
GetTimeoutTime() const408 int SyncTaskContext::GetTimeoutTime() const
409 {
410 return timeout_;
411 }
412
SetTimeoutCallback(const TimerAction & timeOutCallback)413 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
414 {
415 timeOutCallback_ = timeOutCallback;
416 }
417
SetTimeOffset(TimeOffset offset)418 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
419 {
420 timeOffset_ = offset;
421 }
422
GetTimeOffset() const423 TimeOffset SyncTaskContext::GetTimeOffset() const
424 {
425 return timeOffset_;
426 }
427
StartStateMachine()428 int SyncTaskContext::StartStateMachine()
429 {
430 return stateMachine_->StartSync();
431 }
432
ReceiveMessageCallback(Message * inMsg)433 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
434 {
435 if (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
436 uint16_t remoteVersion = 0;
437 (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
438 SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
439 }
440 int errCode = E_OK;
441 if (IncUsedCount() == E_OK) {
442 errCode = stateMachine_->ReceiveMessageCallback(inMsg);
443 SafeExit();
444 }
445 return errCode;
446 }
447
RegOnSyncTask(const std::function<int (void)> & callback)448 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
449 {
450 onSyncTaskAdd_ = callback;
451 }
452
IncUsedCount()453 int SyncTaskContext::IncUsedCount()
454 {
455 AutoLock lock(this);
456 if (IsKilled()) {
457 LOGI("[SyncTaskContext] IncUsedCount isKilled");
458 return -E_OBJ_IS_KILLED;
459 }
460 usedCount_++;
461 return E_OK;
462 }
463
SafeExit()464 void SyncTaskContext::SafeExit()
465 {
466 AutoLock lock(this);
467 usedCount_--;
468 if (usedCount_ < 1) {
469 safeKill_.notify_one();
470 }
471 }
472
GetCurrentLocalTime() const473 Timestamp SyncTaskContext::GetCurrentLocalTime() const
474 {
475 if (timeHelper_ == nullptr) {
476 return TimeHelper::INVALID_TIMESTAMP;
477 }
478 return timeHelper_->GetTime();
479 }
480
Abort(int status)481 void SyncTaskContext::Abort(int status)
482 {
483 (void)status;
484 Clear();
485 }
486
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId,bool isDirectEnd)487 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd)
488 {
489 {
490 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
491 if (synTaskContextSet_.count(context) == 0) {
492 LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
493 return;
494 }
495 // IncObjRef to maker sure context not been killed. after the lock_guard
496 RefObject::IncObjRef(context);
497 }
498
499 static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId),
500 isDirectEnd);
501 RefObject::DecObjRef(context);
502 }
503
SetRemoteSoftwareVersion(uint32_t version)504 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
505 {
506 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
507 if (remoteSoftwareVersion_ == version) {
508 return;
509 }
510 remoteSoftwareVersion_ = version;
511 remoteSoftwareVersionId_++;
512 }
513
GetRemoteSoftwareVersion() const514 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
515 {
516 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
517 return remoteSoftwareVersion_;
518 }
519
GetRemoteSoftwareVersionId() const520 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
521 {
522 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
523 return remoteSoftwareVersionId_;
524 }
525
IsCommNormal() const526 bool SyncTaskContext::IsCommNormal() const
527 {
528 return isCommNormal_;
529 }
530
CommErrHandlerFuncInner(int errCode,uint32_t sessionId,bool isDirectEnd)531 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd)
532 {
533 {
534 RefObject::AutoLock lock(this);
535 if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
536 return;
537 }
538
539 if (errCode == E_OK) {
540 SetCommFailErrCode(errCode);
541 // when communicator sent message failed, the state machine will get the error and exit this sync task
542 // it seems unnecessary to change isCommNormal_ value, so just return here
543 return;
544 }
545 }
546 LOGE("[SyncTaskContext][CommErr] errCode %d, isDirectEnd %d", errCode, static_cast<int>(isDirectEnd));
547 if (!isDirectEnd) {
548 SetErrCodeWhenWaitTimeOut(errCode);
549 return;
550 }
551 if (errCode > 0) {
552 SetCommFailErrCode(static_cast<int>(COMM_FAILURE));
553 } else {
554 SetCommFailErrCode(errCode);
555 }
556 stateMachine_->CommErrAbort(sessionId);
557 }
558
TimeOut(TimerId id)559 int SyncTaskContext::TimeOut(TimerId id)
560 {
561 if (!timeOutCallback_) {
562 return E_OK;
563 }
564 IncObjRef(this);
565 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
566 timeOutCallback_(id);
567 DecObjRef(this);
568 });
569 if (errCode != E_OK) {
570 LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
571 DecObjRef(this);
572 }
573 return E_OK;
574 }
575
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)576 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
577 {
578 retryTime_ = 0;
579 mode_ = target->GetMode();
580 status_ = SyncOperation::OP_SYNCING;
581 isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
582 taskErrCode_ = E_OK;
583 packetId_ = 0;
584 isCommNormal_ = true; // reset comm status here
585 commErrCode_ = E_OK;
586 syncTaskRetryStatus_ = isSyncRetry_;
587 timeout_ = static_cast<int>(taskParam.timeout);
588 negotiationCount_ = 0;
589 target->GetSyncOperation(syncOperation_);
590 ReSetSequenceId();
591
592 if (syncOperation_ != nullptr) {
593 // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
594 RefObject::IncObjRef(syncOperation_);
595 syncId_ = syncOperation_->GetSyncId();
596 isAutoSync_ = syncOperation_->IsAutoSync();
597 isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
598 if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
599 syncTaskRetryStatus_ = true;
600 }
601 requestSessionId_ = GenerateRequestSessionId();
602 LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
603 mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
604 DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
605 } else {
606 isAutoSync_ = false;
607 LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
608 syncTaskRetryStatus_);
609 }
610 }
611
KillWait()612 void SyncTaskContext::KillWait()
613 {
614 StopTimer();
615 UnlockObj();
616 stateMachine_->NotifyClosing();
617 stateMachine_->AbortImmediately();
618 LockObj();
619 LOGW("[SyncTaskContext] Try to kill a context, now wait.");
620 bool noDeadLock = WaitLockedUntil(
621 safeKill_,
622 [this]() {
623 if (usedCount_ < 1) {
624 return true;
625 }
626 return false;
627 },
628 KILL_WAIT_SECONDS);
629 if (!noDeadLock) { // LCOV_EXCL_BR_LINE
630 LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
631 } else {
632 LOGW("[SyncTaskContext] Wait the task exit ok.");
633 }
634 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
635 synTaskContextSet_.erase(this);
636 }
637
ClearSyncOperation()638 void SyncTaskContext::ClearSyncOperation()
639 {
640 std::lock_guard<std::mutex> lock(operationLock_);
641 if (syncOperation_ != nullptr) {
642 DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
643 RefObject::DecObjRef(syncOperation_);
644 syncOperation_ = nullptr;
645 }
646 }
647
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)648 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
649 {
650 AutoLock lock(this);
651 if (!isAutoSync_) {
652 return;
653 }
654 if (syncId_ >= syncId) {
655 return;
656 }
657 int mode = SyncOperation::TransferSyncMode(newTargetMode);
658 if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
659 SetRetryTime(AUTO_RETRY_TIMES);
660 ModifyTimer(timeout_);
661 }
662 }
663
GetTaskErrCode() const664 int SyncTaskContext::GetTaskErrCode() const
665 {
666 return taskErrCode_;
667 }
668
SetTaskErrCode(int errCode)669 void SyncTaskContext::SetTaskErrCode(int errCode)
670 {
671 taskErrCode_ = errCode;
672 }
673
IsSyncTaskNeedRetry() const674 bool SyncTaskContext::IsSyncTaskNeedRetry() const
675 {
676 return syncTaskRetryStatus_;
677 }
678
SetSyncRetry(bool isRetry)679 void SyncTaskContext::SetSyncRetry(bool isRetry)
680 {
681 isSyncRetry_ = isRetry;
682 }
683
GetSyncRetryTimes() const684 int SyncTaskContext::GetSyncRetryTimes() const
685 {
686 if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
687 return AUTO_RETRY_TIMES;
688 }
689 return MANUAL_RETRY_TIMES;
690 }
691
GetSyncRetryTimeout(int retryTime) const692 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
693 {
694 int timeoutTime = GetTimeoutTime();
695 if (IsAutoSync()) {
696 // set the new timeout value with 2 raised to the power of retryTime.
697 return timeoutTime * (1u << retryTime);
698 }
699 return timeoutTime;
700 }
701
ClearAllSyncTask()702 void SyncTaskContext::ClearAllSyncTask()
703 {
704 }
705
IsAutoLiftWaterMark() const706 bool SyncTaskContext::IsAutoLiftWaterMark() const
707 {
708 return negotiationCount_ < NEGOTIATION_LIMIT;
709 }
710
IncNegotiationCount()711 void SyncTaskContext::IncNegotiationCount()
712 {
713 negotiationCount_++;
714 }
715
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)716 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
717 {
718 return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
719 }
720
IsAutoSubscribe() const721 bool SyncTaskContext::IsAutoSubscribe() const
722 {
723 return isAutoSubscribe_;
724 }
725
IsCurrentSyncTaskCanBeSkipped() const726 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
727 {
728 return false;
729 }
730
ResetLastPushTaskStatus()731 void SyncTaskContext::ResetLastPushTaskStatus()
732 {
733 }
734
SchemaChange()735 void SyncTaskContext::SchemaChange()
736 {
737 if (stateMachine_ != nullptr) {
738 stateMachine_->SchemaChange();
739 }
740 }
741
Dump(int fd)742 void SyncTaskContext::Dump(int fd)
743 {
744 size_t totalSyncTaskCount = 0u;
745 size_t autoSyncTaskCount = 0u;
746 size_t reponseTaskCount = 0u;
747 {
748 std::lock_guard<std::mutex> lock(targetQueueLock_);
749 totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
750 for (const auto &target : requestTargetQueue_) {
751 if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
752 autoSyncTaskCount++;
753 }
754 }
755 reponseTaskCount = responseTargetQueue_.size();
756 }
757 DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
758 " response task count = %zu\n",
759 deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
760 }
761
RunPermissionCheck(uint8_t flag) const762 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
763 {
764 std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
765 std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
766 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
767 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
768 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
769 { userId, appId, storeId, deviceId_, instanceId }, flag);
770 if (errCode != E_OK) {
771 LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
772 errCode, flag, deviceId_.c_str());
773 }
774 return errCode;
775 }
776
GetPermissionCheckFlag(bool isAutoSync,int syncMode)777 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
778 {
779 uint8_t flag = 0;
780 int mode = SyncOperation::TransferSyncMode(syncMode);
781 if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
782 flag = CHECK_FLAG_SEND;
783 } else if (mode == SyncModeType::PULL) {
784 flag = CHECK_FLAG_RECEIVE;
785 } else if (mode == SyncModeType::PUSH_AND_PULL) {
786 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
787 }
788 if (isAutoSync) {
789 flag = flag | CHECK_FLAG_AUTOSYNC;
790 }
791 if (mode != SyncModeType::RESPONSE_PULL) {
792 // it means this sync is started by local
793 flag = flag | CHECK_FLAG_SPONSOR;
794 }
795 return flag;
796 }
797
AbortMachineIfNeed(uint32_t syncId)798 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
799 {
800 uint32_t sessionId = 0u;
801 {
802 RefObject::AutoLock autoLock(this);
803 if (syncId_ != syncId) {
804 return;
805 }
806 sessionId = requestSessionId_;
807 }
808 stateMachine_->InnerErrorAbort(sessionId);
809 }
810
GetAndIncSyncOperation() const811 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
812 {
813 std::lock_guard<std::mutex> lock(operationLock_);
814 if (syncOperation_ == nullptr) {
815 return nullptr;
816 }
817 RefObject::IncObjRef(syncOperation_);
818 return syncOperation_;
819 }
820
GenerateRequestSessionId()821 uint32_t SyncTaskContext::GenerateRequestSessionId()
822 {
823 uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
824 // make sure sessionId is between 0x01 and 0x8fffffff
825 if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
826 sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
827 std::to_string(TimeHelper::GetSysCurrentTime()));
828 }
829 lastRequestSessionId_ = sessionId;
830 return sessionId;
831 }
832
IsSchemaCompatible() const833 bool SyncTaskContext::IsSchemaCompatible() const
834 {
835 return true;
836 }
837
SetDbAbility(DbAbility & remoteDbAbility)838 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
839 {
840 }
841
TimeChange()842 void SyncTaskContext::TimeChange()
843 {
844 if (stateMachine_ == nullptr) {
845 LOGW("[SyncTaskContext] machine is null when time change");
846 return;
847 }
848 stateMachine_->TimeChange();
849 }
850
GetResponseTaskCount()851 int32_t SyncTaskContext::GetResponseTaskCount()
852 {
853 std::lock_guard<std::mutex> autoLock(targetQueueLock_);
854 return static_cast<int32_t>(responseTargetQueue_.size());
855 }
856
GetCommErrCode() const857 int SyncTaskContext::GetCommErrCode() const
858 {
859 return commErrCode_;
860 }
861
SetCommFailErrCode(int errCode)862 void SyncTaskContext::SetCommFailErrCode(int errCode)
863 {
864 commErrCode_ = errCode;
865 }
866
SetErrCodeWhenWaitTimeOut(int errCode)867 void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode)
868 {
869 if (errCode > 0) {
870 SetCommFailErrCode(static_cast<int>(TIME_OUT));
871 } else {
872 SetCommFailErrCode(errCode);
873 }
874 }
875 } // namespace DistributedDB
876