1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_task_context.h"
17
18 #include <algorithm>
19 #include <cmath>
20
21 #include "db_constant.h"
22 #include "db_dump_helper.h"
23 #include "db_dfx_adapter.h"
24 #include "db_errno.h"
25 #include "hash.h"
26 #include "isync_state_machine.h"
27 #include "log_print.h"
28 #include "time_helper.h"
29 #include "version.h"
30
31 namespace DistributedDB {
32 std::mutex SyncTaskContext::synTaskContextSetLock_;
33 std::set<ISyncTaskContext *> SyncTaskContext::synTaskContextSet_;
34
35 namespace {
36 constexpr int NEGOTIATION_LIMIT = 2;
37 constexpr uint32_t SESSION_ID_MAX_VALUE = 0x8fffffffu;
38 }
39
SyncTaskContext()40 SyncTaskContext::SyncTaskContext()
41 : syncOperation_(nullptr),
42 syncId_(0),
43 mode_(0),
44 isAutoSync_(false),
45 status_(0),
46 taskExecStatus_(0),
47 syncInterface_(nullptr),
48 communicator_(nullptr),
49 stateMachine_(nullptr),
50 requestSessionId_(0),
51 lastRequestSessionId_(0),
52 timeHelper_(nullptr),
53 remoteSoftwareVersion_(0),
54 remoteSoftwareVersionId_(0),
55 isCommNormal_(true),
56 taskErrCode_(E_OK),
57 syncTaskRetryStatus_(false),
58 isSyncRetry_(false),
59 negotiationCount_(0),
60 isAutoSubscribe_(false)
61 {
62 }
63
~SyncTaskContext()64 SyncTaskContext::~SyncTaskContext()
65 {
66 if (stateMachine_ != nullptr) {
67 delete stateMachine_;
68 stateMachine_ = nullptr;
69 }
70 SyncTaskContext::ClearSyncOperation();
71 ClearSyncTarget();
72 syncInterface_ = nullptr;
73 communicator_ = nullptr;
74 }
75
AddSyncTarget(ISyncTarget * target)76 int SyncTaskContext::AddSyncTarget(ISyncTarget *target)
77 {
78 if (target == nullptr) {
79 return -E_INVALID_ARGS;
80 }
81 int targetMode = target->GetMode();
82 auto syncId = static_cast<uint32_t>(target->GetSyncId());
83 {
84 std::lock_guard<std::mutex> lock(targetQueueLock_);
85 if (target->GetTaskType() == ISyncTarget::REQUEST) {
86 requestTargetQueue_.push_back(target);
87 } else if (target->GetTaskType() == ISyncTarget::RESPONSE) {
88 responseTargetQueue_.push_back(target);
89 } else {
90 return -E_INVALID_ARGS;
91 }
92 }
93 RefObject::IncObjRef(this);
94 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetMode, syncId]() {
95 CancelCurrentSyncRetryIfNeed(targetMode, syncId);
96 RefObject::DecObjRef(this);
97 });
98 if (errCode != E_OK) {
99 RefObject::DecObjRef(this);
100 }
101 if (onSyncTaskAdd_) {
102 RefObject::IncObjRef(this);
103 errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
104 onSyncTaskAdd_();
105 RefObject::DecObjRef(this);
106 });
107 if (errCode != E_OK) {
108 RefObject::DecObjRef(this);
109 }
110 }
111 return E_OK;
112 }
113
SetOperationStatus(int status)114 void SyncTaskContext::SetOperationStatus(int status)
115 {
116 std::lock_guard<std::mutex> lock(operationLock_);
117 if (syncOperation_ == nullptr) {
118 LOGD("[SyncTaskContext][SetStatus] syncOperation is null");
119 return;
120 }
121 int finalStatus = status;
122
123 int operationStatus = syncOperation_->GetStatus(deviceId_);
124 if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
125 if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
126 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
127 } else {
128 finalStatus = SyncOperation::OP_FINISHED_ALL;
129 }
130 } else if (status == SyncOperation::OP_RECV_FINISHED && operationStatus == SyncOperation::OP_SEND_FINISHED) {
131 if (GetTaskErrCode() == -E_EKEYREVOKED) {
132 finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
133 } else {
134 finalStatus = SyncOperation::OP_FINISHED_ALL;
135 }
136 }
137 syncOperation_->SetStatus(deviceId_, finalStatus);
138 if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
139 SaveLastPushTaskExecStatus(finalStatus);
140 }
141 if (syncOperation_->CheckIsAllFinished()) {
142 syncOperation_->Finished();
143 }
144 }
145
SaveLastPushTaskExecStatus(int finalStatus)146 void SyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
147 {
148 (void)finalStatus;
149 }
150
Clear()151 void SyncTaskContext::Clear()
152 {
153 StopTimer();
154 retryTime_ = 0;
155 sequenceId_ = 1;
156 syncId_ = 0;
157 isAutoSync_ = false;
158 requestSessionId_ = 0;
159 isNeedRetry_ = NO_NEED_RETRY;
160 mode_ = SyncModeType::INVALID_MODE;
161 status_ = SyncOperation::OP_WAITING;
162 taskErrCode_ = E_OK;
163 packetId_ = 0;
164 isAutoSubscribe_ = false;
165 }
166
RemoveSyncOperation(int syncId)167 int SyncTaskContext::RemoveSyncOperation(int syncId)
168 {
169 std::lock_guard<std::mutex> lock(targetQueueLock_);
170 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
171 [syncId](const ISyncTarget *target) {
172 if (target == nullptr) {
173 return false;
174 }
175 return target->GetSyncId() == syncId;
176 });
177 if (iter != requestTargetQueue_.end()) {
178 if (*iter != nullptr) {
179 delete *iter;
180 *iter = nullptr;
181 }
182 requestTargetQueue_.erase(iter);
183 return E_OK;
184 }
185 return -E_INVALID_ARGS;
186 }
187
ClearSyncTarget()188 void SyncTaskContext::ClearSyncTarget()
189 {
190 std::lock_guard<std::mutex> lock(targetQueueLock_);
191 for (auto &requestTarget : requestTargetQueue_) {
192 if (requestTarget != nullptr) {
193 delete requestTarget;
194 requestTarget = nullptr;
195 }
196 }
197 requestTargetQueue_.clear();
198
199 for (auto &responseTarget : responseTargetQueue_) {
200 if (responseTarget != nullptr) { // LCOV_EXCL_BR_LINE
201 delete responseTarget;
202 responseTarget = nullptr;
203 }
204 }
205 responseTargetQueue_.clear();
206 }
207
IsTargetQueueEmpty() const208 bool SyncTaskContext::IsTargetQueueEmpty() const
209 {
210 std::lock_guard<std::mutex> lock(targetQueueLock_);
211 return requestTargetQueue_.empty() && responseTargetQueue_.empty();
212 }
213
GetOperationStatus() const214 int SyncTaskContext::GetOperationStatus() const
215 {
216 std::lock_guard<std::mutex> lock(operationLock_);
217 if (syncOperation_ == nullptr) {
218 return SyncOperation::OP_FINISHED_ALL;
219 }
220 return syncOperation_->GetStatus(deviceId_);
221 }
222
SetMode(int mode)223 void SyncTaskContext::SetMode(int mode)
224 {
225 mode_ = mode;
226 }
227
GetMode() const228 int SyncTaskContext::GetMode() const
229 {
230 return mode_;
231 }
232
MoveToNextTarget()233 void SyncTaskContext::MoveToNextTarget()
234 {
235 ClearSyncOperation();
236 TaskParam param;
237 // call other system api without lock
238 param.timeout = communicator_->GetTimeout(deviceId_);
239 std::lock_guard<std::mutex> lock(targetQueueLock_);
240 while (!requestTargetQueue_.empty() || !responseTargetQueue_.empty()) {
241 ISyncTarget *tmpTarget = nullptr;
242 if (!requestTargetQueue_.empty()) {
243 tmpTarget = requestTargetQueue_.front();
244 requestTargetQueue_.pop_front();
245 } else {
246 tmpTarget = responseTargetQueue_.front();
247 responseTargetQueue_.pop_front();
248 }
249 if (tmpTarget == nullptr) {
250 LOGE("[SyncTaskContext][MoveToNextTarget] currentTarget is null skip!");
251 continue;
252 }
253 SyncOperation *tmpOperation = nullptr;
254 tmpTarget->GetSyncOperation(tmpOperation);
255 if ((tmpOperation != nullptr) && tmpOperation->IsKilled()) {
256 // if killed skip this syncOperation_.
257 delete tmpTarget;
258 tmpTarget = nullptr;
259 continue;
260 }
261 CopyTargetData(tmpTarget, param);
262 delete tmpTarget;
263 tmpTarget = nullptr;
264 break;
265 }
266 }
267
GetNextTarget()268 int SyncTaskContext::GetNextTarget()
269 {
270 MoveToNextTarget();
271 int checkErrCode = RunPermissionCheck(GetPermissionCheckFlag(IsAutoSync(), GetMode()));
272 if (checkErrCode != E_OK) {
273 SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
274 return checkErrCode;
275 }
276 return E_OK;
277 }
278
GetSyncId() const279 uint32_t SyncTaskContext::GetSyncId() const
280 {
281 return syncId_;
282 }
283
284 // Get the current task deviceId.
GetDeviceId() const285 std::string SyncTaskContext::GetDeviceId() const
286 {
287 return deviceId_;
288 }
289
SetTaskExecStatus(int status)290 void SyncTaskContext::SetTaskExecStatus(int status)
291 {
292 taskExecStatus_ = status;
293 }
294
GetTaskExecStatus() const295 int SyncTaskContext::GetTaskExecStatus() const
296 {
297 return taskExecStatus_;
298 }
299
IsAutoSync() const300 bool SyncTaskContext::IsAutoSync() const
301 {
302 return isAutoSync_;
303 }
304
StartTimer()305 int SyncTaskContext::StartTimer()
306 {
307 std::lock_guard<std::mutex> lockGuard(timerLock_);
308 if (timerId_ > 0) {
309 return -E_UNEXPECTED_DATA;
310 }
311 TimerId timerId = 0;
312 RefObject::IncObjRef(this);
313 TimerAction timeOutCallback = [this](TimerId id) { return TimeOut(id); };
314 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout_, timeOutCallback,
315 [this]() {
316 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
317 if (ret != E_OK) {
318 LOGE("[SyncTaskContext] timer finalizer ScheduleTask, errCode %d", ret);
319 }
320 }, timerId);
321 if (errCode != E_OK) {
322 RefObject::DecObjRef(this);
323 return errCode;
324 }
325 timerId_ = timerId;
326 return errCode;
327 }
328
StopTimer()329 void SyncTaskContext::StopTimer()
330 {
331 TimerId timerId;
332 {
333 std::lock_guard<std::mutex> lockGuard(timerLock_);
334 timerId = timerId_;
335 if (timerId_ == 0) {
336 return;
337 }
338 timerId_ = 0;
339 }
340 RuntimeContext::GetInstance()->RemoveTimer(timerId);
341 }
342
ModifyTimer(int milliSeconds)343 int SyncTaskContext::ModifyTimer(int milliSeconds)
344 {
345 std::lock_guard<std::mutex> lockGuard(timerLock_);
346 if (timerId_ == 0) {
347 return -E_UNEXPECTED_DATA;
348 }
349 return RuntimeContext::GetInstance()->ModifyTimer(timerId_, milliSeconds);
350 }
351
SetRetryTime(int retryTime)352 void SyncTaskContext::SetRetryTime(int retryTime)
353 {
354 retryTime_ = retryTime;
355 }
356
GetRetryTime() const357 int SyncTaskContext::GetRetryTime() const
358 {
359 return retryTime_;
360 }
361
SetRetryStatus(int isNeedRetry)362 void SyncTaskContext::SetRetryStatus(int isNeedRetry)
363 {
364 isNeedRetry_ = isNeedRetry;
365 }
366
GetRetryStatus() const367 int SyncTaskContext::GetRetryStatus() const
368 {
369 return isNeedRetry_;
370 }
371
GetTimerId() const372 TimerId SyncTaskContext::GetTimerId() const
373 {
374 return timerId_;
375 }
376
GetRequestSessionId() const377 uint32_t SyncTaskContext::GetRequestSessionId() const
378 {
379 return requestSessionId_;
380 }
381
IncSequenceId()382 void SyncTaskContext::IncSequenceId()
383 {
384 sequenceId_++;
385 }
386
GetSequenceId() const387 uint32_t SyncTaskContext::GetSequenceId() const
388 {
389 return sequenceId_;
390 }
391
ReSetSequenceId()392 void SyncTaskContext::ReSetSequenceId()
393 {
394 sequenceId_ = 1;
395 }
396
IncPacketId()397 void SyncTaskContext::IncPacketId()
398 {
399 packetId_++;
400 }
401
GetPacketId() const402 uint64_t SyncTaskContext::GetPacketId() const
403 {
404 return packetId_;
405 }
406
GetTimeoutTime() const407 int SyncTaskContext::GetTimeoutTime() const
408 {
409 return timeout_;
410 }
411
SetTimeoutCallback(const TimerAction & timeOutCallback)412 void SyncTaskContext::SetTimeoutCallback(const TimerAction &timeOutCallback)
413 {
414 timeOutCallback_ = timeOutCallback;
415 }
416
SetTimeOffset(TimeOffset offset)417 void SyncTaskContext::SetTimeOffset(TimeOffset offset)
418 {
419 timeOffset_ = offset;
420 }
421
GetTimeOffset() const422 TimeOffset SyncTaskContext::GetTimeOffset() const
423 {
424 return timeOffset_;
425 }
426
StartStateMachine()427 int SyncTaskContext::StartStateMachine()
428 {
429 return stateMachine_->StartSync();
430 }
431
ReceiveMessageCallback(Message * inMsg)432 int SyncTaskContext::ReceiveMessageCallback(Message *inMsg)
433 {
434 if (GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE && inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
435 uint16_t remoteVersion = 0;
436 (void)communicator_->GetRemoteCommunicatorVersion(deviceId_, remoteVersion);
437 SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST + remoteVersion);
438 }
439 int errCode = E_OK;
440 if (IncUsedCount() == E_OK) {
441 errCode = stateMachine_->ReceiveMessageCallback(inMsg);
442 SafeExit();
443 }
444 return errCode;
445 }
446
RegOnSyncTask(const std::function<int (void)> & callback)447 void SyncTaskContext::RegOnSyncTask(const std::function<int(void)> &callback)
448 {
449 onSyncTaskAdd_ = callback;
450 }
451
IncUsedCount()452 int SyncTaskContext::IncUsedCount()
453 {
454 AutoLock lock(this);
455 if (IsKilled()) {
456 LOGI("[SyncTaskContext] IncUsedCount isKilled");
457 return -E_OBJ_IS_KILLED;
458 }
459 usedCount_++;
460 return E_OK;
461 }
462
SafeExit()463 void SyncTaskContext::SafeExit()
464 {
465 AutoLock lock(this);
466 usedCount_--;
467 if (usedCount_ < 1) {
468 safeKill_.notify_one();
469 }
470 }
471
GetCurrentLocalTime() const472 Timestamp SyncTaskContext::GetCurrentLocalTime() const
473 {
474 if (timeHelper_ == nullptr) {
475 return TimeHelper::INVALID_TIMESTAMP;
476 }
477 return timeHelper_->GetTime();
478 }
479
Abort(int status)480 void SyncTaskContext::Abort(int status)
481 {
482 (void)status;
483 Clear();
484 }
485
CommErrHandlerFunc(int errCode,ISyncTaskContext * context,int32_t sessionId)486 void SyncTaskContext::CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId)
487 {
488 {
489 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
490 if (synTaskContextSet_.count(context) == 0) {
491 LOGI("[SyncTaskContext][CommErrHandle] context has been killed");
492 return;
493 }
494 // IncObjRef to maker sure context not been killed. after the lock_guard
495 RefObject::IncObjRef(context);
496 }
497
498 static_cast<SyncTaskContext *>(context)->CommErrHandlerFuncInner(errCode, static_cast<uint32_t>(sessionId));
499 RefObject::DecObjRef(context);
500 }
501
SetRemoteSoftwareVersion(uint32_t version)502 void SyncTaskContext::SetRemoteSoftwareVersion(uint32_t version)
503 {
504 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
505 remoteSoftwareVersion_ = version;
506 remoteSoftwareVersionId_++;
507 }
508
GetRemoteSoftwareVersion() const509 uint32_t SyncTaskContext::GetRemoteSoftwareVersion() const
510 {
511 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
512 return remoteSoftwareVersion_;
513 }
514
GetRemoteSoftwareVersionId() const515 uint64_t SyncTaskContext::GetRemoteSoftwareVersionId() const
516 {
517 std::lock_guard<std::mutex> lock(remoteSoftwareVersionLock_);
518 return remoteSoftwareVersionId_;
519 }
520
IsCommNormal() const521 bool SyncTaskContext::IsCommNormal() const
522 {
523 return isCommNormal_;
524 }
525
CommErrHandlerFuncInner(int errCode,uint32_t sessionId)526 void SyncTaskContext::CommErrHandlerFuncInner(int errCode, uint32_t sessionId)
527 {
528 {
529 RefObject::AutoLock lock(this);
530 if ((sessionId != requestSessionId_) || (requestSessionId_ == 0)) {
531 return;
532 }
533
534 if (errCode == E_OK) {
535 // when communicator sent message failed, the state machine will get the error and exit this sync task
536 // it seems unnecessary to change isCommNormal_ value, so just return here
537 return;
538 }
539 }
540 LOGE("[SyncTaskContext][CommErr] errCode %d", errCode);
541 stateMachine_->CommErrAbort(sessionId);
542 }
543
TimeOut(TimerId id)544 int SyncTaskContext::TimeOut(TimerId id)
545 {
546 if (!timeOutCallback_) {
547 return E_OK;
548 }
549 IncObjRef(this);
550 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, id]() {
551 timeOutCallback_(id);
552 DecObjRef(this);
553 });
554 if (errCode != E_OK) {
555 LOGW("[SyncTaskContext][TimeOut] Trigger TimeOut Async Failed! TimerId=" PRIu64 " errCode=%d", id, errCode);
556 DecObjRef(this);
557 }
558 return E_OK;
559 }
560
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)561 void SyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
562 {
563 retryTime_ = 0;
564 mode_ = target->GetMode();
565 status_ = SyncOperation::OP_SYNCING;
566 isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
567 taskErrCode_ = E_OK;
568 packetId_ = 0;
569 isCommNormal_ = true; // reset comm status here
570 syncTaskRetryStatus_ = isSyncRetry_;
571 timeout_ = static_cast<int>(taskParam.timeout);
572 negotiationCount_ = 0;
573 target->GetSyncOperation(syncOperation_);
574 ReSetSequenceId();
575
576 if (syncOperation_ != nullptr) {
577 // IncRef for syncOperation_ to make sure syncOperation_ is valid, when setStatus
578 RefObject::IncObjRef(syncOperation_);
579 syncId_ = syncOperation_->GetSyncId();
580 isAutoSync_ = syncOperation_->IsAutoSync();
581 isAutoSubscribe_ = syncOperation_->IsAutoControlCmd();
582 if (isAutoSync_ || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
583 syncTaskRetryStatus_ = true;
584 }
585 requestSessionId_ = GenerateRequestSessionId();
586 LOGI("[SyncTaskContext][copyTarget] mode=%d,syncId=%d,isAutoSync=%d,isRetry=%d,dev=%s{private}",
587 mode_, syncId_, isAutoSync_, syncTaskRetryStatus_, deviceId_.c_str());
588 DBDfxAdapter::StartAsyncTrace(syncActionName_, static_cast<int>(syncId_));
589 } else {
590 isAutoSync_ = false;
591 LOGI("[SyncTaskContext][copyTarget] for response data dev %s{private},isRetry=%d", deviceId_.c_str(),
592 syncTaskRetryStatus_);
593 }
594 }
595
KillWait()596 void SyncTaskContext::KillWait()
597 {
598 StopTimer();
599 UnlockObj();
600 stateMachine_->NotifyClosing();
601 stateMachine_->AbortImmediately();
602 LockObj();
603 LOGW("[SyncTaskContext] Try to kill a context, now wait.");
604 bool noDeadLock = WaitLockedUntil(
605 safeKill_,
606 [this]() {
607 if (usedCount_ < 1) {
608 return true;
609 }
610 return false;
611 },
612 KILL_WAIT_SECONDS);
613 if (!noDeadLock) { // LCOV_EXCL_BR_LINE
614 LOGE("[SyncTaskContext] Dead lock may happen, we stop waiting the task exit.");
615 } else {
616 LOGW("[SyncTaskContext] Wait the task exit ok.");
617 }
618 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
619 synTaskContextSet_.erase(this);
620 }
621
ClearSyncOperation()622 void SyncTaskContext::ClearSyncOperation()
623 {
624 std::lock_guard<std::mutex> lock(operationLock_);
625 if (syncOperation_ != nullptr) {
626 DBDfxAdapter::FinishAsyncTrace(syncActionName_, static_cast<int>(syncId_));
627 RefObject::DecObjRef(syncOperation_);
628 syncOperation_ = nullptr;
629 }
630 }
631
CancelCurrentSyncRetryIfNeed(int newTargetMode,uint32_t syncId)632 void SyncTaskContext::CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId)
633 {
634 AutoLock lock(this);
635 if (!isAutoSync_) {
636 return;
637 }
638 if (syncId_ >= syncId) {
639 return;
640 }
641 int mode = SyncOperation::TransferSyncMode(newTargetMode);
642 if (newTargetMode == mode_ || mode == SyncModeType::PUSH_AND_PULL) {
643 SetRetryTime(AUTO_RETRY_TIMES);
644 ModifyTimer(timeout_);
645 }
646 }
647
GetTaskErrCode() const648 int SyncTaskContext::GetTaskErrCode() const
649 {
650 return taskErrCode_;
651 }
652
SetTaskErrCode(int errCode)653 void SyncTaskContext::SetTaskErrCode(int errCode)
654 {
655 taskErrCode_ = errCode;
656 }
657
IsSyncTaskNeedRetry() const658 bool SyncTaskContext::IsSyncTaskNeedRetry() const
659 {
660 return syncTaskRetryStatus_;
661 }
662
SetSyncRetry(bool isRetry)663 void SyncTaskContext::SetSyncRetry(bool isRetry)
664 {
665 isSyncRetry_ = isRetry;
666 }
667
GetSyncRetryTimes() const668 int SyncTaskContext::GetSyncRetryTimes() const
669 {
670 if (IsAutoSync() || mode_ == SUBSCRIBE_QUERY || mode_ == UNSUBSCRIBE_QUERY) {
671 return AUTO_RETRY_TIMES;
672 }
673 return MANUAL_RETRY_TIMES;
674 }
675
GetSyncRetryTimeout(int retryTime) const676 int SyncTaskContext::GetSyncRetryTimeout(int retryTime) const
677 {
678 int timeoutTime = GetTimeoutTime();
679 if (IsAutoSync()) {
680 // set the new timeout value with 2 raised to the power of retryTime.
681 return timeoutTime * (1u << retryTime);
682 }
683 return timeoutTime;
684 }
685
ClearAllSyncTask()686 void SyncTaskContext::ClearAllSyncTask()
687 {
688 }
689
IsAutoLiftWaterMark() const690 bool SyncTaskContext::IsAutoLiftWaterMark() const
691 {
692 return negotiationCount_ < NEGOTIATION_LIMIT;
693 }
694
IncNegotiationCount()695 void SyncTaskContext::IncNegotiationCount()
696 {
697 negotiationCount_++;
698 }
699
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)700 bool SyncTaskContext::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
701 {
702 return stateMachine_->IsNeedTriggerQueryAutoSync(inMsg, query);
703 }
704
IsAutoSubscribe() const705 bool SyncTaskContext::IsAutoSubscribe() const
706 {
707 return isAutoSubscribe_;
708 }
709
IsCurrentSyncTaskCanBeSkipped() const710 bool SyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
711 {
712 return false;
713 }
714
ResetLastPushTaskStatus()715 void SyncTaskContext::ResetLastPushTaskStatus()
716 {
717 }
718
SchemaChange()719 void SyncTaskContext::SchemaChange()
720 {
721 if (stateMachine_ != nullptr) {
722 stateMachine_->SchemaChange();
723 }
724 }
725
Dump(int fd)726 void SyncTaskContext::Dump(int fd)
727 {
728 size_t totalSyncTaskCount = 0u;
729 size_t autoSyncTaskCount = 0u;
730 size_t reponseTaskCount = 0u;
731 {
732 std::lock_guard<std::mutex> lock(targetQueueLock_);
733 totalSyncTaskCount = requestTargetQueue_.size() + responseTargetQueue_.size();
734 for (const auto &target : requestTargetQueue_) {
735 if (target->IsAutoSync()) { // LCOV_EXCL_BR_LINE
736 autoSyncTaskCount++;
737 }
738 }
739 reponseTaskCount = responseTargetQueue_.size();
740 }
741 DBDumpHelper::Dump(fd, "\t\ttarget = %s, total sync task count = %zu, auto sync task count = %zu,"
742 " response task count = %zu\n",
743 deviceId_.c_str(), totalSyncTaskCount, autoSyncTaskCount, reponseTaskCount);
744 }
745
RunPermissionCheck(uint8_t flag) const746 int SyncTaskContext::RunPermissionCheck(uint8_t flag) const
747 {
748 std::string appId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
749 std::string userId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
750 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
751 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
752 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
753 { userId, appId, storeId, deviceId_, instanceId }, flag);
754 if (errCode != E_OK) {
755 LOGE("[SyncTaskContext] RunPermissionCheck not pass errCode:%d, flag:%d, %s{private}",
756 errCode, flag, deviceId_.c_str());
757 }
758 return errCode;
759 }
760
GetPermissionCheckFlag(bool isAutoSync,int syncMode)761 uint8_t SyncTaskContext::GetPermissionCheckFlag(bool isAutoSync, int syncMode)
762 {
763 uint8_t flag = 0;
764 int mode = SyncOperation::TransferSyncMode(syncMode);
765 if (mode == SyncModeType::PUSH || mode == SyncModeType::RESPONSE_PULL) {
766 flag = CHECK_FLAG_SEND;
767 } else if (mode == SyncModeType::PULL) {
768 flag = CHECK_FLAG_RECEIVE;
769 } else if (mode == SyncModeType::PUSH_AND_PULL) {
770 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
771 }
772 if (isAutoSync) {
773 flag = flag | CHECK_FLAG_AUTOSYNC;
774 }
775 if (mode != SyncModeType::RESPONSE_PULL) {
776 // it means this sync is started by local
777 flag = flag | CHECK_FLAG_SPONSOR;
778 }
779 return flag;
780 }
781
AbortMachineIfNeed(uint32_t syncId)782 void SyncTaskContext::AbortMachineIfNeed(uint32_t syncId)
783 {
784 uint32_t sessionId = 0u;
785 {
786 RefObject::AutoLock autoLock(this);
787 if (syncId_ != syncId) {
788 return;
789 }
790 sessionId = requestSessionId_;
791 }
792 stateMachine_->InnerErrorAbort(sessionId);
793 }
794
GetAndIncSyncOperation() const795 SyncOperation *SyncTaskContext::GetAndIncSyncOperation() const
796 {
797 std::lock_guard<std::mutex> lock(operationLock_);
798 if (syncOperation_ == nullptr) {
799 return nullptr;
800 }
801 RefObject::IncObjRef(syncOperation_);
802 return syncOperation_;
803 }
804
GenerateRequestSessionId()805 uint32_t SyncTaskContext::GenerateRequestSessionId()
806 {
807 uint32_t sessionId = lastRequestSessionId_ != 0 ? lastRequestSessionId_ + 1 : 0;
808 // make sure sessionId is between 0x01 and 0x8fffffff
809 if (sessionId > SESSION_ID_MAX_VALUE || sessionId == 0) {
810 sessionId = Hash::Hash32Func(deviceId_ + std::to_string(syncId_) +
811 std::to_string(TimeHelper::GetSysCurrentTime()));
812 }
813 lastRequestSessionId_ = sessionId;
814 return sessionId;
815 }
816
IsSchemaCompatible() const817 bool SyncTaskContext::IsSchemaCompatible() const
818 {
819 return true;
820 }
821
SetDbAbility(DbAbility & remoteDbAbility)822 void SyncTaskContext::SetDbAbility([[gnu::unused]] DbAbility &remoteDbAbility)
823 {
824 }
825
TimeChange()826 void SyncTaskContext::TimeChange()
827 {
828 if (stateMachine_ == nullptr) {
829 LOGW("[SyncTaskContext] machine is null when time change");
830 return;
831 }
832 stateMachine_->TimeChange();
833 }
834
GetResponseTaskCount()835 int32_t SyncTaskContext::GetResponseTaskCount()
836 {
837 std::lock_guard<std::mutex> autoLock(targetQueueLock_);
838 return static_cast<int32_t>(responseTargetQueue_.size());
839 }
840 } // namespace DistributedDB
841