• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_sync_task_context.h"
17 
18 #include <algorithm>
19 #include "db_common.h"
20 #include "db_dfx_adapter.h"
21 #include "db_errno.h"
22 #include "isyncer.h"
23 #include "log_print.h"
24 #include "single_ver_sync_state_machine.h"
25 #include "single_ver_sync_target.h"
26 #include "sync_types.h"
27 
28 namespace DistributedDB {
SingleVerSyncTaskContext()29 SingleVerSyncTaskContext::SingleVerSyncTaskContext()
30     : SyncTaskContext(),
31       token_(nullptr),
32       endMark_(0),
33       needClearRemoteStaleData_(false)
34 {}
35 
~SingleVerSyncTaskContext()36 SingleVerSyncTaskContext::~SingleVerSyncTaskContext()
37 {
38     token_ = nullptr;
39     subManager_ = nullptr;
40 }
41 
Initialize(const std::string & deviceId,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)42 int SingleVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
43     const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
44 {
45     if (deviceId.empty() || syncInterface == nullptr || metadata == nullptr ||
46         communicator == nullptr) {
47         return -E_INVALID_ARGS;
48     }
49     stateMachine_ = new (std::nothrow) SingleVerSyncStateMachine;
50     if (stateMachine_ == nullptr) {
51         return -E_OUT_OF_MEMORY;
52     }
53     deviceId_ = deviceId;
54     std::vector<uint8_t> dbIdentifier = syncInterface->GetIdentifier();
55     dbIdentifier.resize(3); // only show 3 bytes
56     syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
57         DBCommon::VectorToHexString(dbIdentifier) + "_" + deviceId_.c_str();
58     TimerAction timeOutCallback;
59     int errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
60     if (errCode != E_OK) {
61         LOGE("[SingleVerSyncTaskContext] stateMachine Initialize failed, err %d.", errCode);
62         goto ERROR_OUT;
63     }
64 
65     timeHelper_ = std::make_unique<TimeHelper>();
66     errCode = timeHelper_->Initialize(syncInterface, metadata);
67     if (errCode != E_OK) {
68         LOGE("[SingleVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
69         goto ERROR_OUT;
70     }
71     timeOutCallback = [stateMachine = static_cast<SingleVerSyncStateMachine *>(stateMachine_)](TimerId timerId) {
72         return stateMachine->TimeoutCallback(timerId);
73     };
74     SetTimeoutCallback(timeOutCallback);
75 
76     syncInterface_ = syncInterface;
77     communicator_ = communicator;
78     taskExecStatus_ = INIT;
79     OnKill([this]() { this->KillWait(); });
80     {
81         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
82         synTaskContextSet_.insert(this);
83     }
84     return errCode;
85 
86 ERROR_OUT:
87     delete stateMachine_;
88     stateMachine_ = nullptr;
89     return errCode;
90 }
91 
AddSyncOperation(SyncOperation * operation)92 int SingleVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
93 {
94     if (operation == nullptr) {
95         return -E_INVALID_ARGS;
96     }
97 
98     // If auto sync, just update the end watermark
99     if (operation->IsAutoSync()) {
100         std::lock_guard<std::mutex> lock(targetQueueLock_);
101         bool isQuerySync = operation->IsQuerySync();
102         std::string queryId = operation->GetQueryId();
103         auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
104             [isQuerySync, queryId](const ISyncTarget *target) {
105             if (target == nullptr) {
106                 return false;
107             }
108             if (isQuerySync) {
109                 SyncOperation *tmpOperation = nullptr;
110                 target->GetSyncOperation(tmpOperation);
111                 return (tmpOperation != nullptr && tmpOperation->GetQueryId() == queryId) && target->IsAutoSync();
112             }
113             return target->IsAutoSync();
114         });
115         if (iter != requestTargetQueue_.end()) {
116             static_cast<SingleVerSyncTarget *>(*iter)->SetEndWaterMark(timeHelper_->GetTime());
117             operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
118             return E_OK;
119         }
120     }
121 
122     auto *newTarget = new (std::nothrow) SingleVerSyncTarget;
123     if (newTarget == nullptr) {
124         return -E_OUT_OF_MEMORY;
125     }
126     newTarget->SetSyncOperation(operation);
127     Timestamp timstamp = timeHelper_->GetTime();
128     newTarget->SetEndWaterMark(timstamp);
129     newTarget->SetTaskType(ISyncTarget::REQUEST);
130     AddSyncTarget(newTarget);
131     return E_OK;
132 }
133 
SetEndMark(WaterMark endMark)134 void SingleVerSyncTaskContext::SetEndMark(WaterMark endMark)
135 {
136     endMark_ = endMark;
137 }
138 
GetEndMark() const139 WaterMark SingleVerSyncTaskContext::GetEndMark() const
140 {
141     return endMark_;
142 }
143 
GetContinueToken(ContinueToken & outToken) const144 void SingleVerSyncTaskContext::GetContinueToken(ContinueToken &outToken) const
145 {
146     outToken = token_;
147 }
148 
SetContinueToken(ContinueToken token)149 void SingleVerSyncTaskContext::SetContinueToken(ContinueToken token)
150 {
151     token_ = token;
152     return;
153 }
154 
ReleaseContinueToken()155 void SingleVerSyncTaskContext::ReleaseContinueToken()
156 {
157     if (token_ != nullptr) {
158         static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token_);
159         token_ = nullptr;
160     }
161 }
162 
PopResponseTarget(SingleVerSyncTarget & target)163 int SingleVerSyncTaskContext::PopResponseTarget(SingleVerSyncTarget &target)
164 {
165     std::lock_guard<std::mutex> lock(targetQueueLock_);
166     LOGD("[SingleVerSyncTaskContext] GetFrontExtWaterMark size = %zu", responseTargetQueue_.size());
167     if (!responseTargetQueue_.empty()) {
168         ISyncTarget *tmpTarget = responseTargetQueue_.front();
169         responseTargetQueue_.pop_front();
170         target = *(static_cast<SingleVerSyncTarget *>(tmpTarget));
171         delete tmpTarget;
172         tmpTarget = nullptr;
173         return E_OK;
174     }
175     return -E_LENGTH_ERROR;
176 }
177 
GetRspTargetQueueSize() const178 int SingleVerSyncTaskContext::GetRspTargetQueueSize() const
179 {
180     std::lock_guard<std::mutex> lock(targetQueueLock_);
181     return responseTargetQueue_.size();
182 }
183 
SetResponseSessionId(uint32_t responseSessionId)184 void SingleVerSyncTaskContext::SetResponseSessionId(uint32_t responseSessionId)
185 {
186     responseSessionId_ = responseSessionId;
187 }
188 
GetResponseSessionId() const189 uint32_t SingleVerSyncTaskContext::GetResponseSessionId() const
190 {
191     return responseSessionId_;
192 }
193 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)194 void SingleVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
195 {
196     const SingleVerSyncTarget *targetTmp = static_cast<const SingleVerSyncTarget *>(target);
197     SyncTaskContext::CopyTargetData(target, taskParam);
198     mode_ = targetTmp->GetMode();
199     endMark_ = targetTmp->GetEndWaterMark();
200     if (mode_ == SyncModeType::RESPONSE_PULL) {
201         responseSessionId_ = targetTmp->GetResponseSessionId();
202     }
203     {
204         std::lock_guard<std::mutex> autoLock(queryMutex_);
205         query_ = targetTmp->GetQuery();
206     }
207     isQuerySync_ = targetTmp->IsQuerySync();
208 }
209 
Clear()210 void SingleVerSyncTaskContext::Clear()
211 {
212     retryTime_ = 0;
213     ClearSyncOperation();
214     SyncTaskContext::Clear();
215     SetMode(SyncModeType::INVALID_MODE);
216     syncId_ = 0;
217     isAutoSync_ = false;
218     SetOperationStatus(SyncOperation::OP_WAITING);
219     SetEndMark(0);
220     SetResponseSessionId(0);
221     {
222         std::lock_guard<std::mutex> autoLock(queryMutex_);
223         query_ = QuerySyncObject();
224     }
225     isQuerySync_ = false;
226 }
227 
Abort(int status)228 void SingleVerSyncTaskContext::Abort(int status)
229 {
230     {
231         std::lock_guard<std::mutex> lock(operationLock_);
232         if (syncOperation_ != nullptr) {
233             syncOperation_->SetStatus(deviceId_, status);
234             if ((status >= SyncOperation::OP_FINISHED_ALL)) {
235                 UnlockObj();
236                 if (syncOperation_->CheckIsAllFinished()) {
237                     syncOperation_->Finished();
238                 }
239                 LockObj();
240             }
241         }
242     }
243     StopFeedDogForSync(SyncDirectionFlag::SEND);
244     StopFeedDogForSync(SyncDirectionFlag::RECEIVE);
245     Clear();
246 }
247 
ClearAllSyncTask()248 void SingleVerSyncTaskContext::ClearAllSyncTask()
249 {
250     // clear request queue sync task and responsequeue first.
251     std::list<ISyncTarget *> targetQueue;
252     {
253         std::lock_guard<std::mutex> lock(targetQueueLock_);
254         LOGI("[SingleVerSyncTaskContext] request taskcount=%zu, responsecount=%zu", requestTargetQueue_.size(),
255             responseTargetQueue_.size());
256         while (!requestTargetQueue_.empty()) {
257             ISyncTarget *tmpTarget = requestTargetQueue_.front();
258             requestTargetQueue_.pop_front();
259             SyncOperation *tmpInfOperation = nullptr;
260             tmpTarget->GetSyncOperation(tmpInfOperation);
261             RefObject::IncObjRef(tmpInfOperation);
262             targetQueue.push_back(tmpTarget);
263         }
264         while (!responseTargetQueue_.empty()) {
265             ISyncTarget *tmpTarget = responseTargetQueue_.front();
266             responseTargetQueue_.pop_front();
267             delete tmpTarget;
268             tmpTarget = nullptr;
269         }
270     }
271     while (!targetQueue.empty()) {
272         ISyncTarget *target = targetQueue.front();
273         targetQueue.pop_front();
274         SyncOperation *tmpOperation = nullptr;
275         target->GetSyncOperation(tmpOperation);
276         if (tmpOperation == nullptr) {
277             LOGE("[ClearAllSyncTask] tmpOperation is nullptr");
278             continue; // not exit this scene
279         }
280         LOGI("[SingleVerSyncTaskContext] killing syncId=%d,dev=%s", tmpOperation->GetSyncId(), STR_MASK(deviceId_));
281         if (target->IsAutoSync()) {
282             tmpOperation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
283         } else {
284             tmpOperation->SetStatus(deviceId_, SyncOperation::OP_COMM_ABNORMAL);
285         }
286         if (tmpOperation->CheckIsAllFinished()) {
287             tmpOperation->Finished();
288         }
289         delete target;
290         target = nullptr;
291         RefObject::DecObjRef(tmpOperation);
292     }
293     if (GetTaskExecStatus() == SyncTaskContext::RUNNING) {
294         // clear syncing task.
295         stateMachine_->CommErrAbort();
296     }
297     // reset last push status for sync merge
298     ResetLastPushTaskStatus();
299 }
300 
EnableClearRemoteStaleData(bool enable)301 void SingleVerSyncTaskContext::EnableClearRemoteStaleData(bool enable)
302 {
303     needClearRemoteStaleData_ = enable;
304 }
305 
IsNeedClearRemoteStaleData() const306 bool SingleVerSyncTaskContext::IsNeedClearRemoteStaleData() const
307 {
308     return needClearRemoteStaleData_;
309 }
310 
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)311 bool SingleVerSyncTaskContext::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
312 {
313     return stateMachine_->StartFeedDogForSync(time, flag);
314 }
315 
StopFeedDogForSync(SyncDirectionFlag flag)316 void SingleVerSyncTaskContext::StopFeedDogForSync(SyncDirectionFlag flag)
317 {
318     stateMachine_->StopFeedDogForSync(flag);
319 }
320 
IsReceiveWaterMarkErr() const321 bool SingleVerSyncTaskContext::IsReceiveWaterMarkErr() const
322 {
323     return isReceiveWaterMarkErr_;
324 }
325 
SetReceiveWaterMarkErr(bool isErr)326 void SingleVerSyncTaskContext::SetReceiveWaterMarkErr(bool isErr)
327 {
328     isReceiveWaterMarkErr_ = isErr;
329 }
330 
SetRemoteSeccurityOption(SecurityOption secOption)331 void SingleVerSyncTaskContext::SetRemoteSeccurityOption(SecurityOption secOption)
332 {
333     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
334     remoteSecOption_ = secOption;
335 }
336 
GetRemoteSeccurityOption() const337 SecurityOption SingleVerSyncTaskContext::GetRemoteSeccurityOption() const
338 {
339     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
340     return remoteSecOption_;
341 }
342 
SetReceivcPermitCheck(bool isChecked)343 void SingleVerSyncTaskContext::SetReceivcPermitCheck(bool isChecked)
344 {
345     isReceivcPermitChecked_ = isChecked;
346 }
347 
GetReceivcPermitCheck() const348 bool SingleVerSyncTaskContext::GetReceivcPermitCheck() const
349 {
350     return isReceivcPermitChecked_;
351 }
352 
SetSendPermitCheck(bool isChecked)353 void SingleVerSyncTaskContext::SetSendPermitCheck(bool isChecked)
354 {
355     isSendPermitChecked_ = isChecked;
356 }
357 
GetSendPermitCheck() const358 bool SingleVerSyncTaskContext::GetSendPermitCheck() const
359 {
360     return isSendPermitChecked_;
361 }
362 
IsSkipTimeoutError(int errCode) const363 bool SingleVerSyncTaskContext::IsSkipTimeoutError(int errCode) const
364 {
365     if (errCode == -E_TIMEOUT && IsSyncTaskNeedRetry() && (GetRetryTime() < GetSyncRetryTimes())) { // LCOV_EXCL_BR_LINE
366         LOGE("[SingleVerSyncTaskContext] send message timeout error occurred");
367         return true;
368     } else {
369         return false;
370     }
371 }
372 
FindResponseSyncTarget(uint32_t responseSessionId) const373 bool SingleVerSyncTaskContext::FindResponseSyncTarget(uint32_t responseSessionId) const
374 {
375     std::lock_guard<std::mutex> lock(targetQueueLock_);
376     auto iter = std::find_if(responseTargetQueue_.begin(), responseTargetQueue_.end(),
377         [responseSessionId](const ISyncTarget *target) {
378             return target->GetResponseSessionId() == responseSessionId;
379         });
380     if (iter == responseTargetQueue_.end()) {
381         return false;
382     }
383     return true;
384 }
385 
SetQuery(const QuerySyncObject & query)386 void SingleVerSyncTaskContext::SetQuery(const QuerySyncObject &query)
387 {
388     std::lock_guard<std::mutex> autoLock(queryMutex_);
389     query_ = query;
390 }
391 
GetQuery() const392 QuerySyncObject SingleVerSyncTaskContext::GetQuery() const
393 {
394     std::lock_guard<std::mutex> autoLock(queryMutex_);
395     return query_;
396 }
397 
SetQuerySync(bool isQuerySync)398 void SingleVerSyncTaskContext::SetQuerySync(bool isQuerySync)
399 {
400     isQuerySync_ = isQuerySync;
401 }
402 
IsQuerySync() const403 bool SingleVerSyncTaskContext::IsQuerySync() const
404 {
405     return isQuerySync_;
406 }
407 
GetRemoteCompressAlgo() const408 std::set<CompressAlgorithm> SingleVerSyncTaskContext::GetRemoteCompressAlgo() const
409 {
410     std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
411     std::set<CompressAlgorithm> compressAlgoSet;
412     for (const auto &algo : SyncConfig::COMPRESSALGOMAP) {
413         if (remoteDbAbility_.GetAbilityItem(algo.second) == SUPPORT_MARK) {
414             compressAlgoSet.insert(static_cast<CompressAlgorithm>(algo.first));
415         }
416     }
417     return compressAlgoSet;
418 }
419 
GetRemoteCompressAlgoStr() const420 std::string SingleVerSyncTaskContext::GetRemoteCompressAlgoStr() const
421 {
422     static std::map<CompressAlgorithm, std::string> algoMap = {{CompressAlgorithm::ZLIB, "zlib"}};
423     std::set<CompressAlgorithm> remoteCompressAlgoSet = GetRemoteCompressAlgo();
424     if (remoteCompressAlgoSet.empty()) {
425         return "none";
426     }
427     std::string currentAlgoStr;
428     for (const auto &algo : remoteCompressAlgoSet) {
429         auto iter = algoMap.find(algo);
430         if (iter != algoMap.end()) {
431             currentAlgoStr += algoMap[algo] + ",";
432         }
433     }
434     if (currentAlgoStr.empty()) {
435         return "";
436     }
437     return currentAlgoStr.substr(0, currentAlgoStr.length() - 1);
438 }
439 
SetDbAbility(DbAbility & remoteDbAbility)440 void SingleVerSyncTaskContext::SetDbAbility(DbAbility &remoteDbAbility)
441 {
442     {
443         std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
444         remoteDbAbility_ = remoteDbAbility;
445     }
446     LOGI("[SingleVerSyncTaskContext] set dev=%s compressAlgo=%s, IsSupAllPredicateQuery=%u,"
447         "IsSupSubscribeQuery=%u, inKeys=%u",
448         STR_MASK(GetDeviceId()), GetRemoteCompressAlgoStr().c_str(),
449         remoteDbAbility.GetAbilityItem(SyncConfig::ALLPREDICATEQUERY),
450         remoteDbAbility.GetAbilityItem(SyncConfig::SUBSCRIBEQUERY),
451         remoteDbAbility.GetAbilityItem(SyncConfig::INKEYS_QUERY));
452 }
453 
ChooseCompressAlgo() const454 CompressAlgorithm SingleVerSyncTaskContext::ChooseCompressAlgo() const
455 {
456     std::set<CompressAlgorithm> remoteAlgo = GetRemoteCompressAlgo();
457     if (remoteAlgo.empty()) {
458         return CompressAlgorithm::NONE;
459     }
460     std::set<CompressAlgorithm> localAlgorithmSet;
461     (void)(static_cast<SyncGenericInterface *>(syncInterface_))->GetCompressionAlgo(localAlgorithmSet);
462     std::set<CompressAlgorithm> algoIntersection;
463     set_intersection(remoteAlgo.begin(), remoteAlgo.end(), localAlgorithmSet.begin(), localAlgorithmSet.end(),
464         inserter(algoIntersection, algoIntersection.begin()));
465     if (algoIntersection.empty()) {
466         return CompressAlgorithm::NONE;
467     }
468     return *(algoIntersection.begin());
469 }
470 
IsNotSupportAbility(const AbilityItem & abilityItem) const471 bool SingleVerSyncTaskContext::IsNotSupportAbility(const AbilityItem &abilityItem) const
472 {
473     std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
474     return remoteDbAbility_.GetAbilityItem(abilityItem) != SUPPORT_MARK;
475 }
476 
SetSubscribeManager(std::shared_ptr<SubscribeManager> & subManager)477 void SingleVerSyncTaskContext::SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager)
478 {
479     subManager_ = subManager;
480 }
481 
GetSubscribeManager() const482 std::shared_ptr<SubscribeManager> SingleVerSyncTaskContext::GetSubscribeManager() const
483 {
484     return subManager_;
485 }
DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)486 DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)
487 
488 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
489 {
490     SyncOperation *operation = GetAndIncSyncOperation();
491     bool res = IsCurrentSyncTaskCanBeSkippedInner(operation);
492     RefObject::DecObjRef(operation);
493     return res;
494 }
495 
SaveLastPushTaskExecStatus(int finalStatus)496 void SingleVerSyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
497 {
498     if (IsTargetQueueEmpty()) {
499         LOGD("sync que is empty, reset last push status");
500         ResetLastPushTaskStatus();
501         return;
502     }
503     if (mode_ == SyncModeType::PUSH || mode_ == SyncModeType::PUSH_AND_PULL || mode_ == SyncModeType::RESPONSE_PULL) {
504         lastFullSyncTaskStatus_ = finalStatus;
505     } else if (mode_ == SyncModeType::QUERY_PUSH || mode_ == SyncModeType::QUERY_PUSH_PULL) {
506         std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
507         lastQuerySyncTaskStatusMap_[syncOperation_->GetQueryId()] = finalStatus;
508     }
509 }
510 
GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation * operation,uint64_t & waterMark) const511 int SingleVerSyncTaskContext::GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation,
512     uint64_t &waterMark) const
513 {
514     if (operation != nullptr && operation->IsQuerySync()) {
515         LOGD("Is QuerySync");
516         int errCode = static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetSendQueryWaterMark(
517             operation->GetQueryId(), deviceId_,
518             lastFullSyncTaskStatus_ == SyncOperation::OP_FINISHED_ALL, waterMark);
519         if (errCode != E_OK) {
520             return errCode;
521         }
522     } else {
523         LOGD("Not QuerySync");
524         static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetLocalWaterMark(deviceId_, waterMark);
525     }
526     return E_OK;
527 }
528 
ResetLastPushTaskStatus()529 void SingleVerSyncTaskContext::ResetLastPushTaskStatus()
530 {
531     lastFullSyncTaskStatus_ = SyncOperation::OP_WAITING;
532     std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
533     lastQuerySyncTaskStatusMap_.clear();
534 }
535 
SetCommNormal(bool isCommNormal)536 void SingleVerSyncTaskContext::SetCommNormal(bool isCommNormal)
537 {
538     isCommNormal_ = isCommNormal;
539 }
540 
IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation * operation) const541 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const
542 {
543     if (mode_ == SyncModeType::PUSH) {
544         if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
545             return false;
546         }
547         if (operation == nullptr) {
548             return true;
549         }
550     } else if (mode_ == SyncModeType::QUERY_PUSH) {
551         if (operation == nullptr) {
552             return true;
553         }
554         std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
555         auto it = lastQuerySyncTaskStatusMap_.find(operation->GetQueryId());
556         if (it == lastQuerySyncTaskStatusMap_.end()) {
557             // no last query_push and push
558             if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
559                 LOGD("no prev query push or successful prev push");
560                 return false;
561             }
562         } else {
563             if (it->second != SyncOperation::OP_FINISHED_ALL) {
564                 LOGD("last query push status = %d.", it->second);
565                 return false;
566             }
567         }
568     } else {
569         return false;
570     }
571 
572     Timestamp maxTimestampInDb;
573     syncInterface_->GetMaxTimestamp(maxTimestampInDb);
574     uint64_t localWaterMark = 0;
575     int errCode = GetCorrectedSendWaterMarkForCurrentTask(operation, localWaterMark);
576     if (errCode != E_OK) {
577         LOGE("GetLocalWaterMark in state machine failed: %d", errCode);
578         return false;
579     }
580     if (localWaterMark > maxTimestampInDb) {
581         LOGI("skip current push task, deviceId_ = %s, localWaterMark = %" PRIu64 ", maxTimestampInDb = %" PRIu64,
582             STR_MASK(deviceId_), localWaterMark, maxTimestampInDb);
583         return true;
584     }
585     return false;
586 }
587 
StartFeedDogForGetData(uint32_t sessionId)588 void SingleVerSyncTaskContext::StartFeedDogForGetData(uint32_t sessionId)
589 {
590     stateMachine_->StartFeedDogForGetData(sessionId);
591 }
592 
StopFeedDogForGetData()593 void SingleVerSyncTaskContext::StopFeedDogForGetData()
594 {
595     stateMachine_->StopFeedDogForGetData();
596 }
597 
GetResponseTaskCount()598 int32_t SingleVerSyncTaskContext::GetResponseTaskCount()
599 {
600     std::lock_guard<std::mutex> autoLock(targetQueueLock_);
601     int32_t taskCount = static_cast<int32_t>(responseTargetQueue_.size());
602     if (responseSessionId_ != 0) {
603         taskCount++;
604     }
605     return taskCount;
606 }
607 } // namespace DistributedDB
608