1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "single_ver_sync_task_context.h"

#include <algorithm>
#include <limits>
#include "db_common.h"
#include "db_dfx_adapter.h"
#include "db_errno.h"
#include "isyncer.h"
#include "log_print.h"
#include "single_ver_sync_state_machine.h"
#include "single_ver_sync_target.h"
#include "sync_types.h"
27
28 namespace DistributedDB {
SingleVerSyncTaskContext()29 SingleVerSyncTaskContext::SingleVerSyncTaskContext()
30 : SyncTaskContext(),
31 token_(nullptr),
32 endMark_(0),
33 needClearRemoteStaleData_(false)
34 {}
35
~SingleVerSyncTaskContext()36 SingleVerSyncTaskContext::~SingleVerSyncTaskContext()
37 {
38 token_ = nullptr;
39 subManager_ = nullptr;
40 }
41
Initialize(const std::string & deviceId,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)42 int SingleVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
43 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
44 {
45 if (deviceId.empty() || syncInterface == nullptr || metadata == nullptr ||
46 communicator == nullptr) {
47 return -E_INVALID_ARGS;
48 }
49 stateMachine_ = new (std::nothrow) SingleVerSyncStateMachine;
50 if (stateMachine_ == nullptr) {
51 return -E_OUT_OF_MEMORY;
52 }
53 deviceId_ = deviceId;
54 std::vector<uint8_t> dbIdentifier = syncInterface->GetIdentifier();
55 dbIdentifier.resize(3); // only show 3 bytes
56 syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
57 DBCommon::VectorToHexString(dbIdentifier) + "_" + deviceId_.c_str();
58 TimerAction timeOutCallback;
59 int errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
60 if (errCode != E_OK) {
61 LOGE("[SingleVerSyncTaskContext] stateMachine Initialize failed, err %d.", errCode);
62 goto ERROR_OUT;
63 }
64
65 timeHelper_ = std::make_unique<TimeHelper>();
66 errCode = timeHelper_->Initialize(syncInterface, metadata);
67 if (errCode != E_OK) {
68 LOGE("[SingleVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
69 goto ERROR_OUT;
70 }
71 timeOutCallback = [stateMachine = static_cast<SingleVerSyncStateMachine *>(stateMachine_)](TimerId timerId) {
72 return stateMachine->TimeoutCallback(timerId);
73 };
74 SetTimeoutCallback(timeOutCallback);
75
76 syncInterface_ = syncInterface;
77 communicator_ = communicator;
78 taskExecStatus_ = INIT;
79 OnKill([this]() { this->KillWait(); });
80 {
81 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
82 synTaskContextSet_.insert(this);
83 }
84 return errCode;
85
86 ERROR_OUT:
87 delete stateMachine_;
88 stateMachine_ = nullptr;
89 return errCode;
90 }
91
AddSyncOperation(SyncOperation * operation)92 int SingleVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
93 {
94 if (operation == nullptr) {
95 return -E_INVALID_ARGS;
96 }
97
98 // If auto sync, just update the end watermark
99 if (operation->IsAutoSync()) {
100 std::lock_guard<std::mutex> lock(targetQueueLock_);
101 bool isQuerySync = operation->IsQuerySync();
102 std::string queryId = operation->GetQueryId();
103 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
104 [isQuerySync, queryId](const ISyncTarget *target) {
105 if (target == nullptr) {
106 return false;
107 }
108 if (isQuerySync) {
109 SyncOperation *tmpOperation = nullptr;
110 target->GetSyncOperation(tmpOperation);
111 return (tmpOperation != nullptr && tmpOperation->GetQueryId() == queryId) && target->IsAutoSync();
112 }
113 return target->IsAutoSync();
114 });
115 if (iter != requestTargetQueue_.end()) {
116 static_cast<SingleVerSyncTarget *>(*iter)->SetEndWaterMark(timeHelper_->GetTime());
117 operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
118 return E_OK;
119 }
120 }
121
122 auto *newTarget = new (std::nothrow) SingleVerSyncTarget;
123 if (newTarget == nullptr) {
124 return -E_OUT_OF_MEMORY;
125 }
126 newTarget->SetSyncOperation(operation);
127 Timestamp timstamp = timeHelper_->GetTime();
128 newTarget->SetEndWaterMark(timstamp);
129 newTarget->SetTaskType(ISyncTarget::REQUEST);
130 AddSyncTarget(newTarget);
131 return E_OK;
132 }
133
SetEndMark(WaterMark endMark)134 void SingleVerSyncTaskContext::SetEndMark(WaterMark endMark)
135 {
136 endMark_ = endMark;
137 }
138
GetEndMark() const139 WaterMark SingleVerSyncTaskContext::GetEndMark() const
140 {
141 return endMark_;
142 }
143
GetContinueToken(ContinueToken & outToken) const144 void SingleVerSyncTaskContext::GetContinueToken(ContinueToken &outToken) const
145 {
146 outToken = token_;
147 }
148
SetContinueToken(ContinueToken token)149 void SingleVerSyncTaskContext::SetContinueToken(ContinueToken token)
150 {
151 token_ = token;
152 return;
153 }
154
ReleaseContinueToken()155 void SingleVerSyncTaskContext::ReleaseContinueToken()
156 {
157 if (token_ != nullptr) {
158 static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token_);
159 token_ = nullptr;
160 }
161 }
162
PopResponseTarget(SingleVerSyncTarget & target)163 int SingleVerSyncTaskContext::PopResponseTarget(SingleVerSyncTarget &target)
164 {
165 std::lock_guard<std::mutex> lock(targetQueueLock_);
166 LOGD("[SingleVerSyncTaskContext] GetFrontExtWaterMark size = %zu", responseTargetQueue_.size());
167 if (!responseTargetQueue_.empty()) {
168 ISyncTarget *tmpTarget = responseTargetQueue_.front();
169 responseTargetQueue_.pop_front();
170 target = *(static_cast<SingleVerSyncTarget *>(tmpTarget));
171 delete tmpTarget;
172 tmpTarget = nullptr;
173 return E_OK;
174 }
175 return -E_LENGTH_ERROR;
176 }
177
GetRspTargetQueueSize() const178 int SingleVerSyncTaskContext::GetRspTargetQueueSize() const
179 {
180 std::lock_guard<std::mutex> lock(targetQueueLock_);
181 return responseTargetQueue_.size();
182 }
183
SetResponseSessionId(uint32_t responseSessionId)184 void SingleVerSyncTaskContext::SetResponseSessionId(uint32_t responseSessionId)
185 {
186 responseSessionId_ = responseSessionId;
187 }
188
GetResponseSessionId() const189 uint32_t SingleVerSyncTaskContext::GetResponseSessionId() const
190 {
191 return responseSessionId_;
192 }
193
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)194 void SingleVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
195 {
196 const SingleVerSyncTarget *targetTmp = static_cast<const SingleVerSyncTarget *>(target);
197 SyncTaskContext::CopyTargetData(target, taskParam);
198 mode_ = targetTmp->GetMode();
199 endMark_ = targetTmp->GetEndWaterMark();
200 if (mode_ == SyncModeType::RESPONSE_PULL) {
201 responseSessionId_ = targetTmp->GetResponseSessionId();
202 }
203 SetQuery(targetTmp->GetQuery());
204 isQuerySync_ = targetTmp->IsQuerySync();
205 }
206
Clear()207 void SingleVerSyncTaskContext::Clear()
208 {
209 retryTime_ = 0;
210 ClearSyncOperation();
211 SyncTaskContext::Clear();
212 SetMode(SyncModeType::INVALID_MODE);
213 syncId_ = 0;
214 isAutoSync_ = false;
215 SetOperationStatus(SyncOperation::OP_WAITING);
216 SetEndMark(0);
217 SetResponseSessionId(0);
218 {
219 std::lock_guard<std::mutex> autoLock(queryMutex_);
220 query_ = QuerySyncObject();
221 }
222 isQuerySync_ = false;
223 }
224
Abort(int status)225 void SingleVerSyncTaskContext::Abort(int status)
226 {
227 {
228 std::lock_guard<std::mutex> lock(operationLock_);
229 if (syncOperation_ != nullptr) {
230 syncOperation_->SetStatus(deviceId_, status, GetCommErrCode());
231 if ((status >= SyncOperation::OP_FINISHED_ALL)) {
232 UnlockObj();
233 if (syncOperation_->CheckIsAllFinished()) {
234 syncOperation_->Finished();
235 }
236 LockObj();
237 }
238 }
239 }
240 StopFeedDogForSync(SyncDirectionFlag::SEND);
241 StopFeedDogForSync(SyncDirectionFlag::RECEIVE);
242 Clear();
243 }
244
ClearAllSyncTask()245 void SingleVerSyncTaskContext::ClearAllSyncTask()
246 {
247 // clear request queue sync task and responsequeue first.
248 std::list<ISyncTarget *> targetQueue;
249 {
250 std::lock_guard<std::mutex> lock(targetQueueLock_);
251 LOGI("[SingleVerSyncTaskContext] request taskcount=%zu, responsecount=%zu", requestTargetQueue_.size(),
252 responseTargetQueue_.size());
253 while (!requestTargetQueue_.empty()) {
254 ISyncTarget *tmpTarget = requestTargetQueue_.front();
255 requestTargetQueue_.pop_front();
256 SyncOperation *tmpInfOperation = nullptr;
257 tmpTarget->GetSyncOperation(tmpInfOperation);
258 RefObject::IncObjRef(tmpInfOperation);
259 targetQueue.push_back(tmpTarget);
260 }
261 while (!responseTargetQueue_.empty()) {
262 ISyncTarget *tmpTarget = responseTargetQueue_.front();
263 responseTargetQueue_.pop_front();
264 delete tmpTarget;
265 tmpTarget = nullptr;
266 }
267 }
268 while (!targetQueue.empty()) {
269 ISyncTarget *target = targetQueue.front();
270 targetQueue.pop_front();
271 SyncOperation *tmpOperation = nullptr;
272 target->GetSyncOperation(tmpOperation);
273 if (tmpOperation == nullptr) {
274 LOGE("[ClearAllSyncTask] tmpOperation is nullptr");
275 continue; // not exit this scene
276 }
277 LOGI("[SingleVerSyncTaskContext] killing syncId=%d,dev=%s", tmpOperation->GetSyncId(), STR_MASK(deviceId_));
278 if (target->IsAutoSync()) {
279 tmpOperation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
280 } else {
281 tmpOperation->SetStatus(deviceId_, SyncOperation::OP_COMM_ABNORMAL);
282 }
283 if (tmpOperation->CheckIsAllFinished()) {
284 tmpOperation->Finished();
285 }
286 delete target;
287 target = nullptr;
288 RefObject::DecObjRef(tmpOperation);
289 }
290 if (GetTaskExecStatus() == SyncTaskContext::RUNNING) {
291 // clear syncing task.
292 stateMachine_->CommErrAbort();
293 SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
294 }
295 // reset last push status for sync merge
296 ResetLastPushTaskStatus();
297 }
298
EnableClearRemoteStaleData(bool enable)299 void SingleVerSyncTaskContext::EnableClearRemoteStaleData(bool enable)
300 {
301 needClearRemoteStaleData_ = enable;
302 }
303
IsNeedClearRemoteStaleData() const304 bool SingleVerSyncTaskContext::IsNeedClearRemoteStaleData() const
305 {
306 return needClearRemoteStaleData_;
307 }
308
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)309 bool SingleVerSyncTaskContext::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
310 {
311 return stateMachine_->StartFeedDogForSync(time, flag);
312 }
313
StopFeedDogForSync(SyncDirectionFlag flag)314 void SingleVerSyncTaskContext::StopFeedDogForSync(SyncDirectionFlag flag)
315 {
316 stateMachine_->StopFeedDogForSync(flag);
317 }
318
IsReceiveWaterMarkErr() const319 bool SingleVerSyncTaskContext::IsReceiveWaterMarkErr() const
320 {
321 return isReceiveWaterMarkErr_;
322 }
323
SetReceiveWaterMarkErr(bool isErr)324 void SingleVerSyncTaskContext::SetReceiveWaterMarkErr(bool isErr)
325 {
326 isReceiveWaterMarkErr_ = isErr;
327 }
328
SetRemoteSeccurityOption(SecurityOption secOption)329 void SingleVerSyncTaskContext::SetRemoteSeccurityOption(SecurityOption secOption)
330 {
331 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
332 remoteSecOption_ = secOption;
333 }
334
GetRemoteSeccurityOption() const335 SecurityOption SingleVerSyncTaskContext::GetRemoteSeccurityOption() const
336 {
337 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
338 return remoteSecOption_;
339 }
340
SetReceivcPermitCheck(bool isChecked)341 void SingleVerSyncTaskContext::SetReceivcPermitCheck(bool isChecked)
342 {
343 isReceivcPermitChecked_ = isChecked;
344 }
345
GetReceivcPermitCheck() const346 bool SingleVerSyncTaskContext::GetReceivcPermitCheck() const
347 {
348 return isReceivcPermitChecked_;
349 }
350
SetSendPermitCheck(bool isChecked)351 void SingleVerSyncTaskContext::SetSendPermitCheck(bool isChecked)
352 {
353 isSendPermitChecked_ = isChecked;
354 }
355
GetSendPermitCheck() const356 bool SingleVerSyncTaskContext::GetSendPermitCheck() const
357 {
358 return isSendPermitChecked_;
359 }
360
IsSkipTimeoutError(int errCode) const361 bool SingleVerSyncTaskContext::IsSkipTimeoutError(int errCode) const
362 {
363 if (errCode == -E_TIMEOUT && IsSyncTaskNeedRetry() && (GetRetryTime() < GetSyncRetryTimes())) { // LCOV_EXCL_BR_LINE
364 LOGE("[SingleVerSyncTaskContext] send message timeout error occurred");
365 return true;
366 } else {
367 return false;
368 }
369 }
370
FindResponseSyncTarget(uint32_t responseSessionId) const371 bool SingleVerSyncTaskContext::FindResponseSyncTarget(uint32_t responseSessionId) const
372 {
373 std::lock_guard<std::mutex> lock(targetQueueLock_);
374 auto iter = std::find_if(responseTargetQueue_.begin(), responseTargetQueue_.end(),
375 [responseSessionId](const ISyncTarget *target) {
376 return target->GetResponseSessionId() == responseSessionId;
377 });
378 if (iter == responseTargetQueue_.end()) {
379 return false;
380 }
381 return true;
382 }
383
SetQuery(const QuerySyncObject & query)384 void SingleVerSyncTaskContext::SetQuery(const QuerySyncObject &query)
385 {
386 std::lock_guard<std::mutex> autoLock(queryMutex_);
387 query_ = query;
388 query_.SetUseLocalSchema(mode_ != SyncModeType::RESPONSE_PULL);
389 query_.SetRemoteDev(deviceId_);
390 }
391
GetQuery() const392 QuerySyncObject SingleVerSyncTaskContext::GetQuery() const
393 {
394 std::lock_guard<std::mutex> autoLock(queryMutex_);
395 return query_;
396 }
397
SetQuerySync(bool isQuerySync)398 void SingleVerSyncTaskContext::SetQuerySync(bool isQuerySync)
399 {
400 isQuerySync_ = isQuerySync;
401 }
402
IsQuerySync() const403 bool SingleVerSyncTaskContext::IsQuerySync() const
404 {
405 return isQuerySync_;
406 }
407
GetRemoteCompressAlgo() const408 std::set<CompressAlgorithm> SingleVerSyncTaskContext::GetRemoteCompressAlgo() const
409 {
410 std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
411 std::set<CompressAlgorithm> compressAlgoSet;
412 for (const auto &algo : SyncConfig::COMPRESSALGOMAP) {
413 if (remoteDbAbility_.GetAbilityItem(algo.second) == SUPPORT_MARK) {
414 compressAlgoSet.insert(static_cast<CompressAlgorithm>(algo.first));
415 }
416 }
417 return compressAlgoSet;
418 }
419
GetRemoteCompressAlgoStr() const420 std::string SingleVerSyncTaskContext::GetRemoteCompressAlgoStr() const
421 {
422 static std::map<CompressAlgorithm, std::string> algoMap = {{CompressAlgorithm::ZLIB, "zlib"}};
423 std::set<CompressAlgorithm> remoteCompressAlgoSet = GetRemoteCompressAlgo();
424 if (remoteCompressAlgoSet.empty()) {
425 return "none";
426 }
427 std::string currentAlgoStr;
428 for (const auto &algo : remoteCompressAlgoSet) {
429 auto iter = algoMap.find(algo);
430 if (iter != algoMap.end()) {
431 currentAlgoStr += algoMap[algo] + ",";
432 }
433 }
434 if (currentAlgoStr.empty()) {
435 return "";
436 }
437 return currentAlgoStr.substr(0, currentAlgoStr.length() - 1);
438 }
439
SetDbAbility(DbAbility & remoteDbAbility)440 void SingleVerSyncTaskContext::SetDbAbility(DbAbility &remoteDbAbility)
441 {
442 {
443 std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
444 remoteDbAbility_ = remoteDbAbility;
445 }
446 LOGI("[SingleVerSyncTaskContext] set dev=%s compressAlgo=%s, IsSupAllPredicateQuery=%u,"
447 "IsSupSubscribeQuery=%u, inKeys=%u",
448 STR_MASK(GetDeviceId()), GetRemoteCompressAlgoStr().c_str(),
449 remoteDbAbility.GetAbilityItem(SyncConfig::ALLPREDICATEQUERY),
450 remoteDbAbility.GetAbilityItem(SyncConfig::SUBSCRIBEQUERY),
451 remoteDbAbility.GetAbilityItem(SyncConfig::INKEYS_QUERY));
452 }
453
ChooseCompressAlgo() const454 CompressAlgorithm SingleVerSyncTaskContext::ChooseCompressAlgo() const
455 {
456 std::set<CompressAlgorithm> remoteAlgo = GetRemoteCompressAlgo();
457 if (remoteAlgo.empty()) {
458 return CompressAlgorithm::NONE;
459 }
460 std::set<CompressAlgorithm> localAlgorithmSet;
461 (void)(static_cast<SyncGenericInterface *>(syncInterface_))->GetCompressionAlgo(localAlgorithmSet);
462 std::set<CompressAlgorithm> algoIntersection;
463 set_intersection(remoteAlgo.begin(), remoteAlgo.end(), localAlgorithmSet.begin(), localAlgorithmSet.end(),
464 inserter(algoIntersection, algoIntersection.begin()));
465 if (algoIntersection.empty()) {
466 return CompressAlgorithm::NONE;
467 }
468 return *(algoIntersection.begin());
469 }
470
IsNotSupportAbility(const AbilityItem & abilityItem) const471 bool SingleVerSyncTaskContext::IsNotSupportAbility(const AbilityItem &abilityItem) const
472 {
473 std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
474 return remoteDbAbility_.GetAbilityItem(abilityItem) != SUPPORT_MARK;
475 }
476
SetSubscribeManager(std::shared_ptr<SubscribeManager> & subManager)477 void SingleVerSyncTaskContext::SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager)
478 {
479 subManager_ = subManager;
480 }
481
GetSubscribeManager() const482 std::shared_ptr<SubscribeManager> SingleVerSyncTaskContext::GetSubscribeManager() const
483 {
484 return subManager_;
485 }
// Project macro instantiated for this class.
// NOTE(review): presumably emits the object-tag helper definitions for
// SingleVerSyncTaskContext — confirm against the macro's definition.
DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)
487
488 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
489 {
490 SyncOperation *operation = GetAndIncSyncOperation();
491 bool res = IsCurrentSyncTaskCanBeSkippedInner(operation);
492 RefObject::DecObjRef(operation);
493 return res;
494 }
495
SaveLastPushTaskExecStatus(int finalStatus)496 void SingleVerSyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
497 {
498 if (IsTargetQueueEmpty()) {
499 LOGD("sync que is empty, reset last push status");
500 ResetLastPushTaskStatus();
501 return;
502 }
503 if (mode_ == SyncModeType::PUSH || mode_ == SyncModeType::PUSH_AND_PULL || mode_ == SyncModeType::RESPONSE_PULL) {
504 lastFullSyncTaskStatus_ = finalStatus;
505 } else if (mode_ == SyncModeType::QUERY_PUSH || mode_ == SyncModeType::QUERY_PUSH_PULL) {
506 std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
507 lastQuerySyncTaskStatusMap_[syncOperation_->GetQueryId()] = finalStatus;
508 }
509 }
510
GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation * operation,uint64_t & waterMark) const511 int SingleVerSyncTaskContext::GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation,
512 uint64_t &waterMark) const
513 {
514 if (operation != nullptr && operation->IsQuerySync()) {
515 LOGD("Is QuerySync");
516 int errCode = static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetSendQueryWaterMark(
517 operation->GetQueryId(), deviceId_,
518 lastFullSyncTaskStatus_ == SyncOperation::OP_FINISHED_ALL, waterMark);
519 if (errCode != E_OK) {
520 return errCode;
521 }
522 } else {
523 LOGD("Not QuerySync");
524 static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetLocalWaterMark(deviceId_, waterMark);
525 }
526 return E_OK;
527 }
528
ResetLastPushTaskStatus()529 void SingleVerSyncTaskContext::ResetLastPushTaskStatus()
530 {
531 lastFullSyncTaskStatus_ = SyncOperation::OP_WAITING;
532 std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
533 lastQuerySyncTaskStatusMap_.clear();
534 }
535
SetCommNormal(bool isCommNormal)536 void SingleVerSyncTaskContext::SetCommNormal(bool isCommNormal)
537 {
538 isCommNormal_ = isCommNormal;
539 }
540
IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation * operation) const541 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const
542 {
543 if (mode_ == SyncModeType::PUSH) {
544 if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
545 return false;
546 }
547 if (operation == nullptr) {
548 return true;
549 }
550 } else if (mode_ == SyncModeType::QUERY_PUSH) {
551 if (operation == nullptr) {
552 return true;
553 }
554 std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
555 auto it = lastQuerySyncTaskStatusMap_.find(operation->GetQueryId());
556 if (it == lastQuerySyncTaskStatusMap_.end()) {
557 // no last query_push and push
558 if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
559 LOGD("no prev query push or successful prev push");
560 return false;
561 }
562 } else {
563 if (it->second != SyncOperation::OP_FINISHED_ALL) {
564 LOGD("last query push status = %d.", it->second);
565 return false;
566 }
567 }
568 } else {
569 return false;
570 }
571
572 Timestamp maxTimestampInDb;
573 syncInterface_->GetMaxTimestamp(maxTimestampInDb);
574 uint64_t localWaterMark = 0;
575 int errCode = GetCorrectedSendWaterMarkForCurrentTask(operation, localWaterMark);
576 if (errCode != E_OK) {
577 LOGE("GetLocalWaterMark in state machine failed: %d", errCode);
578 return false;
579 }
580 if (localWaterMark > maxTimestampInDb) {
581 LOGI("skip current push task, deviceId_ = %s, localWaterMark = %" PRIu64 ", maxTimestampInDb = %" PRIu64,
582 STR_MASK(deviceId_), localWaterMark, maxTimestampInDb);
583 return true;
584 }
585 return false;
586 }
587
StartFeedDogForGetData(uint32_t sessionId)588 void SingleVerSyncTaskContext::StartFeedDogForGetData(uint32_t sessionId)
589 {
590 stateMachine_->StartFeedDogForGetData(sessionId);
591 }
592
StopFeedDogForGetData()593 void SingleVerSyncTaskContext::StopFeedDogForGetData()
594 {
595 stateMachine_->StopFeedDogForGetData();
596 }
597
UpdateOperationFinishedCount(const std::string & deviceId,uint32_t count)598 void SingleVerSyncTaskContext::UpdateOperationFinishedCount(const std::string &deviceId, uint32_t count)
599 {
600 std::lock_guard<std::mutex> lock(operationLock_);
601 if (syncOperation_ != nullptr) {
602 syncOperation_->UpdateFinishedCount(deviceId, count);
603 }
604 }
605
SetOperationSyncProcessTotal(const std::string & deviceId,uint32_t total)606 void SingleVerSyncTaskContext::SetOperationSyncProcessTotal(const std::string &deviceId, uint32_t total)
607 {
608 std::lock_guard<std::mutex> lock(operationLock_);
609 if (syncOperation_ != nullptr) {
610 syncOperation_->SetSyncProcessTotal(deviceId, total);
611 }
612 }
613
SetInitWaterMark(WaterMark waterMark)614 void SingleVerSyncTaskContext::SetInitWaterMark(WaterMark waterMark)
615 {
616 initWaterMark_ = waterMark;
617 }
618
GetInitWaterMark() const619 WaterMark SingleVerSyncTaskContext::GetInitWaterMark() const
620 {
621 return initWaterMark_;
622 }
623
SetInitDeletedMark(WaterMark waterMark)624 void SingleVerSyncTaskContext::SetInitDeletedMark(WaterMark waterMark)
625 {
626 initDeletedMark_ = waterMark;
627 }
628
GetInitDeletedMark() const629 WaterMark SingleVerSyncTaskContext::GetInitDeletedMark() const
630 {
631 return initDeletedMark_;
632 }
633
GetResponseTaskCount()634 int32_t SingleVerSyncTaskContext::GetResponseTaskCount()
635 {
636 std::lock_guard<std::mutex> autoLock(targetQueueLock_);
637 int32_t taskCount = static_cast<int32_t>(responseTargetQueue_.size());
638 if (responseSessionId_ != 0) {
639 taskCount++;
640 }
641 return taskCount;
642 }
643 } // namespace DistributedDB
644