• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_sync_task_context.h"
17 
18 #include <algorithm>
19 #include "db_common.h"
20 #include "db_dfx_adapter.h"
21 #include "db_errno.h"
22 #include "isyncer.h"
23 #include "log_print.h"
24 #include "platform_specific.h"
25 #include "single_ver_sync_state_machine.h"
26 #include "single_ver_sync_target.h"
27 #include "sync_types.h"
28 
29 namespace DistributedDB {
SingleVerSyncTaskContext()30 SingleVerSyncTaskContext::SingleVerSyncTaskContext()
31     : SyncTaskContext(),
32       token_(nullptr),
33       endMark_(0),
34       needClearRemoteStaleData_(false)
35 {}
36 
~SingleVerSyncTaskContext()37 SingleVerSyncTaskContext::~SingleVerSyncTaskContext()
38 {
39     token_ = nullptr;
40     subManager_ = nullptr;
41 }
42 
Initialize(const DeviceSyncTarget & target,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)43 int SingleVerSyncTaskContext::Initialize(const DeviceSyncTarget &target, ISyncInterface *syncInterface,
44     const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
45 {
46     if (target.device.empty() || syncInterface == nullptr || metadata == nullptr ||
47         communicator == nullptr) {
48         LOGE("[SingleVerSyncTaskContext] [Initialize] parameter is invalid.");
49         return -E_INVALID_ARGS;
50     }
51     stateMachine_ = new (std::nothrow) SingleVerSyncStateMachine;
52     if (stateMachine_ == nullptr) {
53         LOGE("[SingleVerSyncTaskContext] [Initialize] stateMachine_ is nullptr.");
54         return -E_OUT_OF_MEMORY;
55     }
56     deviceId_ = target.device;
57     targetUserId_ = target.userId;
58     std::vector<uint8_t> dbIdentifier = syncInterface->GetIdentifier();
59     dbIdentifier.resize(3); // only show 3 bytes
60     syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
61         DBCommon::VectorToHexString(dbIdentifier) + "_" + deviceId_.c_str();
62     TimerAction timeOutCallback;
63     int errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
64     if (errCode != E_OK) {
65         LOGE("[SingleVerSyncTaskContext] stateMachine Initialize failed, err %d.", errCode);
66         goto ERROR_OUT;
67     }
68 
69     timeHelper_ = std::make_unique<TimeHelper>();
70     errCode = timeHelper_->Initialize(syncInterface, metadata);
71     if (errCode != E_OK) {
72         LOGE("[SingleVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
73         goto ERROR_OUT;
74     }
75     timeOutCallback = [stateMachine = static_cast<SingleVerSyncStateMachine *>(stateMachine_)](TimerId timerId) {
76         return stateMachine->TimeoutCallback(timerId);
77     };
78     SetTimeoutCallback(timeOutCallback);
79 
80     syncInterface_ = syncInterface;
81     communicator_ = communicator;
82     taskExecStatus_ = INIT;
83     OnKill([this]() { this->KillWait(); });
84     {
85         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
86         synTaskContextSet_.insert(this);
87     }
88     return errCode;
89 
90 ERROR_OUT:
91     delete stateMachine_;
92     stateMachine_ = nullptr;
93     return errCode;
94 }
95 
AddSyncOperation(SyncOperation * operation)96 int SingleVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
97 {
98     if (operation == nullptr) {
99         LOGE("[SingleVerSyncTaskContext] [AddSyncOperation] operation is nullptr.");
100         return -E_INVALID_ARGS;
101     }
102 
103     // If auto sync, just update the end watermark
104     if (operation->IsAutoSync()) {
105         std::lock_guard<std::mutex> lock(targetQueueLock_);
106         bool isQuerySync = operation->IsQuerySync();
107         std::string queryId = operation->GetQueryId();
108         auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
109             [isQuerySync, queryId](const ISyncTarget *target) {
110             if (target == nullptr) {
111                 return false;
112             }
113             if (isQuerySync) {
114                 SyncOperation *tmpOperation = nullptr;
115                 target->GetSyncOperation(tmpOperation);
116                 return (tmpOperation != nullptr && tmpOperation->GetQueryId() == queryId) && target->IsAutoSync();
117             }
118             return target->IsAutoSync();
119         });
120         if (iter != requestTargetQueue_.end()) {
121             static_cast<SingleVerSyncTarget *>(*iter)->SetEndWaterMark(timeHelper_->GetTime());
122             operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
123             return E_OK;
124         }
125     }
126 
127     auto *newTarget = new (std::nothrow) SingleVerSyncTarget;
128     if (newTarget == nullptr) {
129         LOGE("[SingleVerSyncTaskContext] [AddSyncOperation] newTarget is nullptr.");
130         return -E_OUT_OF_MEMORY;
131     }
132     newTarget->SetSyncOperation(operation);
133     Timestamp timstamp = timeHelper_->GetTime();
134     newTarget->SetEndWaterMark(timstamp);
135     newTarget->SetTaskType(ISyncTarget::REQUEST);
136     AddSyncTarget(newTarget);
137     return E_OK;
138 }
139 
SetEndMark(WaterMark endMark)140 void SingleVerSyncTaskContext::SetEndMark(WaterMark endMark)
141 {
142     endMark_ = endMark;
143 }
144 
GetEndMark() const145 WaterMark SingleVerSyncTaskContext::GetEndMark() const
146 {
147     return endMark_;
148 }
149 
GetContinueToken(ContinueToken & outToken) const150 void SingleVerSyncTaskContext::GetContinueToken(ContinueToken &outToken) const
151 {
152     outToken = token_;
153 }
154 
SetContinueToken(ContinueToken token)155 void SingleVerSyncTaskContext::SetContinueToken(ContinueToken token)
156 {
157     token_ = token;
158     return;
159 }
160 
ReleaseContinueToken()161 void SingleVerSyncTaskContext::ReleaseContinueToken()
162 {
163     if (token_ != nullptr) {
164         static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token_);
165         token_ = nullptr;
166     }
167 }
168 
PopResponseTarget(SingleVerSyncTarget & target)169 int SingleVerSyncTaskContext::PopResponseTarget(SingleVerSyncTarget &target)
170 {
171     std::lock_guard<std::mutex> lock(targetQueueLock_);
172     LOGD("[SingleVerSyncTaskContext] GetFrontExtWaterMark size = %zu", responseTargetQueue_.size());
173     if (!responseTargetQueue_.empty()) {
174         ISyncTarget *tmpTarget = responseTargetQueue_.front();
175         responseTargetQueue_.pop_front();
176         target = *(static_cast<SingleVerSyncTarget *>(tmpTarget));
177         delete tmpTarget;
178         tmpTarget = nullptr;
179         return E_OK;
180     }
181     return -E_LENGTH_ERROR;
182 }
183 
GetRspTargetQueueSize() const184 int SingleVerSyncTaskContext::GetRspTargetQueueSize() const
185 {
186     std::lock_guard<std::mutex> lock(targetQueueLock_);
187     return responseTargetQueue_.size();
188 }
189 
SetResponseSessionId(uint32_t responseSessionId)190 void SingleVerSyncTaskContext::SetResponseSessionId(uint32_t responseSessionId)
191 {
192     responseSessionId_ = responseSessionId;
193 }
194 
GetResponseSessionId() const195 uint32_t SingleVerSyncTaskContext::GetResponseSessionId() const
196 {
197     return responseSessionId_;
198 }
199 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)200 void SingleVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
201 {
202     const SingleVerSyncTarget *targetTmp = static_cast<const SingleVerSyncTarget *>(target);
203     SyncTaskContext::CopyTargetData(target, taskParam);
204     mode_ = targetTmp->GetMode();
205     endMark_ = targetTmp->GetEndWaterMark();
206     if (mode_ == SyncModeType::RESPONSE_PULL) {
207         responseSessionId_ = targetTmp->GetResponseSessionId();
208     }
209     SetQuery(targetTmp->GetQuery());
210     isQuerySync_ = targetTmp->IsQuerySync();
211 }
212 
Clear()213 void SingleVerSyncTaskContext::Clear()
214 {
215     retryTime_ = 0;
216     ClearSyncOperation();
217     SyncTaskContext::Clear();
218     SetMode(SyncModeType::INVALID_MODE);
219     syncId_ = 0;
220     isAutoSync_ = false;
221     SetOperationStatus(SyncOperation::OP_WAITING);
222     SetEndMark(0);
223     SetResponseSessionId(0);
224     {
225         std::lock_guard<std::mutex> autoLock(queryMutex_);
226         query_ = QuerySyncObject();
227     }
228     isQuerySync_ = false;
229 }
230 
Abort(int status)231 void SingleVerSyncTaskContext::Abort(int status)
232 {
233     {
234         std::lock_guard<std::mutex> lock(operationLock_);
235         if (syncOperation_ != nullptr) {
236             syncOperation_->SetStatus(deviceId_, status, GetCommErrCode());
237             if ((status >= SyncOperation::OP_FINISHED_ALL)) {
238                 UnlockObj();
239                 if (syncOperation_->CheckIsAllFinished()) {
240                     syncOperation_->Finished();
241                 }
242                 LockObj();
243             }
244         }
245     }
246     StopFeedDogForSync(SyncDirectionFlag::SEND);
247     StopFeedDogForSync(SyncDirectionFlag::RECEIVE);
248     Clear();
249 }
250 
ClearAllSyncTask()251 void SingleVerSyncTaskContext::ClearAllSyncTask()
252 {
253     // clear request queue sync task and responsequeue first.
254     std::list<ISyncTarget *> targetQueue;
255     {
256         std::lock_guard<std::mutex> lock(targetQueueLock_);
257         LOGI("[SingleVerSyncTaskContext] request taskcount=%zu, responsecount=%zu", requestTargetQueue_.size(),
258             responseTargetQueue_.size());
259         while (!requestTargetQueue_.empty()) {
260             ISyncTarget *tmpTarget = requestTargetQueue_.front();
261             requestTargetQueue_.pop_front();
262             SyncOperation *tmpInfOperation = nullptr;
263             tmpTarget->GetSyncOperation(tmpInfOperation);
264             RefObject::IncObjRef(tmpInfOperation);
265             targetQueue.push_back(tmpTarget);
266         }
267         while (!responseTargetQueue_.empty()) {
268             ISyncTarget *tmpTarget = responseTargetQueue_.front();
269             responseTargetQueue_.pop_front();
270             delete tmpTarget;
271             tmpTarget = nullptr;
272         }
273     }
274     while (!targetQueue.empty()) {
275         ISyncTarget *target = targetQueue.front();
276         targetQueue.pop_front();
277         SyncOperation *tmpOperation = nullptr;
278         target->GetSyncOperation(tmpOperation);
279         if (tmpOperation == nullptr) {
280             LOGE("[ClearAllSyncTask] tmpOperation is nullptr");
281             continue; // not exit this scene
282         }
283         LOGI("[SingleVerSyncTaskContext] killing syncId=%d,dev=%s", tmpOperation->GetSyncId(), STR_MASK(deviceId_));
284         if (target->IsAutoSync()) {
285             tmpOperation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
286         } else {
287             tmpOperation->SetStatus(deviceId_, SyncOperation::OP_COMM_ABNORMAL);
288         }
289         if (tmpOperation->CheckIsAllFinished()) {
290             tmpOperation->Finished();
291         }
292         delete target;
293         target = nullptr;
294         RefObject::DecObjRef(tmpOperation);
295     }
296     if (GetTaskExecStatus() == SyncTaskContext::RUNNING) {
297         // clear syncing task.
298         stateMachine_->CommErrAbort();
299         SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
300     }
301     // reset last push status for sync merge
302     ResetLastPushTaskStatus();
303 }
304 
EnableClearRemoteStaleData(bool enable)305 void SingleVerSyncTaskContext::EnableClearRemoteStaleData(bool enable)
306 {
307     needClearRemoteStaleData_ = enable;
308 }
309 
IsNeedClearRemoteStaleData() const310 bool SingleVerSyncTaskContext::IsNeedClearRemoteStaleData() const
311 {
312     return needClearRemoteStaleData_;
313 }
314 
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)315 bool SingleVerSyncTaskContext::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
316 {
317     return stateMachine_->StartFeedDogForSync(time, flag);
318 }
319 
StopFeedDogForSync(SyncDirectionFlag flag)320 void SingleVerSyncTaskContext::StopFeedDogForSync(SyncDirectionFlag flag)
321 {
322     stateMachine_->StopFeedDogForSync(flag);
323 }
324 
IsReceiveWaterMarkErr() const325 bool SingleVerSyncTaskContext::IsReceiveWaterMarkErr() const
326 {
327     return isReceiveWaterMarkErr_;
328 }
329 
SetReceiveWaterMarkErr(bool isErr)330 void SingleVerSyncTaskContext::SetReceiveWaterMarkErr(bool isErr)
331 {
332     isReceiveWaterMarkErr_ = isErr;
333 }
334 
SetRemoteSeccurityOption(SecurityOption secOption)335 void SingleVerSyncTaskContext::SetRemoteSeccurityOption(SecurityOption secOption)
336 {
337     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
338     remoteSecOption_ = secOption;
339 }
340 
GetRemoteSeccurityOption() const341 SecurityOption SingleVerSyncTaskContext::GetRemoteSeccurityOption() const
342 {
343     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
344     return remoteSecOption_;
345 }
346 
SetReceivcPermitCheck(bool isChecked)347 void SingleVerSyncTaskContext::SetReceivcPermitCheck(bool isChecked)
348 {
349     isReceivcPermitChecked_ = isChecked;
350 }
351 
GetReceivcPermitCheck() const352 bool SingleVerSyncTaskContext::GetReceivcPermitCheck() const
353 {
354     return isReceivcPermitChecked_;
355 }
356 
SetSendPermitCheck(bool isChecked)357 void SingleVerSyncTaskContext::SetSendPermitCheck(bool isChecked)
358 {
359     isSendPermitChecked_ = isChecked;
360 }
361 
GetSendPermitCheck() const362 bool SingleVerSyncTaskContext::GetSendPermitCheck() const
363 {
364     return isSendPermitChecked_;
365 }
366 
IsSkipTimeoutError(int errCode) const367 bool SingleVerSyncTaskContext::IsSkipTimeoutError(int errCode) const
368 {
369     if (errCode == -E_TIMEOUT && IsSyncTaskNeedRetry() && (GetRetryTime() < GetSyncRetryTimes())) { // LCOV_EXCL_BR_LINE
370         LOGE("[SingleVerSyncTaskContext] send message timeout error occurred");
371         return true;
372     } else {
373         return false;
374     }
375 }
376 
FindResponseSyncTarget(uint32_t responseSessionId) const377 bool SingleVerSyncTaskContext::FindResponseSyncTarget(uint32_t responseSessionId) const
378 {
379     std::lock_guard<std::mutex> lock(targetQueueLock_);
380     auto iter = std::find_if(responseTargetQueue_.begin(), responseTargetQueue_.end(),
381         [responseSessionId](const ISyncTarget *target) {
382             return target->GetResponseSessionId() == responseSessionId;
383         });
384     if (iter == responseTargetQueue_.end()) {
385         return false;
386     }
387     return true;
388 }
389 
SetQuery(const QuerySyncObject & query)390 void SingleVerSyncTaskContext::SetQuery(const QuerySyncObject &query)
391 {
392     std::lock_guard<std::mutex> autoLock(queryMutex_);
393     query_ = query;
394     query_.SetUseLocalSchema(mode_ != SyncModeType::RESPONSE_PULL);
395     query_.SetRemoteDev(deviceId_);
396 }
397 
GetQuery() const398 QuerySyncObject SingleVerSyncTaskContext::GetQuery() const
399 {
400     std::lock_guard<std::mutex> autoLock(queryMutex_);
401     return query_;
402 }
403 
SetQuerySync(bool isQuerySync)404 void SingleVerSyncTaskContext::SetQuerySync(bool isQuerySync)
405 {
406     isQuerySync_ = isQuerySync;
407 }
408 
IsQuerySync() const409 bool SingleVerSyncTaskContext::IsQuerySync() const
410 {
411     return isQuerySync_;
412 }
413 
GetRemoteCompressAlgo() const414 std::set<CompressAlgorithm> SingleVerSyncTaskContext::GetRemoteCompressAlgo() const
415 {
416     std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
417     std::set<CompressAlgorithm> compressAlgoSet;
418     for (const auto &algo : SyncConfig::COMPRESSALGOMAP) {
419         if (remoteDbAbility_.GetAbilityItem(algo.second) == SUPPORT_MARK) {
420             compressAlgoSet.insert(static_cast<CompressAlgorithm>(algo.first));
421         }
422     }
423     return compressAlgoSet;
424 }
425 
GetRemoteCompressAlgoStr() const426 std::string SingleVerSyncTaskContext::GetRemoteCompressAlgoStr() const
427 {
428     static std::map<CompressAlgorithm, std::string> algoMap = {{CompressAlgorithm::ZLIB, "zlib"}};
429     std::set<CompressAlgorithm> remoteCompressAlgoSet = GetRemoteCompressAlgo();
430     if (remoteCompressAlgoSet.empty()) {
431         return "none";
432     }
433     std::string currentAlgoStr;
434     for (const auto &algo : remoteCompressAlgoSet) {
435         auto iter = algoMap.find(algo);
436         if (iter != algoMap.end()) {
437             currentAlgoStr += algoMap[algo] + ",";
438         }
439     }
440     if (currentAlgoStr.empty()) {
441         return "";
442     }
443     return currentAlgoStr.substr(0, currentAlgoStr.length() - 1);
444 }
445 
SetDbAbility(DbAbility & remoteDbAbility)446 void SingleVerSyncTaskContext::SetDbAbility(DbAbility &remoteDbAbility)
447 {
448     {
449         std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
450         remoteDbAbility_ = remoteDbAbility;
451     }
452     LOGI("[SingleVerSyncTaskContext] set dev=%s compressAlgo=%s, IsSupAllPredicateQuery=%u,"
453         "IsSupSubscribeQuery=%u, inKeys=%u",
454         STR_MASK(GetDeviceId()), GetRemoteCompressAlgoStr().c_str(),
455         remoteDbAbility.GetAbilityItem(SyncConfig::ALLPREDICATEQUERY),
456         remoteDbAbility.GetAbilityItem(SyncConfig::SUBSCRIBEQUERY),
457         remoteDbAbility.GetAbilityItem(SyncConfig::INKEYS_QUERY));
458 }
459 
ChooseCompressAlgo() const460 CompressAlgorithm SingleVerSyncTaskContext::ChooseCompressAlgo() const
461 {
462     std::set<CompressAlgorithm> remoteAlgo = GetRemoteCompressAlgo();
463     if (remoteAlgo.empty()) {
464         return CompressAlgorithm::NONE;
465     }
466     std::set<CompressAlgorithm> localAlgorithmSet;
467     (void)(static_cast<SyncGenericInterface *>(syncInterface_))->GetCompressionAlgo(localAlgorithmSet);
468     std::set<CompressAlgorithm> algoIntersection;
469     set_intersection(remoteAlgo.begin(), remoteAlgo.end(), localAlgorithmSet.begin(), localAlgorithmSet.end(),
470         inserter(algoIntersection, algoIntersection.begin()));
471     if (algoIntersection.empty()) {
472         return CompressAlgorithm::NONE;
473     }
474     return *(algoIntersection.begin());
475 }
476 
IsNotSupportAbility(const AbilityItem & abilityItem) const477 bool SingleVerSyncTaskContext::IsNotSupportAbility(const AbilityItem &abilityItem) const
478 {
479     std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
480     return remoteDbAbility_.GetAbilityItem(abilityItem) != SUPPORT_MARK;
481 }
482 
SetSubscribeManager(std::shared_ptr<SubscribeManager> & subManager)483 void SingleVerSyncTaskContext::SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager)
484 {
485     subManager_ = subManager;
486 }
487 
GetSubscribeManager() const488 std::shared_ptr<SubscribeManager> SingleVerSyncTaskContext::GetSubscribeManager() const
489 {
490     return subManager_;
491 }
DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)492 DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)
493 
494 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
495 {
496     SyncOperation *operation = GetAndIncSyncOperation();
497     bool res = IsCurrentSyncTaskCanBeSkippedInner(operation);
498     RefObject::DecObjRef(operation);
499     return res;
500 }
501 
SaveLastPushTaskExecStatus(int finalStatus)502 void SingleVerSyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
503 {
504     if (IsTargetQueueEmpty()) {
505         LOGD("sync que is empty, reset last push status");
506         ResetLastPushTaskStatus();
507         return;
508     }
509     if (mode_ == SyncModeType::PUSH || mode_ == SyncModeType::PUSH_AND_PULL || mode_ == SyncModeType::RESPONSE_PULL) {
510         lastFullSyncTaskStatus_ = finalStatus;
511     } else if (mode_ == SyncModeType::QUERY_PUSH || mode_ == SyncModeType::QUERY_PUSH_PULL) {
512         std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
513         lastQuerySyncTaskStatusMap_[syncOperation_->GetQueryId()] = finalStatus;
514     }
515 }
516 
GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation * operation,uint64_t & waterMark) const517 int SingleVerSyncTaskContext::GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation,
518     uint64_t &waterMark) const
519 {
520     if (operation != nullptr && operation->IsQuerySync()) {
521         LOGD("Is QuerySync");
522         int errCode = static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetSendQueryWaterMark(
523             operation->GetQueryId(), deviceId_, targetUserId_,
524             lastFullSyncTaskStatus_ == SyncOperation::OP_FINISHED_ALL, waterMark);
525         if (errCode != E_OK) {
526             return errCode;
527         }
528     } else {
529         LOGD("Not QuerySync");
530         static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetLocalWaterMark(deviceId_, targetUserId_, waterMark);
531     }
532     return E_OK;
533 }
534 
ResetLastPushTaskStatus()535 void SingleVerSyncTaskContext::ResetLastPushTaskStatus()
536 {
537     lastFullSyncTaskStatus_ = SyncOperation::OP_WAITING;
538     std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
539     lastQuerySyncTaskStatusMap_.clear();
540 }
541 
SetCommNormal(bool isCommNormal)542 void SingleVerSyncTaskContext::SetCommNormal(bool isCommNormal)
543 {
544     isCommNormal_ = isCommNormal;
545 }
546 
IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation * operation) const547 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const
548 {
549     if (mode_ == SyncModeType::PUSH) {
550         if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
551             return false;
552         }
553         if (operation == nullptr) {
554             return true;
555         }
556     } else if (mode_ == SyncModeType::QUERY_PUSH) {
557         if (operation == nullptr) {
558             return true;
559         }
560         std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
561         auto it = lastQuerySyncTaskStatusMap_.find(operation->GetQueryId());
562         if (it == lastQuerySyncTaskStatusMap_.end()) {
563             // no last query_push and push
564             if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
565                 LOGD("no prev query push or successful prev push");
566                 return false;
567             }
568         } else {
569             if (it->second != SyncOperation::OP_FINISHED_ALL) {
570                 LOGD("last query push status = %d.", it->second);
571                 return false;
572             }
573         }
574     } else {
575         return false;
576     }
577 
578     Timestamp maxTimestampInDb;
579     syncInterface_->GetMaxTimestamp(maxTimestampInDb);
580     uint64_t localWaterMark = 0;
581     int errCode = GetCorrectedSendWaterMarkForCurrentTask(operation, localWaterMark);
582     if (errCode != E_OK) {
583         LOGE("GetLocalWaterMark in state machine failed: %d", errCode);
584         return false;
585     }
586     if (localWaterMark > maxTimestampInDb) {
587         LOGI("skip current push task, deviceId_ = %s, localWaterMark = %" PRIu64 ", maxTimestampInDb = %" PRIu64,
588             STR_MASK(deviceId_), localWaterMark, maxTimestampInDb);
589         return true;
590     }
591     return false;
592 }
593 
StartFeedDogForGetData(uint32_t sessionId)594 void SingleVerSyncTaskContext::StartFeedDogForGetData(uint32_t sessionId)
595 {
596     stateMachine_->StartFeedDogForGetData(sessionId);
597 }
598 
StopFeedDogForGetData()599 void SingleVerSyncTaskContext::StopFeedDogForGetData()
600 {
601     stateMachine_->StopFeedDogForGetData();
602 }
603 
UpdateOperationFinishedCount(const std::string & deviceId,uint32_t count)604 void SingleVerSyncTaskContext::UpdateOperationFinishedCount(const std::string &deviceId, uint32_t count)
605 {
606     std::lock_guard<std::mutex> lock(operationLock_);
607     if (syncOperation_ != nullptr) {
608         syncOperation_->UpdateFinishedCount(deviceId, count);
609     }
610 }
611 
SetOperationSyncProcessTotal(const std::string & deviceId,uint32_t total)612 void SingleVerSyncTaskContext::SetOperationSyncProcessTotal(const std::string &deviceId, uint32_t total)
613 {
614     std::lock_guard<std::mutex> lock(operationLock_);
615     if (syncOperation_ != nullptr) {
616         syncOperation_->SetSyncProcessTotal(deviceId, total);
617     }
618 }
619 
SetInitWaterMark(WaterMark waterMark)620 void SingleVerSyncTaskContext::SetInitWaterMark(WaterMark waterMark)
621 {
622     initWaterMark_ = waterMark;
623 }
624 
GetInitWaterMark() const625 WaterMark SingleVerSyncTaskContext::GetInitWaterMark() const
626 {
627     return initWaterMark_;
628 }
629 
SetInitDeletedMark(WaterMark waterMark)630 void SingleVerSyncTaskContext::SetInitDeletedMark(WaterMark waterMark)
631 {
632     initDeletedMark_ = waterMark;
633 }
634 
GetInitDeletedMark() const635 WaterMark SingleVerSyncTaskContext::GetInitDeletedMark() const
636 {
637     return initDeletedMark_;
638 }
639 
GetResponseTaskCount()640 int32_t SingleVerSyncTaskContext::GetResponseTaskCount()
641 {
642     std::lock_guard<std::mutex> autoLock(targetQueueLock_);
643     int32_t taskCount = static_cast<int32_t>(responseTargetQueue_.size());
644     if (responseSessionId_ != 0) {
645         taskCount++;
646     }
647     return taskCount;
648 }
649 
IsNeedRetrySync(uint32_t errNo,uint16_t messageType)650 bool SingleVerSyncTaskContext::IsNeedRetrySync(uint32_t errNo, uint16_t messageType)
651 {
652     if ((errNo != E_FEEDBACK_DB_CLOSING && errNo != E_NEED_CORRECT_TARGET_USER) || messageType != TYPE_RESPONSE) {
653         return false;
654     }
655     uint32_t cur = 0;
656     if (errNo == E_NEED_CORRECT_TARGET_USER) {
657         cur = ++resyncForUserTimes_;
658         LOGI("[IsNeedRetrySync] resync for user times: %u", cur);
659         return cur <= MANUAL_RETRY_TIMES;
660     }
661     cur = ++resyncTimes_;
662     LOGI("[IsNeedRetrySync]%u", cur);
663     return cur <= MANUAL_RETRY_TIMES;
664 }
665 
ResetResyncTimes()666 void SingleVerSyncTaskContext::ResetResyncTimes()
667 {
668     resyncTimes_ = 0;
669     resyncForUserTimes_ = 0;
670 }
671 
IsRetryTask() const672 bool SingleVerSyncTaskContext::IsRetryTask() const
673 {
674     SyncOperation *operation = GetAndIncSyncOperation();
675     if (operation == nullptr) {
676         return true;
677     }
678     bool isRetryTask = operation->IsRetryTask();
679     RefObject::DecObjRef(operation);
680     return isRetryTask;
681 }
682 
RefreshSaveTime(bool isFinished)683 void SingleVerSyncTaskContext::RefreshSaveTime(bool isFinished)
684 {
685     if (isFinished) {
686         lastSaveTimes_ = 0;
687         return;
688     }
689     lastSaveTimes_ = TimeHelper::GetMonotonicTime();
690 }
691 
IsSavingTask(uint32_t timeout) const692 bool SingleVerSyncTaskContext::IsSavingTask(uint32_t timeout) const
693 {
694     auto lastSaveTimes = lastSaveTimes_.load();
695     if (lastSaveTimes == 0) {
696         return false;
697     }
698     Timestamp duration = TimeHelper::GetMonotonicTime() - lastSaveTimes;
699     if (duration < timeout * TimeHelper::MS_TO_US) {
700         LOGI("exist saving task, duration[%" PRIu64 "]", duration);
701         return true;
702     }
703     return false;
704 }
705 } // namespace DistributedDB
706