1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_sync_task_context.h"
17
18 #include <algorithm>
19 #include "db_common.h"
20 #include "db_dfx_adapter.h"
21 #include "db_errno.h"
22 #include "isyncer.h"
23 #include "log_print.h"
24 #include "platform_specific.h"
25 #include "single_ver_sync_state_machine.h"
26 #include "single_ver_sync_target.h"
27 #include "sync_types.h"
28
29 namespace DistributedDB {
SingleVerSyncTaskContext()30 SingleVerSyncTaskContext::SingleVerSyncTaskContext()
31 : SyncTaskContext(),
32 token_(nullptr),
33 endMark_(0),
34 needClearRemoteStaleData_(false)
35 {}
36
~SingleVerSyncTaskContext()37 SingleVerSyncTaskContext::~SingleVerSyncTaskContext()
38 {
39 token_ = nullptr;
40 subManager_ = nullptr;
41 }
42
Initialize(const DeviceSyncTarget & target,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)43 int SingleVerSyncTaskContext::Initialize(const DeviceSyncTarget &target, ISyncInterface *syncInterface,
44 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
45 {
46 if (target.device.empty() || syncInterface == nullptr || metadata == nullptr ||
47 communicator == nullptr) {
48 LOGE("[SingleVerSyncTaskContext] [Initialize] parameter is invalid.");
49 return -E_INVALID_ARGS;
50 }
51 stateMachine_ = new (std::nothrow) SingleVerSyncStateMachine;
52 if (stateMachine_ == nullptr) {
53 LOGE("[SingleVerSyncTaskContext] [Initialize] stateMachine_ is nullptr.");
54 return -E_OUT_OF_MEMORY;
55 }
56 deviceId_ = target.device;
57 targetUserId_ = target.userId;
58 std::vector<uint8_t> dbIdentifier = syncInterface->GetIdentifier();
59 dbIdentifier.resize(3); // only show 3 bytes
60 syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
61 DBCommon::VectorToHexString(dbIdentifier) + "_" + deviceId_.c_str();
62 TimerAction timeOutCallback;
63 int errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
64 if (errCode != E_OK) {
65 LOGE("[SingleVerSyncTaskContext] stateMachine Initialize failed, err %d.", errCode);
66 goto ERROR_OUT;
67 }
68
69 timeHelper_ = std::make_unique<TimeHelper>();
70 errCode = timeHelper_->Initialize(syncInterface, metadata);
71 if (errCode != E_OK) {
72 LOGE("[SingleVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
73 goto ERROR_OUT;
74 }
75 timeOutCallback = [stateMachine = static_cast<SingleVerSyncStateMachine *>(stateMachine_)](TimerId timerId) {
76 return stateMachine->TimeoutCallback(timerId);
77 };
78 SetTimeoutCallback(timeOutCallback);
79
80 syncInterface_ = syncInterface;
81 communicator_ = communicator;
82 taskExecStatus_ = INIT;
83 OnKill([this]() { this->KillWait(); });
84 {
85 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
86 synTaskContextSet_.insert(this);
87 }
88 return errCode;
89
90 ERROR_OUT:
91 delete stateMachine_;
92 stateMachine_ = nullptr;
93 return errCode;
94 }
95
AddSyncOperation(SyncOperation * operation)96 int SingleVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
97 {
98 if (operation == nullptr) {
99 LOGE("[SingleVerSyncTaskContext] [AddSyncOperation] operation is nullptr.");
100 return -E_INVALID_ARGS;
101 }
102
103 // If auto sync, just update the end watermark
104 if (operation->IsAutoSync()) {
105 std::lock_guard<std::mutex> lock(targetQueueLock_);
106 bool isQuerySync = operation->IsQuerySync();
107 std::string queryId = operation->GetQueryId();
108 auto iter = std::find_if(requestTargetQueue_.begin(), requestTargetQueue_.end(),
109 [isQuerySync, queryId](const ISyncTarget *target) {
110 if (target == nullptr) {
111 return false;
112 }
113 if (isQuerySync) {
114 SyncOperation *tmpOperation = nullptr;
115 target->GetSyncOperation(tmpOperation);
116 return (tmpOperation != nullptr && tmpOperation->GetQueryId() == queryId) && target->IsAutoSync();
117 }
118 return target->IsAutoSync();
119 });
120 if (iter != requestTargetQueue_.end()) {
121 static_cast<SingleVerSyncTarget *>(*iter)->SetEndWaterMark(timeHelper_->GetTime());
122 operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
123 return E_OK;
124 }
125 }
126
127 auto *newTarget = new (std::nothrow) SingleVerSyncTarget;
128 if (newTarget == nullptr) {
129 LOGE("[SingleVerSyncTaskContext] [AddSyncOperation] newTarget is nullptr.");
130 return -E_OUT_OF_MEMORY;
131 }
132 newTarget->SetSyncOperation(operation);
133 Timestamp timstamp = timeHelper_->GetTime();
134 newTarget->SetEndWaterMark(timstamp);
135 newTarget->SetTaskType(ISyncTarget::REQUEST);
136 AddSyncTarget(newTarget);
137 return E_OK;
138 }
139
SetEndMark(WaterMark endMark)140 void SingleVerSyncTaskContext::SetEndMark(WaterMark endMark)
141 {
142 endMark_ = endMark;
143 }
144
GetEndMark() const145 WaterMark SingleVerSyncTaskContext::GetEndMark() const
146 {
147 return endMark_;
148 }
149
GetContinueToken(ContinueToken & outToken) const150 void SingleVerSyncTaskContext::GetContinueToken(ContinueToken &outToken) const
151 {
152 outToken = token_;
153 }
154
SetContinueToken(ContinueToken token)155 void SingleVerSyncTaskContext::SetContinueToken(ContinueToken token)
156 {
157 token_ = token;
158 return;
159 }
160
ReleaseContinueToken()161 void SingleVerSyncTaskContext::ReleaseContinueToken()
162 {
163 if (token_ != nullptr) {
164 static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token_);
165 token_ = nullptr;
166 }
167 }
168
PopResponseTarget(SingleVerSyncTarget & target)169 int SingleVerSyncTaskContext::PopResponseTarget(SingleVerSyncTarget &target)
170 {
171 std::lock_guard<std::mutex> lock(targetQueueLock_);
172 LOGD("[SingleVerSyncTaskContext] GetFrontExtWaterMark size = %zu", responseTargetQueue_.size());
173 if (!responseTargetQueue_.empty()) {
174 ISyncTarget *tmpTarget = responseTargetQueue_.front();
175 responseTargetQueue_.pop_front();
176 target = *(static_cast<SingleVerSyncTarget *>(tmpTarget));
177 delete tmpTarget;
178 tmpTarget = nullptr;
179 return E_OK;
180 }
181 return -E_LENGTH_ERROR;
182 }
183
GetRspTargetQueueSize() const184 int SingleVerSyncTaskContext::GetRspTargetQueueSize() const
185 {
186 std::lock_guard<std::mutex> lock(targetQueueLock_);
187 return responseTargetQueue_.size();
188 }
189
SetResponseSessionId(uint32_t responseSessionId)190 void SingleVerSyncTaskContext::SetResponseSessionId(uint32_t responseSessionId)
191 {
192 responseSessionId_ = responseSessionId;
193 }
194
GetResponseSessionId() const195 uint32_t SingleVerSyncTaskContext::GetResponseSessionId() const
196 {
197 return responseSessionId_;
198 }
199
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)200 void SingleVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
201 {
202 const SingleVerSyncTarget *targetTmp = static_cast<const SingleVerSyncTarget *>(target);
203 SyncTaskContext::CopyTargetData(target, taskParam);
204 mode_ = targetTmp->GetMode();
205 endMark_ = targetTmp->GetEndWaterMark();
206 if (mode_ == SyncModeType::RESPONSE_PULL) {
207 responseSessionId_ = targetTmp->GetResponseSessionId();
208 }
209 SetQuery(targetTmp->GetQuery());
210 isQuerySync_ = targetTmp->IsQuerySync();
211 }
212
Clear()213 void SingleVerSyncTaskContext::Clear()
214 {
215 retryTime_ = 0;
216 ClearSyncOperation();
217 SyncTaskContext::Clear();
218 SetMode(SyncModeType::INVALID_MODE);
219 syncId_ = 0;
220 isAutoSync_ = false;
221 SetOperationStatus(SyncOperation::OP_WAITING);
222 SetEndMark(0);
223 SetResponseSessionId(0);
224 {
225 std::lock_guard<std::mutex> autoLock(queryMutex_);
226 query_ = QuerySyncObject();
227 }
228 isQuerySync_ = false;
229 }
230
Abort(int status)231 void SingleVerSyncTaskContext::Abort(int status)
232 {
233 {
234 std::lock_guard<std::mutex> lock(operationLock_);
235 if (syncOperation_ != nullptr) {
236 syncOperation_->SetStatus(deviceId_, status, GetCommErrCode());
237 if ((status >= SyncOperation::OP_FINISHED_ALL)) {
238 UnlockObj();
239 if (syncOperation_->CheckIsAllFinished()) {
240 syncOperation_->Finished();
241 }
242 LockObj();
243 }
244 }
245 }
246 StopFeedDogForSync(SyncDirectionFlag::SEND);
247 StopFeedDogForSync(SyncDirectionFlag::RECEIVE);
248 Clear();
249 }
250
ClearAllSyncTask()251 void SingleVerSyncTaskContext::ClearAllSyncTask()
252 {
253 // clear request queue sync task and responsequeue first.
254 std::list<ISyncTarget *> targetQueue;
255 {
256 std::lock_guard<std::mutex> lock(targetQueueLock_);
257 LOGI("[SingleVerSyncTaskContext] request taskcount=%zu, responsecount=%zu", requestTargetQueue_.size(),
258 responseTargetQueue_.size());
259 while (!requestTargetQueue_.empty()) {
260 ISyncTarget *tmpTarget = requestTargetQueue_.front();
261 requestTargetQueue_.pop_front();
262 SyncOperation *tmpInfOperation = nullptr;
263 tmpTarget->GetSyncOperation(tmpInfOperation);
264 RefObject::IncObjRef(tmpInfOperation);
265 targetQueue.push_back(tmpTarget);
266 }
267 while (!responseTargetQueue_.empty()) {
268 ISyncTarget *tmpTarget = responseTargetQueue_.front();
269 responseTargetQueue_.pop_front();
270 delete tmpTarget;
271 tmpTarget = nullptr;
272 }
273 }
274 while (!targetQueue.empty()) {
275 ISyncTarget *target = targetQueue.front();
276 targetQueue.pop_front();
277 SyncOperation *tmpOperation = nullptr;
278 target->GetSyncOperation(tmpOperation);
279 if (tmpOperation == nullptr) {
280 LOGE("[ClearAllSyncTask] tmpOperation is nullptr");
281 continue; // not exit this scene
282 }
283 LOGI("[SingleVerSyncTaskContext] killing syncId=%d,dev=%s", tmpOperation->GetSyncId(), STR_MASK(deviceId_));
284 if (target->IsAutoSync()) {
285 tmpOperation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
286 } else {
287 tmpOperation->SetStatus(deviceId_, SyncOperation::OP_COMM_ABNORMAL);
288 }
289 if (tmpOperation->CheckIsAllFinished()) {
290 tmpOperation->Finished();
291 }
292 delete target;
293 target = nullptr;
294 RefObject::DecObjRef(tmpOperation);
295 }
296 if (GetTaskExecStatus() == SyncTaskContext::RUNNING) {
297 // clear syncing task.
298 stateMachine_->CommErrAbort();
299 SetCommFailErrCode(static_cast<int>(SyncOperation::OP_COMM_ABNORMAL));
300 }
301 // reset last push status for sync merge
302 ResetLastPushTaskStatus();
303 }
304
EnableClearRemoteStaleData(bool enable)305 void SingleVerSyncTaskContext::EnableClearRemoteStaleData(bool enable)
306 {
307 needClearRemoteStaleData_ = enable;
308 }
309
IsNeedClearRemoteStaleData() const310 bool SingleVerSyncTaskContext::IsNeedClearRemoteStaleData() const
311 {
312 return needClearRemoteStaleData_;
313 }
314
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)315 bool SingleVerSyncTaskContext::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
316 {
317 return stateMachine_->StartFeedDogForSync(time, flag);
318 }
319
StopFeedDogForSync(SyncDirectionFlag flag)320 void SingleVerSyncTaskContext::StopFeedDogForSync(SyncDirectionFlag flag)
321 {
322 stateMachine_->StopFeedDogForSync(flag);
323 }
324
IsReceiveWaterMarkErr() const325 bool SingleVerSyncTaskContext::IsReceiveWaterMarkErr() const
326 {
327 return isReceiveWaterMarkErr_;
328 }
329
SetReceiveWaterMarkErr(bool isErr)330 void SingleVerSyncTaskContext::SetReceiveWaterMarkErr(bool isErr)
331 {
332 isReceiveWaterMarkErr_ = isErr;
333 }
334
SetRemoteSeccurityOption(SecurityOption secOption)335 void SingleVerSyncTaskContext::SetRemoteSeccurityOption(SecurityOption secOption)
336 {
337 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
338 remoteSecOption_ = secOption;
339 }
340
GetRemoteSeccurityOption() const341 SecurityOption SingleVerSyncTaskContext::GetRemoteSeccurityOption() const
342 {
343 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
344 return remoteSecOption_;
345 }
346
SetReceivcPermitCheck(bool isChecked)347 void SingleVerSyncTaskContext::SetReceivcPermitCheck(bool isChecked)
348 {
349 isReceivcPermitChecked_ = isChecked;
350 }
351
GetReceivcPermitCheck() const352 bool SingleVerSyncTaskContext::GetReceivcPermitCheck() const
353 {
354 return isReceivcPermitChecked_;
355 }
356
SetSendPermitCheck(bool isChecked)357 void SingleVerSyncTaskContext::SetSendPermitCheck(bool isChecked)
358 {
359 isSendPermitChecked_ = isChecked;
360 }
361
GetSendPermitCheck() const362 bool SingleVerSyncTaskContext::GetSendPermitCheck() const
363 {
364 return isSendPermitChecked_;
365 }
366
IsSkipTimeoutError(int errCode) const367 bool SingleVerSyncTaskContext::IsSkipTimeoutError(int errCode) const
368 {
369 if (errCode == -E_TIMEOUT && IsSyncTaskNeedRetry() && (GetRetryTime() < GetSyncRetryTimes())) { // LCOV_EXCL_BR_LINE
370 LOGE("[SingleVerSyncTaskContext] send message timeout error occurred");
371 return true;
372 } else {
373 return false;
374 }
375 }
376
FindResponseSyncTarget(uint32_t responseSessionId) const377 bool SingleVerSyncTaskContext::FindResponseSyncTarget(uint32_t responseSessionId) const
378 {
379 std::lock_guard<std::mutex> lock(targetQueueLock_);
380 auto iter = std::find_if(responseTargetQueue_.begin(), responseTargetQueue_.end(),
381 [responseSessionId](const ISyncTarget *target) {
382 return target->GetResponseSessionId() == responseSessionId;
383 });
384 if (iter == responseTargetQueue_.end()) {
385 return false;
386 }
387 return true;
388 }
389
SetQuery(const QuerySyncObject & query)390 void SingleVerSyncTaskContext::SetQuery(const QuerySyncObject &query)
391 {
392 std::lock_guard<std::mutex> autoLock(queryMutex_);
393 query_ = query;
394 query_.SetUseLocalSchema(mode_ != SyncModeType::RESPONSE_PULL);
395 query_.SetRemoteDev(deviceId_);
396 }
397
GetQuery() const398 QuerySyncObject SingleVerSyncTaskContext::GetQuery() const
399 {
400 std::lock_guard<std::mutex> autoLock(queryMutex_);
401 return query_;
402 }
403
SetQuerySync(bool isQuerySync)404 void SingleVerSyncTaskContext::SetQuerySync(bool isQuerySync)
405 {
406 isQuerySync_ = isQuerySync;
407 }
408
IsQuerySync() const409 bool SingleVerSyncTaskContext::IsQuerySync() const
410 {
411 return isQuerySync_;
412 }
413
GetRemoteCompressAlgo() const414 std::set<CompressAlgorithm> SingleVerSyncTaskContext::GetRemoteCompressAlgo() const
415 {
416 std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
417 std::set<CompressAlgorithm> compressAlgoSet;
418 for (const auto &algo : SyncConfig::COMPRESSALGOMAP) {
419 if (remoteDbAbility_.GetAbilityItem(algo.second) == SUPPORT_MARK) {
420 compressAlgoSet.insert(static_cast<CompressAlgorithm>(algo.first));
421 }
422 }
423 return compressAlgoSet;
424 }
425
GetRemoteCompressAlgoStr() const426 std::string SingleVerSyncTaskContext::GetRemoteCompressAlgoStr() const
427 {
428 static std::map<CompressAlgorithm, std::string> algoMap = {{CompressAlgorithm::ZLIB, "zlib"}};
429 std::set<CompressAlgorithm> remoteCompressAlgoSet = GetRemoteCompressAlgo();
430 if (remoteCompressAlgoSet.empty()) {
431 return "none";
432 }
433 std::string currentAlgoStr;
434 for (const auto &algo : remoteCompressAlgoSet) {
435 auto iter = algoMap.find(algo);
436 if (iter != algoMap.end()) {
437 currentAlgoStr += algoMap[algo] + ",";
438 }
439 }
440 if (currentAlgoStr.empty()) {
441 return "";
442 }
443 return currentAlgoStr.substr(0, currentAlgoStr.length() - 1);
444 }
445
SetDbAbility(DbAbility & remoteDbAbility)446 void SingleVerSyncTaskContext::SetDbAbility(DbAbility &remoteDbAbility)
447 {
448 {
449 std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
450 remoteDbAbility_ = remoteDbAbility;
451 }
452 LOGI("[SingleVerSyncTaskContext] set dev=%s compressAlgo=%s, IsSupAllPredicateQuery=%u,"
453 "IsSupSubscribeQuery=%u, inKeys=%u",
454 STR_MASK(GetDeviceId()), GetRemoteCompressAlgoStr().c_str(),
455 remoteDbAbility.GetAbilityItem(SyncConfig::ALLPREDICATEQUERY),
456 remoteDbAbility.GetAbilityItem(SyncConfig::SUBSCRIBEQUERY),
457 remoteDbAbility.GetAbilityItem(SyncConfig::INKEYS_QUERY));
458 }
459
ChooseCompressAlgo() const460 CompressAlgorithm SingleVerSyncTaskContext::ChooseCompressAlgo() const
461 {
462 std::set<CompressAlgorithm> remoteAlgo = GetRemoteCompressAlgo();
463 if (remoteAlgo.empty()) {
464 return CompressAlgorithm::NONE;
465 }
466 std::set<CompressAlgorithm> localAlgorithmSet;
467 (void)(static_cast<SyncGenericInterface *>(syncInterface_))->GetCompressionAlgo(localAlgorithmSet);
468 std::set<CompressAlgorithm> algoIntersection;
469 set_intersection(remoteAlgo.begin(), remoteAlgo.end(), localAlgorithmSet.begin(), localAlgorithmSet.end(),
470 inserter(algoIntersection, algoIntersection.begin()));
471 if (algoIntersection.empty()) {
472 return CompressAlgorithm::NONE;
473 }
474 return *(algoIntersection.begin());
475 }
476
IsNotSupportAbility(const AbilityItem & abilityItem) const477 bool SingleVerSyncTaskContext::IsNotSupportAbility(const AbilityItem &abilityItem) const
478 {
479 std::lock_guard<std::mutex> autoLock(remoteDbAbilityLock_);
480 return remoteDbAbility_.GetAbilityItem(abilityItem) != SUPPORT_MARK;
481 }
482
SetSubscribeManager(std::shared_ptr<SubscribeManager> & subManager)483 void SingleVerSyncTaskContext::SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager)
484 {
485 subManager_ = subManager;
486 }
487
GetSubscribeManager() const488 std::shared_ptr<SubscribeManager> SingleVerSyncTaskContext::GetSubscribeManager() const
489 {
490 return subManager_;
491 }
DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)492 DEFINE_OBJECT_TAG_FACILITIES(SingleVerSyncTaskContext)
493
494 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkipped() const
495 {
496 SyncOperation *operation = GetAndIncSyncOperation();
497 bool res = IsCurrentSyncTaskCanBeSkippedInner(operation);
498 RefObject::DecObjRef(operation);
499 return res;
500 }
501
SaveLastPushTaskExecStatus(int finalStatus)502 void SingleVerSyncTaskContext::SaveLastPushTaskExecStatus(int finalStatus)
503 {
504 if (IsTargetQueueEmpty()) {
505 LOGD("sync que is empty, reset last push status");
506 ResetLastPushTaskStatus();
507 return;
508 }
509 if (mode_ == SyncModeType::PUSH || mode_ == SyncModeType::PUSH_AND_PULL || mode_ == SyncModeType::RESPONSE_PULL) {
510 lastFullSyncTaskStatus_ = finalStatus;
511 } else if (mode_ == SyncModeType::QUERY_PUSH || mode_ == SyncModeType::QUERY_PUSH_PULL) {
512 std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
513 lastQuerySyncTaskStatusMap_[syncOperation_->GetQueryId()] = finalStatus;
514 }
515 }
516
GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation * operation,uint64_t & waterMark) const517 int SingleVerSyncTaskContext::GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation,
518 uint64_t &waterMark) const
519 {
520 if (operation != nullptr && operation->IsQuerySync()) {
521 LOGD("Is QuerySync");
522 int errCode = static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetSendQueryWaterMark(
523 operation->GetQueryId(), deviceId_, targetUserId_,
524 lastFullSyncTaskStatus_ == SyncOperation::OP_FINISHED_ALL, waterMark);
525 if (errCode != E_OK) {
526 return errCode;
527 }
528 } else {
529 LOGD("Not QuerySync");
530 static_cast<SingleVerSyncStateMachine *>(stateMachine_)->GetLocalWaterMark(deviceId_, targetUserId_, waterMark);
531 }
532 return E_OK;
533 }
534
ResetLastPushTaskStatus()535 void SingleVerSyncTaskContext::ResetLastPushTaskStatus()
536 {
537 lastFullSyncTaskStatus_ = SyncOperation::OP_WAITING;
538 std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
539 lastQuerySyncTaskStatusMap_.clear();
540 }
541
SetCommNormal(bool isCommNormal)542 void SingleVerSyncTaskContext::SetCommNormal(bool isCommNormal)
543 {
544 isCommNormal_ = isCommNormal;
545 }
546
IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation * operation) const547 bool SingleVerSyncTaskContext::IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const
548 {
549 if (mode_ == SyncModeType::PUSH) {
550 if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
551 return false;
552 }
553 if (operation == nullptr) {
554 return true;
555 }
556 } else if (mode_ == SyncModeType::QUERY_PUSH) {
557 if (operation == nullptr) {
558 return true;
559 }
560 std::lock_guard<std::mutex> autoLock(queryTaskStatusMutex_);
561 auto it = lastQuerySyncTaskStatusMap_.find(operation->GetQueryId());
562 if (it == lastQuerySyncTaskStatusMap_.end()) {
563 // no last query_push and push
564 if (lastFullSyncTaskStatus_ != SyncOperation::OP_FINISHED_ALL) {
565 LOGD("no prev query push or successful prev push");
566 return false;
567 }
568 } else {
569 if (it->second != SyncOperation::OP_FINISHED_ALL) {
570 LOGD("last query push status = %d.", it->second);
571 return false;
572 }
573 }
574 } else {
575 return false;
576 }
577
578 Timestamp maxTimestampInDb;
579 syncInterface_->GetMaxTimestamp(maxTimestampInDb);
580 uint64_t localWaterMark = 0;
581 int errCode = GetCorrectedSendWaterMarkForCurrentTask(operation, localWaterMark);
582 if (errCode != E_OK) {
583 LOGE("GetLocalWaterMark in state machine failed: %d", errCode);
584 return false;
585 }
586 if (localWaterMark > maxTimestampInDb) {
587 LOGI("skip current push task, deviceId_ = %s, localWaterMark = %" PRIu64 ", maxTimestampInDb = %" PRIu64,
588 STR_MASK(deviceId_), localWaterMark, maxTimestampInDb);
589 return true;
590 }
591 return false;
592 }
593
StartFeedDogForGetData(uint32_t sessionId)594 void SingleVerSyncTaskContext::StartFeedDogForGetData(uint32_t sessionId)
595 {
596 stateMachine_->StartFeedDogForGetData(sessionId);
597 }
598
StopFeedDogForGetData()599 void SingleVerSyncTaskContext::StopFeedDogForGetData()
600 {
601 stateMachine_->StopFeedDogForGetData();
602 }
603
UpdateOperationFinishedCount(const std::string & deviceId,uint32_t count)604 void SingleVerSyncTaskContext::UpdateOperationFinishedCount(const std::string &deviceId, uint32_t count)
605 {
606 std::lock_guard<std::mutex> lock(operationLock_);
607 if (syncOperation_ != nullptr) {
608 syncOperation_->UpdateFinishedCount(deviceId, count);
609 }
610 }
611
SetOperationSyncProcessTotal(const std::string & deviceId,uint32_t total)612 void SingleVerSyncTaskContext::SetOperationSyncProcessTotal(const std::string &deviceId, uint32_t total)
613 {
614 std::lock_guard<std::mutex> lock(operationLock_);
615 if (syncOperation_ != nullptr) {
616 syncOperation_->SetSyncProcessTotal(deviceId, total);
617 }
618 }
619
SetInitWaterMark(WaterMark waterMark)620 void SingleVerSyncTaskContext::SetInitWaterMark(WaterMark waterMark)
621 {
622 initWaterMark_ = waterMark;
623 }
624
GetInitWaterMark() const625 WaterMark SingleVerSyncTaskContext::GetInitWaterMark() const
626 {
627 return initWaterMark_;
628 }
629
SetInitDeletedMark(WaterMark waterMark)630 void SingleVerSyncTaskContext::SetInitDeletedMark(WaterMark waterMark)
631 {
632 initDeletedMark_ = waterMark;
633 }
634
GetInitDeletedMark() const635 WaterMark SingleVerSyncTaskContext::GetInitDeletedMark() const
636 {
637 return initDeletedMark_;
638 }
639
GetResponseTaskCount()640 int32_t SingleVerSyncTaskContext::GetResponseTaskCount()
641 {
642 std::lock_guard<std::mutex> autoLock(targetQueueLock_);
643 int32_t taskCount = static_cast<int32_t>(responseTargetQueue_.size());
644 if (responseSessionId_ != 0) {
645 taskCount++;
646 }
647 return taskCount;
648 }
649
IsNeedRetrySync(uint32_t errNo,uint16_t messageType)650 bool SingleVerSyncTaskContext::IsNeedRetrySync(uint32_t errNo, uint16_t messageType)
651 {
652 if ((errNo != E_FEEDBACK_DB_CLOSING && errNo != E_NEED_CORRECT_TARGET_USER) || messageType != TYPE_RESPONSE) {
653 return false;
654 }
655 uint32_t cur = 0;
656 if (errNo == E_NEED_CORRECT_TARGET_USER) {
657 cur = ++resyncForUserTimes_;
658 LOGI("[IsNeedRetrySync] resync for user times: %u", cur);
659 return cur <= MANUAL_RETRY_TIMES;
660 }
661 cur = ++resyncTimes_;
662 LOGI("[IsNeedRetrySync]%u", cur);
663 return cur <= MANUAL_RETRY_TIMES;
664 }
665
ResetResyncTimes()666 void SingleVerSyncTaskContext::ResetResyncTimes()
667 {
668 resyncTimes_ = 0;
669 resyncForUserTimes_ = 0;
670 }
671
IsRetryTask() const672 bool SingleVerSyncTaskContext::IsRetryTask() const
673 {
674 SyncOperation *operation = GetAndIncSyncOperation();
675 if (operation == nullptr) {
676 return true;
677 }
678 bool isRetryTask = operation->IsRetryTask();
679 RefObject::DecObjRef(operation);
680 return isRetryTask;
681 }
682
RefreshSaveTime(bool isFinished)683 void SingleVerSyncTaskContext::RefreshSaveTime(bool isFinished)
684 {
685 if (isFinished) {
686 lastSaveTimes_ = 0;
687 return;
688 }
689 lastSaveTimes_ = TimeHelper::GetMonotonicTime();
690 }
691
IsSavingTask(uint32_t timeout) const692 bool SingleVerSyncTaskContext::IsSavingTask(uint32_t timeout) const
693 {
694 auto lastSaveTimes = lastSaveTimes_.load();
695 if (lastSaveTimes == 0) {
696 return false;
697 }
698 Timestamp duration = TimeHelper::GetMonotonicTime() - lastSaveTimes;
699 if (duration < timeout * TimeHelper::MS_TO_US) {
700 LOGI("exist saving task, duration[%" PRIu64 "]", duration);
701 return true;
702 }
703 return false;
704 }
705 } // namespace DistributedDB
706