• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_sync_state_machine.h"
17 
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21 
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33 
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38     // used for state switch table
39     const int CURRENT_STATE_INDEX = 0;
40     const int EVENT_INDEX = 1;
41     const int OUTPUT_STATE_INDEX = 2;
42 
43     // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44     // State switch table v3, has three columns, CurrentState, Event, and OutSate
45     const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46         {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47 
48         // In TIME_SYNC state
49         {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50         {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51         {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52 
53         // In ABILITY_SYNC state, compare version num and schema
54         {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
55         {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
56         {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
57         {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
58         {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
59 
60         // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
61         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
62         {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
63         {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
64         {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
65         {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
66         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
67 
68         // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
69         {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
70         {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
71         {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
72         {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
73         {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
74         {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
75         {State::START_PASSIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
76 
77         // In WAIT_FOR_RECEIVE_DATA_FINISH,
78         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
79         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
80         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
81         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
82         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
83 
84         {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
85         {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
86         {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
87         {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
88 
89         // In SYNC_TASK_FINISHED,
90         {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
91         {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
92 
93         // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
94         {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
95         {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
96     };
97 }
98 
99 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
100 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
101 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
102 
SingleVerSyncStateMachine()103 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
104     : context_(nullptr),
105       syncInterface_(nullptr),
106       timeSync_(nullptr),
107       abilitySync_(nullptr),
108       dataSync_(nullptr),
109       currentRemoteVersionId_(0)
110 {
111 }
112 
~SingleVerSyncStateMachine()113 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
114 {
115     LOGD("~SingleVerSyncStateMachine");
116     Clear();
117 }
118 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)119 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
120     const std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
121 {
122     if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
123         return -E_INVALID_ARGS;
124     }
125 
126     int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
127     if (errCode != E_OK) {
128         return errCode;
129     }
130 
131     timeSync_ = std::make_shared<TimeSync>();
132     dataSync_ = std::make_shared<SingleVerDataSync>();
133     abilitySync_ = std::make_unique<AbilitySync>();
134     if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
135         timeSync_ = nullptr;
136         dataSync_ = nullptr;
137         abilitySync_ = nullptr;
138         return -E_OUT_OF_MEMORY;
139     }
140 
141     errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId());
142     if (errCode != E_OK) {
143         goto ERROR_OUT;
144     }
145     errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
146     if (errCode != E_OK) {
147         goto ERROR_OUT;
148     }
149     errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
150     if (errCode != E_OK) {
151         goto ERROR_OUT;
152     }
153     abilitySync_->InitAbilitySyncFinishStatus(*context);
154 
155     currentState_ = IDLE;
156     context_ = static_cast<SingleVerSyncTaskContext *>(context);
157     syncInterface_ = static_cast<SyncGenericInterface *>(syncInterface);
158 
159     InitStateSwitchTables();
160     InitStateMapping();
161     return E_OK;
162 
163 ERROR_OUT:
164     Clear();
165     return errCode;
166 }
167 
SyncStep()168 void SingleVerSyncStateMachine::SyncStep()
169 {
170     RefObject::IncObjRef(context_);
171     RefObject::IncObjRef(communicator_);
172     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
173     if (errCode != E_OK) {
174         LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
175         RefObject::DecObjRef(communicator_);
176         RefObject::DecObjRef(context_);
177     }
178 }
179 
ReceiveMessageCallback(Message * inMsg)180 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
181 {
182     int errCode = MessageCallbackPre(inMsg);
183     if (errCode != E_OK) {
184         LOGE("[StateMachine] message pre check failed");
185         return errCode;
186     }
187     switch (inMsg->GetMessageId()) {
188         case TIME_SYNC_MESSAGE:
189             errCode = TimeMarkSyncRecv(inMsg);
190             break;
191         case ABILITY_SYNC_MESSAGE:
192             errCode = AbilitySyncRecv(inMsg);
193             break;
194         case DATA_SYNC_MESSAGE:
195         case QUERY_SYNC_MESSAGE:
196             errCode = DataPktRecv(inMsg);
197             break;
198         case CONTROL_SYNC_MESSAGE:
199             errCode = ControlPktRecv(inMsg);
200             break;
201         default:
202             errCode = -E_NOT_SUPPORT;
203     }
204     return errCode;
205 }
206 
SyncStepInnerLocked()207 void SingleVerSyncStateMachine::SyncStepInnerLocked()
208 {
209     if (context_->IncUsedCount() != E_OK) {
210         goto SYNC_STEP_OUT;
211     }
212     {
213         std::lock_guard<std::mutex> lock(stateMachineLock_);
214         SyncStepInner();
215     }
216     context_->SafeExit();
217 
218 SYNC_STEP_OUT:
219     RefObject::DecObjRef(communicator_);
220     RefObject::DecObjRef(context_);
221 }
222 
SyncStepInner()223 void SingleVerSyncStateMachine::SyncStepInner()
224 {
225     Event event = INNER_ERR_EVENT;
226     do {
227         auto iter = stateMapping_.find(currentState_);
228         if (iter != stateMapping_.end()) {
229             event = static_cast<Event>(iter->second());
230         } else {
231             LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
232                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
233             break;
234         }
235     } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
236 }
237 
SetCurStateErrStatus()238 void SingleVerSyncStateMachine::SetCurStateErrStatus()
239 {
240     currentState_ = State::INNER_ERR;
241 }
242 
StartSyncInner()243 int SingleVerSyncStateMachine::StartSyncInner()
244 {
245     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
246     if (performance != nullptr) {
247         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
248     }
249     int errCode = PrepareNextSyncTask();
250     if (errCode == E_OK) {
251         SwitchStateAndStep(Event::START_SYNC_EVENT);
252     }
253     return errCode;
254 }
255 
AbortInner()256 void SingleVerSyncStateMachine::AbortInner()
257 {
258     LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
259         STR_MASK(context_->GetDeviceId()));
260     if (context_->IsKilled()) {
261         dataSync_->ClearDataMsg();
262     }
263     dataSync_->ClearSyncStatus();
264     ContinueToken token;
265     context_->GetContinueToken(token);
266     if (token != nullptr) {
267         syncInterface_->ReleaseContinueToken(token);
268     }
269     context_->SetContinueToken(nullptr);
270     context_->Clear();
271 }
272 
GetStateSwitchTables() const273 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
274 {
275     return stateSwitchTables_;
276 }
277 
PrepareNextSyncTask()278 int SingleVerSyncStateMachine::PrepareNextSyncTask()
279 {
280     int errCode = StartWatchDog();
281     if (errCode != E_OK && errCode != -E_UNEXPECTED_DATA) {
282         LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
283         return errCode;
284     }
285     if (errCode == -E_UNEXPECTED_DATA) {
286         LOGI("[PrepareNextSyncTask] timer already exists, reset the timer.");
287         (void)ResetWatchDog();
288     }
289 
290     if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
291         LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
292             currentState_, STR_MASK(context_->GetDeviceId()));
293         currentState_ = State::IDLE;
294     }
295     return E_OK;
296 }
297 
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)298 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
299 {
300     dataSync_->SendSaveDataNotifyPacket(context_,
301         std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
302 }
303 
CommErrAbort(uint32_t sessionId)304 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
305 {
306     std::lock_guard<std::mutex> lock(stateMachineLock_);
307     uint32_t requestSessionId = context_->GetRequestSessionId();
308     if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
309         return;
310     }
311     context_->SetCommNormal(false);
312     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
313         SyncStep();
314     }
315 }
316 
InitStateSwitchTables()317 void SingleVerSyncStateMachine::InitStateSwitchTables()
318 {
319     if (isStateSwitchTableInited_) {
320         return;
321     }
322 
323     std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
324     if (isStateSwitchTableInited_) {
325         return;
326     }
327 
328     InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
329     std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
330         [](const auto &tableA, const auto &tableB) {
331             return tableA.version > tableB.version;
332         }); // descending
333     isStateSwitchTableInited_ = true;
334 }
335 
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)336 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
337     const std::vector<std::vector<uint8_t>> &switchTable)
338 {
339     StateSwitchTable table;
340     table.version = version;
341     for (const auto &stateSwitch : switchTable) {
342         if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
343             LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
344             return;
345         }
346         if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
347             EventToState eventToState; // new EventToState
348             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
349             table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
350         } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
351             EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
352             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
353         }
354     }
355     stateSwitchTables_.push_back(table);
356 }
357 
InitStateMapping()358 void SingleVerSyncStateMachine::InitStateMapping()
359 {
360     stateMapping_[TIME_SYNC] = [this] { return DoTimeSync(); };
361     stateMapping_[ABILITY_SYNC] = [this] { return DoAbilitySync(); };
362     stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = [this] { return DoWaitForDataRecv(); };
363     stateMapping_[SYNC_TASK_FINISHED] = [this] { return DoSyncTaskFinished(); };
364     stateMapping_[SYNC_TIME_OUT] = [this] { return DoTimeout(); };
365     stateMapping_[INNER_ERR] = [this] { return DoInnerErr(); };
366     stateMapping_[START_INITIACTIVE_DATA_SYNC] = [this] { return DoInitiactiveDataSyncWithSlidingWindow(); };
367     stateMapping_[START_PASSIVE_DATA_SYNC] = [this] { return DoPassiveDataSyncWithSlidingWindow(); };
368     stateMapping_[SYNC_CONTROL_CMD] = [this] { return DoInitiactiveControlSync(); };
369 }
370 
DoInitiactiveDataSyncWithSlidingWindow() const371 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow() const
372 {
373     LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
374         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
375     int errCode = E_OK;
376     switch (context_->GetMode()) {
377         case SyncModeType::PUSH:
378         case SyncModeType::QUERY_PUSH:
379             context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
380             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
381             break;
382         case SyncModeType::PULL:
383         case SyncModeType::QUERY_PULL:
384             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
385             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
386             break;
387         case SyncModeType::PUSH_AND_PULL:
388         case SyncModeType::QUERY_PUSH_PULL:
389             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
390             break;
391         case SyncModeType::RESPONSE_PULL:
392             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
393             break;
394         default:
395             errCode = -E_NOT_SUPPORT;
396             break;
397     }
398     if (errCode == E_OK) {
399         return Event::WAIT_ACK_EVENT;
400     }
401     // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
402     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
403         errCode == -E_EKEYREVOKED) {
404         return Event::WAIT_ACK_EVENT;
405     }
406 
407     // when response task step dataSync again while request task is running,  ignore the errCode
408     bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
409     Event event = TransformErrCodeToEvent(errCode);
410     return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
411 }
412 
DoPassiveDataSyncWithSlidingWindow()413 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
414 {
415     {
416         RefObject::AutoLock lock(context_);
417         if (context_->GetRspTargetQueueSize() != 0) {
418             PreStartPullResponse();
419         } else if (context_->GetResponseSessionId() == 0 ||
420             context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
421             return RESPONSE_TASK_FINISHED_EVENT;
422         }
423     }
424     int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
425     if (errCode != E_OK) {
426         LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
427         return RESPONSE_TASK_FINISHED_EVENT;
428     }
429     return Event::WAIT_ACK_EVENT;
430 }
431 
DoInitiactiveControlSync() const432 Event SingleVerSyncStateMachine::DoInitiactiveControlSync() const
433 {
434     LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
435         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
436     context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
437     int errCode = dataSync_->ControlCmdStart(context_);
438     if (errCode == E_OK) {
439         return Event::WAIT_ACK_EVENT;
440     }
441     context_->SetTaskErrCode(errCode);
442     return TransformErrCodeToEvent(errCode);
443 }
444 
HandleControlAckRecv(const Message * inMsg)445 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
446 {
447     std::lock_guard<std::mutex> lock(stateMachineLock_);
448     if (IsNeedResetWatchdog(inMsg)) {
449         (void)ResetWatchDog();
450     }
451     int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
452     ControlAckRecvErrCodeHandle(errCode);
453     SwitchStateAndStep(TransformErrCodeToEvent(errCode));
454     return E_OK;
455 }
456 
DoWaitForDataRecv() const457 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
458 {
459     if (context_->GetRspTargetQueueSize() != 0) {
460         return START_PULL_RESPONSE_EVENT;
461     }
462     if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
463         return RECV_FINISHED_EVENT;
464     }
465     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
466         context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
467         context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
468         return RECV_FINISHED_EVENT;
469     }
470     return Event::WAIT_ACK_EVENT;
471 }
472 
DoTimeSync() const473 Event SingleVerSyncStateMachine::DoTimeSync() const
474 {
475     timeSync_->SetTimeSyncFinishIfNeed();
476     if (timeSync_->IsNeedSync()) {
477         CommErrHandler handler = nullptr;
478         // Auto sync need do retry don't use errHandler to return.
479         if (!context_->IsAutoSync()) {
480             handler = [this, context = context_,
481                 requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
482                 SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
483             };
484         }
485         int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
486         if (errCode == E_OK) {
487             return Event::WAIT_ACK_EVENT;
488         }
489         context_->SetTaskErrCode(errCode);
490         return TransformErrCodeToEvent(errCode);
491     }
492 
493     return Event::TIME_SYNC_FINISHED_EVENT;
494 }
495 
DoAbilitySync() const496 Event SingleVerSyncStateMachine::DoAbilitySync() const
497 {
498     uint16_t remoteCommunicatorVersion = 0;
499     int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
500     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
501         LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
502         return Event::INNER_ERR_EVENT;
503     }
504     // Fistr version, not support AbilitySync
505     if (remoteCommunicatorVersion == 0 && errCode != -E_NOT_FOUND) {
506         context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
507         return GetEventAfterTimeSync(context_->GetMode());
508     }
509     if (abilitySync_->GetAbilitySyncFinishedStatus()) {
510         return GetEventAfterTimeSync(context_->GetMode());
511     }
512 
513     CommErrHandler handler = [this, context = context_,
514         requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
515         SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
516     };
517     LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
518         STR_MASK(context_->GetDeviceId()));
519     errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
520         remoteCommunicatorVersion, handler, context_);
521     if (errCode != E_OK) {
522         LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
523         context_->SetTaskErrCode(errCode);
524         return TransformErrCodeToEvent(errCode);
525     }
526     return Event::WAIT_ACK_EVENT;
527 }
528 
GetEventAfterTimeSync(int mode) const529 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
530 {
531     if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
532         return Event::CONTROL_CMD_EVENT;
533     }
534     return Event::ABILITY_SYNC_FINISHED_EVENT;
535 }
536 
DoSyncTaskFinished()537 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
538 {
539     StopWatchDog();
540     dataSync_->ClearSyncStatus();
541     auto timeout = communicator_->GetTimeout(syncContext_->GetDeviceId());
542     RefObject::AutoLock lock(syncContext_);
543     int errCode = ExecNextTask(timeout);
544     if (errCode == E_OK) {
545         return Event::START_SYNC_EVENT;
546     }
547     return TransformErrCodeToEvent(errCode);
548 }
549 
DoTimeout()550 Event SingleVerSyncStateMachine::DoTimeout()
551 {
552     RefObject::AutoLock lock(context_);
553     if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
554         std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
555         if (subManager != nullptr) {
556             subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
557         }
558     }
559     context_->Abort(SyncOperation::OP_TIMEOUT);
560     context_->Clear();
561     AbortInner();
562     return Event::ANY_EVENT;
563 }
564 
DoInnerErr()565 Event SingleVerSyncStateMachine::DoInnerErr()
566 {
567     RefObject::AutoLock lock(context_);
568     if (!context_->IsCommNormal()) {
569         if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
570             std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
571             if (subManager != nullptr) {
572                 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
573             }
574         }
575         context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
576     } else {
577         int status = GetSyncOperationStatus(context_->GetTaskErrCode());
578         context_->Abort(status);
579     }
580     context_->Clear();
581     AbortInner();
582     return Event::ANY_EVENT;
583 }
584 
AbilitySyncRecv(const Message * inMsg)585 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
586 {
587     if (inMsg->GetMessageType() == TYPE_REQUEST) {
588         int errCode = abilitySync_->RequestRecv(inMsg, context_);
589         if (errCode != E_OK && inMsg->GetSessionId() == context_->GetResponseSessionId()) {
590             context_->SetTaskErrCode(errCode);
591             std::lock_guard<std::mutex> lock(stateMachineLock_);
592             SwitchStateAndStep(Event::INNER_ERR_EVENT);
593         }
594         return E_OK;
595     }
596     return AbilitySyncResponseRecv(inMsg);
597 }
598 
AbilitySyncResponseRecv(const Message * inMsg)599 int SingleVerSyncStateMachine::AbilitySyncResponseRecv(const Message *inMsg)
600 {
601     if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
602         std::lock_guard<std::mutex> lock(stateMachineLock_);
603         int errCode = abilitySync_->AckRecv(inMsg, context_);
604         (void)ResetWatchDog();
605         if (errCode == -E_ABILITY_SYNC_FINISHED) {
606             LOGI("[StateMachine][AbilitySyncRecv] ability sync finished with both kv,label=%s,dev=%s",
607                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
608             abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
609             JumpStatusAfterAbilitySync(context_->GetMode());
610         } else if (errCode != E_OK) {
611             LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
612             SwitchStateAndStep(TransformErrCodeToEvent(errCode));
613         } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
614             abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
615             LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
616                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
617             currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
618             JumpStatusAfterAbilitySync(context_->GetMode());
619         }
620         return E_OK;
621     }
622     if (inMsg->GetMessageType() == TYPE_NOTIFY) {
623         return AbilitySyncNotifyRecv(inMsg);
624     }
625     LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
626     return -E_NOT_SUPPORT;
627 }
628 
HandleDataRequestRecv(const Message * inMsg)629 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
630 {
631     TimeOffset offset = 0;
632     auto [systemOffset, senderLocalOffset] = SingleVerDataSyncUtils::GetTimeOffsetFromRequestMsg(inMsg);
633     int errCode = timeSync_->GenerateTimeOffsetIfNeed(systemOffset, senderLocalOffset);
634     if (errCode != E_OK) {
635         (void)dataSync_->SendDataAck(context_, inMsg, errCode, 0);
636         return errCode;
637     }
638     uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
639     // If message is data sync request, we should check timeoffset.
640     errCode = timeSync_->GetTimeOffset(offset, timeout);
641     if (errCode != E_OK) {
642         LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
643         return errCode;
644     }
645     context_->SetTimeOffset(offset);
646     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
647     if (performance != nullptr) {
648         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
649     }
650     DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
651 
652     // RequestRecv will save data, it may cost a long time.
653     // So we need to send save data notify to keep remote alive.
654     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
655     {
656         std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
657         if (IsNeedResetWatchdog(inMsg)) {
658             (void)ResetWatchDog();
659         }
660     }
661     WaterMark pullEndWaterkark = 0;
662     errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
663     if (performance != nullptr) {
664         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
665     }
666     // only higher than 102 version receive this errCode here.
667     // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
668     // so while receive the same secquencid after abiitysync it wouldn't handle.
669     if (errCode == -E_NEED_ABILITY_SYNC) {
670         if (isNeedStop) {
671             StopSaveDataNotify();
672         }
673         return errCode;
674     }
675     {
676         std::lock_guard<std::mutex> lock(stateMachineLock_);
677         DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
678         if (pullEndWaterkark > 0) {
679             AddPullResponseTarget(inMsg, pullEndWaterkark);
680         }
681     }
682     if (isNeedStop) {
683         StopSaveDataNotify();
684     }
685     return E_OK;
686 }
687 
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)688 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
689     bool ignoreInnerErr)
690 {
691     if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
692         dataSync_->ClearSyncStatus();
693     }
694     if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
695         int ret = dataSync_->TryContinueSync(context_, inMsg);
696         if (ret == -E_FINISHED) {
697             SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
698             return;
699         } else if (ret == E_OK) { // do nothing and waiting for all ack receive
700             return;
701         }
702         errCode = ret;
703     }
704     ResponsePullError(errCode, ignoreInnerErr);
705 }
706 
NeedAbilitySyncHandle()707 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
708 {
709     // if the remote device version num is overdue,
710     // mean the version num has been reset when syncing data,
711     // there should not clear the new version cache again.
712     if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
713         LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
714         context_->SetRemoteSoftwareVersion(0);
715     } else {
716         currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
717     }
718     abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
719 }
720 
HandleDataAckRecv(const Message * inMsg)721 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
722 {
723     if (inMsg->GetMessageType() == TYPE_RESPONSE) {
724         DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
725     }
726     std::lock_guard<std::mutex> lock(stateMachineLock_);
727     // Unfortunately we use stateMachineLock_ in many sync process
728     // So a bad ack will check before the lock and wait
729     // And then another process is running, it will get the lock.After this process, the ack became invalid.
730     // If we don't check ack again, it will be delivered to dataSyncer.
731     if (!IsPacketValid(inMsg)) {
732         return -E_INVALID_ARGS;
733     }
734     if (IsNeedResetWatchdog(inMsg)) {
735         (void)ResetWatchDog();
736     }
737     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
738         // packetId not match but sequence id matched scene, means resend map has be rebuilt
739         // this is old ack, should be dropped and wait for the same packetId sequence.
740         return E_OK;
741     }
742     // AckRecv will save meta data, it may cost a long time. if another thread is saving data
743     // So we need to send save data notify to keep remote alive.
744     // e.g. remote do pull sync
745     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
746     int errCode = dataSync_->AckRecv(context_, inMsg);
747     if (isNeedStop) {
748         StopSaveDataNotify();
749     }
750     if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA || errCode == -E_NEED_TIME_SYNC) {
751         StopFeedDogForSync(SyncDirectionFlag::SEND);
752         dataSync_->ClearSyncStatus();
753         context_->ReSetSequenceId();
754     } else if (errCode == -E_SAVE_DATA_NOTIFY) {
755         return errCode;
756     }
757     // when this msg is from response task while request task is running,  ignore the errCode
758     bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
759         context_->GetRequestSessionId() != 0;
760     DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
761     HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
762     return errCode;
763 }
764 
DataPktRecv(Message * inMsg)765 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
766 {
767     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
768     int errCode = E_OK;
769     switch (inMsg->GetMessageType()) {
770         case TYPE_REQUEST:
771             ScheduleMsgAndHandle(inMsg);
772             errCode = -E_NOT_NEED_DELETE_MSG;
773             break;
774         case TYPE_RESPONSE:
775         case TYPE_NOTIFY:
776             if (performance != nullptr) {
777                 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
778                 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
779             }
780             errCode = HandleDataAckRecv(inMsg);
781             break;
782         default:
783             errCode = -E_INVALID_ARGS;
784             break;
785     }
786     return errCode;
787 }
788 
ScheduleMsgAndHandle(Message * inMsg)789 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
790 {
791     dataSync_->PutDataMsg(inMsg);
792     while (true) {
793         bool isNeedHandle = true;
794         bool isNeedContinue = true;
795         Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
796         if (!isNeedContinue) {
797             break;
798         }
799         if (msg == nullptr) {
800             if (dataSync_->IsNeedReloadQueue()) {
801                 continue;
802             }
803             dataSync_->ScheduleInfoHandle(false, false, nullptr);
804             break;
805         }
806         bool isNeedClearMap = false;
807         if (isNeedHandle) {
808             int errCode = HandleDataRequestRecv(msg);
809             if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC || errCode == -E_NEED_TIME_SYNC) {
810                 isNeedClearMap = true;
811             }
812             if (errCode == -E_TIMEOUT) {
813                 isNeedHandle = false;
814             }
815         } else {
816             dataSync_->SendFinishedDataAck(context_, msg);
817         }
818         if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
819             // for lower version, no need to handle map schedule info, just reset schedule working status
820             isNeedHandle = false;
821         }
822         dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
823         delete msg;
824     }
825 }
826 
ControlPktRecv(Message * inMsg)827 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
828 {
829     int errCode = E_OK;
830     switch (inMsg->GetMessageType()) {
831         case TYPE_REQUEST:
832             errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
833             break;
834         case TYPE_RESPONSE:
835             errCode = HandleControlAckRecv(inMsg);
836             break;
837         default:
838             errCode = -E_INVALID_ARGS;
839             break;
840     }
841     return errCode;
842 }
843 
StepToTimeout(TimerId timerId)844 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
845 {
846     std::lock_guard<std::mutex> lock(stateMachineLock_);
847     TimerId timer = syncContext_->GetTimerId();
848     if (timer != timerId) {
849         return;
850     }
851     SwitchStateAndStep(Event::TIME_OUT_EVENT);
852 }
853 
854 namespace {
855 struct StateNode {
856     int errCode = 0;
857     SyncOperation::Status status = SyncOperation::OP_WAITING;
858 };
859 }
GetSyncOperationStatus(int errCode) const860 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
861 {
862     static const StateNode stateNodes[] = {
863         { -E_SCHEMA_MISMATCH,                 SyncOperation::OP_SCHEMA_INCOMPATIBLE },
864         { -E_EKEYREVOKED,                     SyncOperation::OP_EKEYREVOKED_FAILURE },
865         { -E_SECURITY_OPTION_CHECK_ERROR,     SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
866         { -E_BUSY,                            SyncOperation::OP_BUSY_FAILURE },
867         { -E_NOT_PERMIT,                      SyncOperation::OP_PERMISSION_CHECK_FAILED },
868         { -E_TIMEOUT,                         SyncOperation::OP_TIMEOUT },
869         { -E_INVALID_QUERY_FORMAT,            SyncOperation::OP_QUERY_FORMAT_FAILURE },
870         { -E_INVALID_QUERY_FIELD,             SyncOperation::OP_QUERY_FIELD_FAILURE },
871         { -E_FEEDBACK_UNKNOWN_MESSAGE,        SyncOperation::OP_NOT_SUPPORT },
872         { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
873         { -E_NOT_SUPPORT,                     SyncOperation::OP_NOT_SUPPORT },
874         { -E_INTERCEPT_DATA_FAIL,             SyncOperation::OP_INTERCEPT_DATA_FAIL },
875         { -E_MAX_LIMITS,                      SyncOperation::OP_MAX_LIMITS },
876         { -E_DISTRIBUTED_SCHEMA_CHANGED,      SyncOperation::OP_SCHEMA_CHANGED },
877         { -E_NOT_REGISTER,                    SyncOperation::OP_NOT_SUPPORT },
878         { -E_DENIED_SQL,                      SyncOperation::OP_DENIED_SQL },
879         { -E_REMOTE_OVER_SIZE,                SyncOperation::OP_MAX_LIMITS },
880         { -E_INVALID_PASSWD_OR_CORRUPTED_DB,  SyncOperation::OP_NOTADB_OR_CORRUPTED },
881         { -E_DISTRIBUTED_SCHEMA_NOT_FOUND,    SyncOperation::OP_SCHEMA_INCOMPATIBLE }
882     };
883     const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) {
884         return node.errCode == errCode;
885     });
886     return result == std::end(stateNodes) ? SyncOperation::OP_FAILED : static_cast<int>((*result).status);
887 }
888 
TimeMarkSyncRecv(const Message * inMsg)889 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
890 {
891     LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
892         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
893     {
894         std::lock_guard<std::mutex> lock(stateMachineLock_);
895         (void)ResetWatchDog();
896     }
897     if (inMsg->GetMessageType() == TYPE_REQUEST) {
898         return timeSync_->RequestRecv(inMsg);
899     } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
900         int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
901         if (errCode != E_OK) {
902             LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
903             if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
904                 context_->SetTaskErrCode(errCode);
905                 InnerErrorAbort(inMsg->GetSessionId());
906             }
907             return errCode;
908         }
909         std::lock_guard<std::mutex> lock(stateMachineLock_);
910         SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
911         return E_OK;
912     } else {
913         return -E_INVALID_ARGS;
914     }
915 }
916 
Clear()917 void SingleVerSyncStateMachine::Clear()
918 {
919     dataSync_ = nullptr;
920     timeSync_ = nullptr;
921     abilitySync_ = nullptr;
922     context_ = nullptr;
923     syncInterface_ = nullptr;
924 }
925 
IsPacketValid(const Message * inMsg) const926 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
927 {
928     if (inMsg == nullptr) {
929         return false;
930     }
931 
932     if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
933         LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
934         return false;
935     }
936     // filter invalid ack at first
937     bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
938     if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
939         (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
940         LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
941             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
942         return false;
943     }
944     if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
945         (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
946         (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
947         LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
948             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
949         return false;
950     }
951 
952     // timeSync and abilitySync don't need to check sequenceId
953     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
954         inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
955         return true;
956     }
957     // sequenceId will be checked in dataSync
958     return true;
959 }
960 
PreStartPullResponse()961 void SingleVerSyncStateMachine::PreStartPullResponse()
962 {
963     SingleVerSyncTarget target;
964     context_->PopResponseTarget(target);
965     context_->SetEndMark(target.GetEndWaterMark());
966     context_->SetResponseSessionId(target.GetResponseSessionId());
967     context_->SetMode(SyncModeType::RESPONSE_PULL);
968     context_->ReSetSequenceId();
969     context_->SetQuerySync(target.IsQuerySync());
970     context_->SetQuery(target.GetQuery());
971 }
972 
CheckIsStartPullResponse() const973 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
974 {
975     // Other state will step to do pull response, only this statem we need to step the statemachine
976     if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
977         return true;
978     }
979     return false;
980 }
981 
MessageCallbackPre(const Message * inMsg)982 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
983 {
984     RefObject::AutoLock lock(context_);
985     if (context_->IsKilled()) {
986         return -E_OBJ_IS_KILLED;
987     }
988 
989     if (!IsPacketValid(inMsg)) {
990         return -E_INVALID_ARGS;
991     }
992     return E_OK;
993 }
994 
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)995 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
996 {
997     int messageType = static_cast<int>(inMsg->GetMessageId());
998     uint32_t sessionId = inMsg->GetSessionId();
999     if (pullEndWatermark == 0) {
1000         LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
1001         return;
1002     }
1003     if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
1004         LOGI("[StateMachine][AddPullResponseTarget] task is already running");
1005         return;
1006     }
1007     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
1008     if (packet == nullptr) {
1009         LOGE("[AddPullResponseTarget] get packet object failed");
1010         return;
1011     }
1012     SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
1013     if (targetTmp == nullptr) {
1014         LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
1015         return;
1016     }
1017     targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1018     if (messageType == QUERY_SYNC_MESSAGE) {
1019         targetTmp->SetQuery(packet->GetQuery());
1020         targetTmp->SetQuerySync(true);
1021     }
1022     targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1023     targetTmp->SetEndWaterMark(pullEndWatermark);
1024     targetTmp->SetResponseSessionId(sessionId);
1025     if (context_->AddSyncTarget(targetTmp) != E_OK) {
1026         delete targetTmp;
1027         return;
1028     }
1029     if (CheckIsStartPullResponse()) {
1030         SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1031     }
1032 }
1033 
TransformErrCodeToEvent(int errCode) const1034 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode) const
1035 {
1036     switch (errCode) {
1037         case -E_TIMEOUT:
1038             return TransforTimeOutErrCodeToEvent();
1039         case -static_cast<int>(VERSION_NOT_SUPPOR_EVENT):
1040             return Event::VERSION_NOT_SUPPOR_EVENT;
1041         case -E_SEND_DATA:
1042             return Event::SEND_DATA_EVENT;
1043         case -E_NO_DATA_SEND:
1044             return Event::SEND_FINISHED_EVENT;
1045         case -E_RECV_FINISHED:
1046             return Event::RECV_FINISHED_EVENT;
1047         case -E_NEED_ABILITY_SYNC:
1048             return Event::NEED_ABILITY_SYNC_EVENT;
1049         case -E_NO_SYNC_TASK:
1050             return Event::ALL_TASK_FINISHED_EVENT;
1051         case -E_NEED_PULL_REPONSE:
1052             return Event::START_PULL_RESPONSE_EVENT;
1053         case -E_RE_SEND_DATA:
1054             return Event::RE_SEND_DATA_EVENT;
1055         case -E_NEED_TIME_SYNC:
1056             return Event::NEED_TIME_SYNC_EVENT;
1057         default:
1058             return Event::INNER_ERR_EVENT;
1059     }
1060 }
1061 
IsNeedResetWatchdog(const Message * inMsg) const1062 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1063 {
1064     if (inMsg == nullptr) {
1065         return false;
1066     }
1067 
1068     if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1069         return true;
1070     }
1071 
1072     int msgType = inMsg->GetMessageType();
1073     if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1074         if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1075             // Pull response ack also should reset watchdog
1076             return true;
1077         }
1078     }
1079 
1080     return false;
1081 }
1082 
TransforTimeOutErrCodeToEvent() const1083 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1084 {
1085     if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1086         return Event::WAIT_TIME_OUT_EVENT;
1087     } else {
1088         return Event::TIME_OUT_EVENT;
1089     }
1090 }
1091 
IsNeedErrCodeHandle(uint32_t sessionId) const1092 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1093 {
1094     // omit to set sessionId so version_3 should skip to compare sessionid.
1095     if (sessionId == context_->GetRequestSessionId() ||
1096         context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1097         return true;
1098     }
1099     return false;
1100 }
1101 
PushPullDataRequestEvokeErrHandle()1102 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1103 {
1104     // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1105     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1106         SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) { // LCOV_EXCL_BR_LINE
1107         LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1108         context_->SetTaskErrCode(-E_EKEYREVOKED);
1109         context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1110         SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1111     } else {
1112         context_->SetTaskErrCode(-E_EKEYREVOKED);
1113         SwitchStateAndStep(Event::INNER_ERR_EVENT);
1114     }
1115 }
1116 
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1117 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1118 {
1119     if (IsNeedErrCodeHandle(sessionId)) {
1120         switch (errCode) {
1121             case E_OK:
1122                 break;
1123             case -E_NOT_PERMIT:
1124                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1125                 break;
1126             case -E_RECV_FINISHED:
1127                 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1128                 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1129                 break;
1130             case -E_EKEYREVOKED:
1131                 PushPullDataRequestEvokeErrHandle();
1132                 break;
1133             case -E_BUSY:
1134             case -E_DISTRIBUTED_SCHEMA_CHANGED:
1135             case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1136             case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1137             case -E_FEEDBACK_UNKNOWN_MESSAGE:
1138             case -E_INTERCEPT_DATA_FAIL:
1139             case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1140             case -E_INVALID_QUERY_FIELD:
1141             case -E_INVALID_QUERY_FORMAT:
1142             case -E_MAX_LIMITS:
1143             case -E_NOT_REGISTER:
1144             case -E_NOT_SUPPORT:
1145             case -E_SECURITY_OPTION_CHECK_ERROR:
1146                 context_->SetTaskErrCode(errCode);
1147                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1148                 break;
1149             default:
1150                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1151                 break;
1152         }
1153     }
1154 }
1155 
AbilityMsgSessionIdCheck(const Message * inMsg)1156 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1157 {
1158     if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) { // LCOV_EXCL_BR_LINE
1159         return true;
1160     }
1161     LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1162     return false;
1163 }
1164 
GetSyncType(uint32_t messageId) const1165 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1166 {
1167     if (messageId == QUERY_SYNC_MESSAGE) { // LCOV_EXCL_BR_LINE
1168         return SyncType::QUERY_SYNC_TYPE;
1169     }
1170     return SyncType::MANUAL_FULL_SYNC_TYPE;
1171 }
1172 
DataAckRecvErrCodeHandle(int errCode,bool handleError)1173 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1174 {
1175     switch (errCode) {
1176         case -E_NEED_ABILITY_SYNC:
1177             NeedAbilitySyncHandle();
1178             break;
1179         case -E_NEED_TIME_SYNC:
1180             timeSync_->ClearTimeSyncFinish();
1181             break;
1182         case -E_NOT_PERMIT:
1183             if (handleError) {
1184                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1185             }
1186             break;
1187         case -E_BUSY:
1188         case -E_DISTRIBUTED_SCHEMA_CHANGED:
1189         case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1190         case -E_EKEYREVOKED:
1191         case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1192         case -E_FEEDBACK_UNKNOWN_MESSAGE:
1193         case -E_INTERCEPT_DATA_FAIL:
1194         case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1195         case -E_INVALID_QUERY_FIELD:
1196         case -E_INVALID_QUERY_FORMAT:
1197         case -E_MAX_LIMITS:
1198         case -E_NOT_REGISTER:
1199         case -E_NOT_SUPPORT:
1200         case -E_SECURITY_OPTION_CHECK_ERROR:
1201             if (handleError) {
1202                 context_->SetTaskErrCode(errCode);
1203             }
1204             break;
1205         default:
1206             break;
1207     }
1208 }
1209 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1210 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1211 {
1212     return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1213 }
1214 
JumpStatusAfterAbilitySync(int mode)1215 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1216 {
1217     if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1218         SwitchStateAndStep(CONTROL_CMD_EVENT);
1219     } else {
1220         SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1221     }
1222 }
1223 
ControlAckRecvErrCodeHandle(int errCode)1224 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1225 {
1226     switch (errCode) {
1227         case -E_NEED_ABILITY_SYNC:
1228             NeedAbilitySyncHandle();
1229             break;
1230         case -E_NO_DATA_SEND:
1231             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1232             break;
1233         case -E_NOT_PERMIT:
1234             context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1235             break;
1236         // other errCode use default
1237         default:
1238             context_->SetTaskErrCode(errCode);
1239             break;
1240     }
1241 }
1242 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)1243 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
1244 {
1245     metadata_->GetLocalWaterMark(deviceId, outValue);
1246 }
1247 
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,bool isAutoLift,uint64_t & outValue)1248 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId,  const DeviceID &deviceId,
1249     bool isAutoLift, uint64_t &outValue)
1250 {
1251     return metadata_->GetSendQueryWaterMark(queryId, deviceId, outValue, isAutoLift);
1252 }
1253 
ResponsePullError(int errCode,bool ignoreInnerErr)1254 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1255 {
1256     Event event = TransformErrCodeToEvent(errCode);
1257     if (event == INNER_ERR_EVENT) {
1258         if (ignoreInnerErr) {
1259             event = RESPONSE_TASK_FINISHED_EVENT;
1260         } else if (context_ != nullptr) {
1261             context_->SetTaskErrCode(errCode);
1262         }
1263     }
1264     SwitchStateAndStep(event);
1265 }
1266 
InnerErrorAbort(uint32_t sessionId)1267 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1268 {
1269     std::lock_guard<std::mutex> lock(stateMachineLock_);
1270     uint32_t requestSessionId = context_->GetRequestSessionId();
1271     if (sessionId != requestSessionId) {
1272         LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1273         return;
1274     }
1275     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1276         SyncStep();
1277     }
1278 }
1279 
NotifyClosing()1280 void SingleVerSyncStateMachine::NotifyClosing()
1281 {
1282     if (timeSync_ != nullptr) {
1283         timeSync_->Close();
1284     }
1285 }
1286 
AbilitySyncNotifyRecv(const Message * inMsg)1287 int SingleVerSyncStateMachine::AbilitySyncNotifyRecv(const Message *inMsg)
1288 {
1289     const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
1290     if (packet == nullptr) {
1291         return -E_INVALID_ARGS;
1292     }
1293     int ackCode = packet->GetAckCode();
1294     if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
1295         LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
1296         context_->SetTaskErrCode(ackCode);
1297         std::lock_guard<std::mutex> lock(stateMachineLock_);
1298         SwitchStateAndStep(Event::INNER_ERR_EVENT);
1299         return E_OK;
1300     }
1301     if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
1302         abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
1303         // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
1304         LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
1305             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
1306         context_->SetRemoteSoftwareVersion(packet->GetSoftwareVersion());
1307         currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
1308         std::lock_guard<std::mutex> lock(stateMachineLock_);
1309         (void)ResetWatchDog();
1310         JumpStatusAfterAbilitySync(context_->GetMode());
1311     } else if (ackCode != AbilitySync::LAST_NOTIFY) {
1312         abilitySync_->AckNotifyRecv(inMsg, context_);
1313     }
1314     return E_OK;
1315 }
1316 
SchemaChange()1317 void SingleVerSyncStateMachine::SchemaChange()
1318 {
1319     abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
1320 }
1321 
TimeChange()1322 void SingleVerSyncStateMachine::TimeChange()
1323 {
1324     if (timeSync_ == nullptr) {
1325         LOGW("[SingleVerSyncStateMachine] time sync is null when time change");
1326         return;
1327     }
1328     timeSync_->ClearTimeSyncFinish();
1329 }
1330 } // namespace DistributedDB
1331