• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_sync_state_machine.h"
17 
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21 
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33 
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38     // used for state switch table
39     const int CURRENT_STATE_INDEX = 0;
40     const int EVENT_INDEX = 1;
41     const int OUTPUT_STATE_INDEX = 2;
42 
43     // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44     // State switch table v3, has three columns, CurrentState, Event, and OutSate
45     const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46         {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47 
48         // In TIME_SYNC state
49         {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50         {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51         {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52 
53         // In ABILITY_SYNC state, compare version num and schema
54         {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
55         {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
56         {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
57         {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
58         {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
59 
60         // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
61         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
62         {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
63         {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
64         {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
65         {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
66 
67         // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
68         {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
69         {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
70         {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
71         {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
72         {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
73         {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
74 
75         // In WAIT_FOR_RECEIVE_DATA_FINISH,
76         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
77         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
78         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
79         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
80         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
81 
82         {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
83         {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
84         {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
85         {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
86 
87         // In SYNC_TASK_FINISHED,
88         {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
89         {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
90 
91         // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
92         {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
93         {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
94     };
95 }
96 
97 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
98 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
99 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
100 
SingleVerSyncStateMachine()101 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
102     : context_(nullptr),
103       syncInterface_(nullptr),
104       timeSync_(nullptr),
105       abilitySync_(nullptr),
106       dataSync_(nullptr),
107       currentRemoteVersionId_(0)
108 {
109 }
110 
~SingleVerSyncStateMachine()111 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
112 {
113     LOGD("~SingleVerSyncStateMachine");
114     Clear();
115 }
116 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)117 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
118     const std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
119 {
120     if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
121         return -E_INVALID_ARGS;
122     }
123 
124     int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
125     if (errCode != E_OK) {
126         return errCode;
127     }
128 
129     timeSync_ = std::make_unique<TimeSync>();
130     dataSync_ = std::make_shared<SingleVerDataSync>();
131     abilitySync_ = std::make_unique<AbilitySync>();
132     if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
133         timeSync_ = nullptr;
134         dataSync_ = nullptr;
135         abilitySync_ = nullptr;
136         return -E_OUT_OF_MEMORY;
137     }
138 
139     errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId());
140     if (errCode != E_OK) {
141         goto ERROR_OUT;
142     }
143     errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
144     if (errCode != E_OK) {
145         goto ERROR_OUT;
146     }
147     errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
148     if (errCode != E_OK) {
149         goto ERROR_OUT;
150     }
151 
152     currentState_ = IDLE;
153     context_ = static_cast<SingleVerSyncTaskContext *>(context);
154     syncInterface_ = static_cast<SingleVerKvDBSyncInterface *>(syncInterface);
155 
156     InitStateSwitchTables();
157     InitStateMapping();
158     return E_OK;
159 
160 ERROR_OUT:
161     Clear();
162     return errCode;
163 }
164 
SyncStep()165 void SingleVerSyncStateMachine::SyncStep()
166 {
167     RefObject::IncObjRef(context_);
168     RefObject::IncObjRef(communicator_);
169     int errCode = RuntimeContext::GetInstance()->ScheduleTask(
170         std::bind(&SingleVerSyncStateMachine::SyncStepInnerLocked, this));
171     if (errCode != E_OK) {
172         LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
173         RefObject::DecObjRef(communicator_);
174         RefObject::DecObjRef(context_);
175     }
176 }
177 
ReceiveMessageCallback(Message * inMsg)178 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
179 {
180     int errCode = MessageCallbackPre(inMsg);
181     if (errCode != E_OK) {
182         LOGE("[StateMachine] message pre check failed");
183         return errCode;
184     }
185     switch (inMsg->GetMessageId()) {
186         case TIME_SYNC_MESSAGE:
187             errCode = TimeMarkSyncRecv(inMsg);
188             break;
189         case ABILITY_SYNC_MESSAGE:
190             errCode = AbilitySyncRecv(inMsg);
191             break;
192         case DATA_SYNC_MESSAGE:
193         case QUERY_SYNC_MESSAGE:
194             errCode = DataPktRecv(inMsg);
195             break;
196         case CONTROL_SYNC_MESSAGE:
197             errCode = ControlPktRecv(inMsg);
198             break;
199         default:
200             errCode = -E_NOT_SUPPORT;
201     }
202     return errCode;
203 }
204 
SyncStepInnerLocked()205 void SingleVerSyncStateMachine::SyncStepInnerLocked()
206 {
207     if (context_->IncUsedCount() != E_OK) {
208         goto SYNC_STEP_OUT;
209     }
210     {
211         std::lock_guard<std::mutex> lock(stateMachineLock_);
212         SyncStepInner();
213     }
214     context_->SafeExit();
215 
216 SYNC_STEP_OUT:
217     RefObject::DecObjRef(communicator_);
218     RefObject::DecObjRef(context_);
219 }
220 
SyncStepInner()221 void SingleVerSyncStateMachine::SyncStepInner()
222 {
223     Event event = INNER_ERR_EVENT;
224     do {
225         auto iter = stateMapping_.find(currentState_);
226         if (iter != stateMapping_.end()) {
227             event = static_cast<Event>(iter->second());
228         } else {
229             LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
230                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
231             break;
232         }
233     } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
234 }
235 
SetCurStateErrStatus()236 void SingleVerSyncStateMachine::SetCurStateErrStatus()
237 {
238     currentState_ = State::INNER_ERR;
239 }
240 
StartSyncInner()241 int SingleVerSyncStateMachine::StartSyncInner()
242 {
243     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
244     if (performance != nullptr) {
245         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
246     }
247     int errCode = PrepareNextSyncTask();
248     if (errCode == E_OK) {
249         SwitchStateAndStep(Event::START_SYNC_EVENT);
250     }
251     return errCode;
252 }
253 
AbortInner()254 void SingleVerSyncStateMachine::AbortInner()
255 {
256     LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
257         STR_MASK(context_->GetDeviceId()));
258     if (context_->IsKilled()) {
259         dataSync_->ClearDataMsg();
260     }
261     dataSync_->ClearSyncStatus();
262     ContinueToken token;
263     context_->GetContinueToken(token);
264     if (token != nullptr) {
265         syncInterface_->ReleaseContinueToken(token);
266     }
267     context_->SetContinueToken(nullptr);
268     context_->Clear();
269 }
270 
GetStateSwitchTables() const271 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
272 {
273     return stateSwitchTables_;
274 }
275 
PrepareNextSyncTask()276 int SingleVerSyncStateMachine::PrepareNextSyncTask()
277 {
278     int errCode = StartWatchDog();
279     if (errCode != E_OK && errCode != -E_UNEXPECTED_DATA) {
280         LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
281         return errCode;
282     }
283     if (errCode == -E_UNEXPECTED_DATA) {
284         LOGI("[PrepareNextSyncTask] timer already exists, reset the timer.");
285         (void)ResetWatchDog();
286     }
287 
288     if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
289         LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
290             currentState_, STR_MASK(context_->GetDeviceId()));
291         currentState_ = State::IDLE;
292     }
293     return E_OK;
294 }
295 
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)296 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
297 {
298     dataSync_->SendSaveDataNotifyPacket(context_,
299         std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
300 }
301 
CommErrAbort(uint32_t sessionId)302 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
303 {
304     std::lock_guard<std::mutex> lock(stateMachineLock_);
305     uint32_t requestSessionId = context_->GetRequestSessionId();
306     if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
307         return;
308     }
309     context_->SetCommNormal(false);
310     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
311         SyncStep();
312     }
313 }
314 
InitStateSwitchTables()315 void SingleVerSyncStateMachine::InitStateSwitchTables()
316 {
317     if (isStateSwitchTableInited_) {
318         return;
319     }
320 
321     std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
322     if (isStateSwitchTableInited_) {
323         return;
324     }
325 
326     InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
327     std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
328         [](const auto &tableA, const auto &tableB) {
329             return tableA.version > tableB.version;
330         }); // descending
331     isStateSwitchTableInited_ = true;
332 }
333 
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)334 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
335     const std::vector<std::vector<uint8_t>> &switchTable)
336 {
337     StateSwitchTable table;
338     table.version = version;
339     for (const auto &stateSwitch : switchTable) {
340         if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
341             LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
342             return;
343         }
344         if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
345             EventToState eventToState; // new EventToState
346             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
347             table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
348         } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
349             EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
350             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
351         }
352     }
353     stateSwitchTables_.push_back(table);
354 }
355 
InitStateMapping()356 void SingleVerSyncStateMachine::InitStateMapping()
357 {
358     stateMapping_[TIME_SYNC] = std::bind(&SingleVerSyncStateMachine::DoTimeSync, this);
359     stateMapping_[ABILITY_SYNC] = std::bind(&SingleVerSyncStateMachine::DoAbilitySync, this);
360     stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = std::bind(&SingleVerSyncStateMachine::DoWaitForDataRecv, this);
361     stateMapping_[SYNC_TASK_FINISHED] = std::bind(&SingleVerSyncStateMachine::DoSyncTaskFinished, this);
362     stateMapping_[SYNC_TIME_OUT] = std::bind(&SingleVerSyncStateMachine::DoTimeout, this);
363     stateMapping_[INNER_ERR] = std::bind(&SingleVerSyncStateMachine::DoInnerErr, this);
364     stateMapping_[START_INITIACTIVE_DATA_SYNC] =
365         std::bind(&SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow, this);
366     stateMapping_[START_PASSIVE_DATA_SYNC] =
367         std::bind(&SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow, this);
368     stateMapping_[SYNC_CONTROL_CMD] = std::bind(&SingleVerSyncStateMachine::DoInitiactiveControlSync, this);
369 }
370 
DoInitiactiveDataSyncWithSlidingWindow() const371 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow() const
372 {
373     LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
374         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
375     int errCode = E_OK;
376     switch (context_->GetMode()) {
377         case SyncModeType::PUSH:
378         case SyncModeType::QUERY_PUSH:
379             context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
380             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
381             break;
382         case SyncModeType::PULL:
383         case SyncModeType::QUERY_PULL:
384             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
385             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
386             break;
387         case SyncModeType::PUSH_AND_PULL:
388         case SyncModeType::QUERY_PUSH_PULL:
389             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
390             break;
391         case SyncModeType::RESPONSE_PULL:
392             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
393             break;
394         default:
395             errCode = -E_NOT_SUPPORT;
396             break;
397     }
398     if (errCode == E_OK) {
399         return Event::WAIT_ACK_EVENT;
400     }
401     // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
402     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
403         errCode == -E_EKEYREVOKED) {
404         return Event::WAIT_ACK_EVENT;
405     }
406 
407     // when response task step dataSync again while request task is running,  ignore the errCode
408     bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
409     Event event = TransformErrCodeToEvent(errCode);
410     return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
411 }
412 
DoPassiveDataSyncWithSlidingWindow()413 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
414 {
415     {
416         RefObject::AutoLock lock(context_);
417         if (context_->GetRspTargetQueueSize() != 0) {
418             PreStartPullResponse();
419         } else if (context_->GetResponseSessionId() == 0 ||
420             context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
421             return RESPONSE_TASK_FINISHED_EVENT;
422         }
423     }
424     int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
425     if (errCode != E_OK) {
426         LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
427         return RESPONSE_TASK_FINISHED_EVENT;
428     }
429     return Event::WAIT_ACK_EVENT;
430 }
431 
DoInitiactiveControlSync() const432 Event SingleVerSyncStateMachine::DoInitiactiveControlSync() const
433 {
434     LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
435         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
436     context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
437     int errCode = dataSync_->ControlCmdStart(context_);
438     if (errCode == E_OK) {
439         return Event::WAIT_ACK_EVENT;
440     }
441     context_->SetTaskErrCode(errCode);
442     return TransformErrCodeToEvent(errCode);
443 }
444 
HandleControlAckRecv(const Message * inMsg)445 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
446 {
447     std::lock_guard<std::mutex> lock(stateMachineLock_);
448     if (IsNeedResetWatchdog(inMsg)) {
449         (void)ResetWatchDog();
450     }
451     int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
452     ControlAckRecvErrCodeHandle(errCode);
453     SwitchStateAndStep(TransformErrCodeToEvent(errCode));
454     return E_OK;
455 }
456 
DoWaitForDataRecv() const457 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
458 {
459     if (context_->GetRspTargetQueueSize() != 0) {
460         return START_PULL_RESPONSE_EVENT;
461     }
462     if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
463         return RECV_FINISHED_EVENT;
464     }
465     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
466         context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
467         context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
468         return RECV_FINISHED_EVENT;
469     }
470     return Event::WAIT_ACK_EVENT;
471 }
472 
DoTimeSync() const473 Event SingleVerSyncStateMachine::DoTimeSync() const
474 {
475     if (timeSync_->IsNeedSync()) {
476         CommErrHandler handler = nullptr;
477         // Auto sync need do retry don't use errHandler to return.
478         if (!context_->IsAutoSync()) {
479             handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
480                 context_, context_->GetRequestSessionId());
481         }
482         int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
483         if (errCode == E_OK) {
484             return Event::WAIT_ACK_EVENT;
485         }
486         context_->SetTaskErrCode(errCode);
487         return TransformErrCodeToEvent(errCode);
488     }
489 
490     return Event::TIME_SYNC_FINISHED_EVENT;
491 }
492 
DoAbilitySync() const493 Event SingleVerSyncStateMachine::DoAbilitySync() const
494 {
495     uint16_t remoteCommunicatorVersion = 0;
496     int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
497     if (errCode != E_OK) {
498         LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
499         return Event::INNER_ERR_EVENT;
500     }
501     // Fistr version, not support AbilitySync
502     if (remoteCommunicatorVersion == 0) {
503         context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
504         return GetEventAfterTimeSync(context_->GetMode());
505     }
506     if (context_->GetIsNeedResetAbilitySync()) {
507         abilitySync_->SetAbilitySyncFinishedStatus(false);
508         context_->SetIsNeedResetAbilitySync(false);
509     }
510     if (abilitySync_->GetAbilitySyncFinishedStatus()) {
511         return GetEventAfterTimeSync(context_->GetMode());
512     }
513 
514     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
515         context_, context_->GetRequestSessionId());
516     LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
517         STR_MASK(context_->GetDeviceId()));
518     errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
519         remoteCommunicatorVersion, handler);
520     if (errCode != E_OK) {
521         LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
522         return TransformErrCodeToEvent(errCode);
523     }
524     return Event::WAIT_ACK_EVENT;
525 }
526 
GetEventAfterTimeSync(int mode) const527 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
528 {
529     if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
530         return Event::CONTROL_CMD_EVENT;
531     }
532     return Event::ABILITY_SYNC_FINISHED_EVENT;
533 }
534 
DoSyncTaskFinished()535 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
536 {
537     StopWatchDog();
538     dataSync_->ClearSyncStatus();
539     RefObject::AutoLock lock(syncContext_);
540     int errCode = ExecNextTask();
541     if (errCode == E_OK) {
542         return Event::START_SYNC_EVENT;
543     }
544     return TransformErrCodeToEvent(errCode);
545 }
546 
DoTimeout()547 Event SingleVerSyncStateMachine::DoTimeout()
548 {
549     RefObject::AutoLock lock(context_);
550     if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
551         std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
552         if (subManager != nullptr) {
553             subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
554         }
555     }
556     context_->Abort(SyncOperation::OP_TIMEOUT);
557     context_->Clear();
558     AbortInner();
559     return Event::ANY_EVENT;
560 }
561 
DoInnerErr()562 Event SingleVerSyncStateMachine::DoInnerErr()
563 {
564     RefObject::AutoLock lock(context_);
565     if (!context_->IsCommNormal()) {
566         if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
567             std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
568             if (subManager != nullptr) {
569                 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
570             }
571         }
572         context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
573     } else {
574         int status = GetSyncOperationStatus(context_->GetTaskErrCode());
575         context_->Abort(status);
576     }
577     context_->Clear();
578     AbortInner();
579     return Event::ANY_EVENT;
580 }
581 
AbilitySyncRecv(const Message * inMsg)582 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
583 {
584     if (inMsg->GetMessageType() == TYPE_REQUEST) {
585         return abilitySync_->RequestRecv(inMsg, context_);
586     }
587 
588     if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
589         std::lock_guard<std::mutex> lock(stateMachineLock_);
590         int errCode = abilitySync_->AckRecv(inMsg, context_);
591         (void)ResetWatchDog();
592         if (errCode != E_OK) {
593             LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
594             SwitchStateAndStep(TransformErrCodeToEvent(errCode));
595         } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
596             abilitySync_->SetAbilitySyncFinishedStatus(true);
597             LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
598                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
599             currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
600             JumpStatusAfterAbilitySync(context_->GetMode());
601         }
602         return E_OK;
603     }
604     if (inMsg->GetMessageType() == TYPE_NOTIFY) {
605         const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
606         if (packet == nullptr) {
607             return -E_INVALID_ARGS;
608         }
609         int ackCode = packet->GetAckCode();
610         if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
611             LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
612             context_->SetTaskErrCode(ackCode);
613             std::lock_guard<std::mutex> lock(stateMachineLock_);
614             SwitchStateAndStep(Event::INNER_ERR_EVENT);
615             return E_OK;
616         }
617         if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
618             abilitySync_->SetAbilitySyncFinishedStatus(true);
619             // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
620             LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
621                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
622             context_->SetRemoteSoftwareVersion(packet->GetSoftwareVersion());
623             currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
624             std::lock_guard<std::mutex> lock(stateMachineLock_);
625             (void)ResetWatchDog();
626             JumpStatusAfterAbilitySync(context_->GetMode());
627         } else if (ackCode != AbilitySync::LAST_NOTIFY) {
628             abilitySync_->AckNotifyRecv(inMsg, context_);
629         }
630         return E_OK;
631     }
632 
633     LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
634     return -E_NOT_SUPPORT;
635 }
636 
HandleDataRequestRecv(const Message * inMsg)637 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
638 {
639     TimeOffset offset = 0;
640     uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
641     // If message is data sync request, we should check timeoffset.
642     int errCode = timeSync_->GetTimeOffset(offset, timeout);
643     if (errCode != E_OK) {
644         LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
645         return errCode;
646     }
647     context_->SetTimeOffset(offset);
648     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
649     if (performance != nullptr) {
650         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
651     }
652     DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
653 
654     // RequestRecv will save data, it may cost a long time.
655     // So we need to send save data notify to keep remote alive.
656     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
657     {
658         std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
659         if (IsNeedResetWatchdog(inMsg)) {
660             (void)ResetWatchDog();
661         }
662     }
663     WaterMark pullEndWaterkark = 0;
664     errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
665     if (performance != nullptr) {
666         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
667     }
668     // only higher than 102 version receive this errCode here.
669     // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
670     // so while receive the same secquencid after abiitysync it wouldn't handle.
671     if (errCode == -E_NEED_ABILITY_SYNC) {
672         if (isNeedStop) {
673             StopSaveDataNotify();
674         }
675         return errCode;
676     }
677     {
678         std::lock_guard<std::mutex> lock(stateMachineLock_);
679         DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
680         if (pullEndWaterkark > 0) {
681             AddPullResponseTarget(inMsg, pullEndWaterkark);
682         }
683     }
684     if (isNeedStop) {
685         StopSaveDataNotify();
686     }
687     return E_OK;
688 }
689 
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)690 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
691     bool ignoreInnerErr)
692 {
693     if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
694         dataSync_->ClearSyncStatus();
695     }
696     if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
697         int ret = dataSync_->TryContinueSync(context_, inMsg);
698         if (ret == -E_FINISHED) {
699             SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
700             return;
701         } else if (ret == E_OK) { // do nothing and waiting for all ack receive
702             return;
703         }
704         errCode = ret;
705     }
706     ResponsePullError(errCode, ignoreInnerErr);
707 }
708 
NeedAbilitySyncHandle()709 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
710 {
711     // if the remote device version num is overdue,
712     // mean the version num has been reset when syncing data,
713     // there should not clear the new version cache again.
714     if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
715         LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
716         context_->SetRemoteSoftwareVersion(0);
717     } else {
718         currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
719     }
720     abilitySync_->SetAbilitySyncFinishedStatus(false);
721     dataSync_->ClearSyncStatus();
722 }
723 
HandleDataAckRecv(const Message * inMsg)724 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
725 {
726     if (inMsg->GetMessageType() == TYPE_RESPONSE) {
727         DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
728     }
729     std::lock_guard<std::mutex> lock(stateMachineLock_);
730     // Unfortunately we use stateMachineLock_ in many sync process
731     // So a bad ack will check before the lock and wait
732     // And then another process is running, it will get the lock.After this process, the ack became invalid.
733     // If we don't check ack again, it will be delivered to dataSyncer.
734     if (!IsPacketValid(inMsg)) {
735         return -E_INVALID_ARGS;
736     }
737     if (IsNeedResetWatchdog(inMsg)) {
738         (void)ResetWatchDog();
739     }
740     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
741         // packetId not match but sequence id matched scene, means resend map has be rebuilt
742         // this is old ack, should be dropped and wait for the same packetId sequence.
743         return E_OK;
744     }
745     // AckRecv will save meta data, it may cost a long time. if another thread is saving data
746     // So we need to send save data notify to keep remote alive.
747     // e.g. remote do pull sync
748     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
749     int errCode = dataSync_->AckRecv(context_, inMsg);
750     if (isNeedStop) {
751         StopSaveDataNotify();
752     }
753     if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA) {
754         StopFeedDogForSync(SyncDirectionFlag::SEND);
755     } else if (errCode == -E_SAVE_DATA_NOTIFY) {
756         return errCode;
757     }
758     // when this msg is from response task while request task is running,  ignore the errCode
759     bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
760         context_->GetRequestSessionId() != 0;
761     DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
762     HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
763     return errCode;
764 }
765 
DataPktRecv(Message * inMsg)766 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
767 {
768     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
769     int errCode = E_OK;
770     switch (inMsg->GetMessageType()) {
771         case TYPE_REQUEST:
772             ScheduleMsgAndHandle(inMsg);
773             errCode = -E_NOT_NEED_DELETE_MSG;
774             break;
775         case TYPE_RESPONSE:
776         case TYPE_NOTIFY:
777             if (performance != nullptr) {
778                 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
779                 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
780             }
781             errCode = HandleDataAckRecv(inMsg);
782             break;
783         default:
784             errCode = -E_INVALID_ARGS;
785             break;
786     }
787     return errCode;
788 }
789 
ScheduleMsgAndHandle(Message * inMsg)790 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
791 {
792     dataSync_->PutDataMsg(inMsg);
793     while (true) {
794         bool isNeedHandle = true;
795         bool isNeedContinue = true;
796         Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
797         if (!isNeedContinue) {
798             break;
799         }
800         if (msg == nullptr) {
801             if (dataSync_->IsNeedReloadQueue()) {
802                 continue;
803             }
804             dataSync_->ScheduleInfoHandle(false, false, nullptr);
805             break;
806         }
807         bool isNeedClearMap = false;
808         if (isNeedHandle) {
809             int errCode = HandleDataRequestRecv(msg);
810             if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC) {
811                 isNeedClearMap = true;
812             }
813             if (errCode == -E_TIMEOUT) {
814                 isNeedHandle = false;
815             }
816         } else {
817             dataSync_->SendFinishedDataAck(context_, msg);
818         }
819         if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
820             // for lower version, no need to handle map schedule info, just reset schedule working status
821             isNeedHandle = false;
822         }
823         dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
824         delete msg;
825     }
826 }
827 
ControlPktRecv(Message * inMsg)828 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
829 {
830     int errCode = E_OK;
831     switch (inMsg->GetMessageType()) {
832         case TYPE_REQUEST:
833             errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
834             break;
835         case TYPE_RESPONSE:
836             errCode = HandleControlAckRecv(inMsg);
837             break;
838         default:
839             errCode = -E_INVALID_ARGS;
840             break;
841     }
842     return errCode;
843 }
844 
StepToTimeout(TimerId timerId)845 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
846 {
847     std::lock_guard<std::mutex> lock(stateMachineLock_);
848     TimerId timer = syncContext_->GetTimerId();
849     if (timer != timerId) {
850         return;
851     }
852     SwitchStateAndStep(Event::TIME_OUT_EVENT);
853 }
854 
855 namespace {
856 struct StateNode {
857     int errCode = 0;
858     SyncOperation::Status status = SyncOperation::OP_WAITING;
859 };
860 }
GetSyncOperationStatus(int errCode) const861 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
862 {
863     static const StateNode stateNodes[] = {
864         { -E_SCHEMA_MISMATCH,                 SyncOperation::OP_SCHEMA_INCOMPATIBLE },
865         { -E_EKEYREVOKED,                     SyncOperation::OP_EKEYREVOKED_FAILURE },
866         { -E_SECURITY_OPTION_CHECK_ERROR,     SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
867         { -E_BUSY,                            SyncOperation::OP_BUSY_FAILURE },
868         { -E_NOT_PERMIT,                      SyncOperation::OP_PERMISSION_CHECK_FAILED },
869         { -E_TIMEOUT,                         SyncOperation::OP_TIMEOUT },
870         { -E_INVALID_QUERY_FORMAT,            SyncOperation::OP_QUERY_FORMAT_FAILURE },
871         { -E_INVALID_QUERY_FIELD,             SyncOperation::OP_QUERY_FIELD_FAILURE },
872         { -E_FEEDBACK_UNKNOWN_MESSAGE,        SyncOperation::OP_NOT_SUPPORT },
873         { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
874         { -E_NOT_SUPPORT,                     SyncOperation::OP_NOT_SUPPORT },
875         { -E_INTERCEPT_DATA_FAIL,             SyncOperation::OP_INTERCEPT_DATA_FAIL },
876         { -E_MAX_LIMITS,                      SyncOperation::OP_MAX_LIMITS },
877         { -E_DISTRIBUTED_SCHEMA_CHANGED,      SyncOperation::OP_SCHEMA_CHANGED },
878         { -E_NOT_REGISTER,                    SyncOperation::OP_NOT_SUPPORT },
879         { -E_DENIED_SQL,                      SyncOperation::OP_DENIED_SQL },
880         { -E_REMOTE_OVER_SIZE,                SyncOperation::OP_MAX_LIMITS },
881         { -E_INVALID_PASSWD_OR_CORRUPTED_DB,  SyncOperation::OP_NOTADB_OR_CORRUPTED }
882     };
883     const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) {
884         return node.errCode == errCode;
885     });
886     return result == std::end(stateNodes) ? SyncOperation::OP_FAILED : static_cast<int>((*result).status);
887 }
888 
TimeMarkSyncRecv(const Message * inMsg)889 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
890 {
891     LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
892         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
893     {
894         std::lock_guard<std::mutex> lock(stateMachineLock_);
895         (void)ResetWatchDog();
896     }
897     if (inMsg->GetMessageType() == TYPE_REQUEST) {
898         return timeSync_->RequestRecv(inMsg);
899     } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
900         int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
901         if (errCode != E_OK) {
902             LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
903             if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
904                 context_->SetTaskErrCode(errCode);
905                 InnerErrorAbort(inMsg->GetSessionId());
906             }
907             return errCode;
908         }
909         std::lock_guard<std::mutex> lock(stateMachineLock_);
910         SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
911         return E_OK;
912     } else {
913         return -E_INVALID_ARGS;
914     }
915 }
916 
Clear()917 void SingleVerSyncStateMachine::Clear()
918 {
919     dataSync_ = nullptr;
920     timeSync_ = nullptr;
921     abilitySync_ = nullptr;
922     context_ = nullptr;
923     syncInterface_ = nullptr;
924 }
925 
IsPacketValid(const Message * inMsg) const926 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
927 {
928     if (inMsg == nullptr) {
929         return false;
930     }
931 
932     if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
933         LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
934         return false;
935     }
936     // filter invalid ack at first
937     bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
938     if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
939         (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
940         LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
941             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
942         return false;
943     }
944     if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
945         (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
946         (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
947         LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
948             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
949         return false;
950     }
951 
952     // timeSync and abilitySync don't need to check sequenceId
953     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
954         inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
955         return true;
956     }
957     // sequenceId will be checked in dataSync
958     return true;
959 }
960 
PreStartPullResponse()961 void SingleVerSyncStateMachine::PreStartPullResponse()
962 {
963     SingleVerSyncTarget target;
964     context_->PopResponseTarget(target);
965     context_->SetEndMark(target.GetEndWaterMark());
966     context_->SetResponseSessionId(target.GetResponseSessionId());
967     context_->SetMode(SyncModeType::RESPONSE_PULL);
968     context_->ReSetSequenceId();
969     context_->SetQuerySync(target.IsQuerySync());
970     context_->SetQuery(target.GetQuery());
971 }
972 
CheckIsStartPullResponse() const973 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
974 {
975     // Other state will step to do pull response, only this statem we need to step the statemachine
976     if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
977         return true;
978     }
979     return false;
980 }
981 
MessageCallbackPre(const Message * inMsg)982 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
983 {
984     RefObject::AutoLock lock(context_);
985     if (context_->IsKilled()) {
986         return -E_OBJ_IS_KILLED;
987     }
988 
989     if (!IsPacketValid(inMsg)) {
990         return -E_INVALID_ARGS;
991     }
992     return E_OK;
993 }
994 
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)995 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
996 {
997     int messageType = static_cast<int>(inMsg->GetMessageId());
998     uint32_t sessionId = inMsg->GetSessionId();
999     if (pullEndWatermark == 0) {
1000         LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
1001         return;
1002     }
1003     if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
1004         LOGI("[StateMachine][AddPullResponseTarget] task is already running");
1005         return;
1006     }
1007     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
1008     if (packet == nullptr) {
1009         LOGE("[AddPullResponseTarget] get packet object failed");
1010         return;
1011     }
1012     SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
1013     if (targetTmp == nullptr) {
1014         LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
1015         return;
1016     }
1017     targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1018     if (messageType == QUERY_SYNC_MESSAGE) {
1019         targetTmp->SetQuery(packet->GetQuery());
1020         targetTmp->SetQuerySync(true);
1021     }
1022     targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1023     targetTmp->SetEndWaterMark(pullEndWatermark);
1024     targetTmp->SetResponseSessionId(sessionId);
1025     if (context_->AddSyncTarget(targetTmp) != E_OK) {
1026         delete targetTmp;
1027         return;
1028     }
1029     if (CheckIsStartPullResponse()) {
1030         SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1031     }
1032 }
1033 
TransformErrCodeToEvent(int errCode) const1034 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode) const
1035 {
1036     switch (errCode) {
1037         case -E_TIMEOUT:
1038             return TransforTimeOutErrCodeToEvent();
1039         case -static_cast<int>(VERSION_NOT_SUPPOR_EVENT):
1040             return Event::VERSION_NOT_SUPPOR_EVENT;
1041         case -E_SEND_DATA:
1042             return Event::SEND_DATA_EVENT;
1043         case -E_NO_DATA_SEND:
1044             return Event::SEND_FINISHED_EVENT;
1045         case -E_RECV_FINISHED:
1046             return Event::RECV_FINISHED_EVENT;
1047         case -E_NEED_ABILITY_SYNC:
1048             return Event::NEED_ABILITY_SYNC_EVENT;
1049         case -E_NO_SYNC_TASK:
1050             return Event::ALL_TASK_FINISHED_EVENT;
1051         case -E_NEED_PULL_REPONSE:
1052             return Event::START_PULL_RESPONSE_EVENT;
1053         case -E_RE_SEND_DATA:
1054             return Event::RE_SEND_DATA_EVENT;
1055         default:
1056             return Event::INNER_ERR_EVENT;
1057     }
1058 }
1059 
IsNeedResetWatchdog(const Message * inMsg) const1060 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1061 {
1062     if (inMsg == nullptr) {
1063         return false;
1064     }
1065 
1066     if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1067         return true;
1068     }
1069 
1070     int msgType = inMsg->GetMessageType();
1071     if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1072         if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1073             // Pull response ack also should reset watchdog
1074             return true;
1075         }
1076     }
1077 
1078     return false;
1079 }
1080 
TransforTimeOutErrCodeToEvent() const1081 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1082 {
1083     if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1084         return Event::WAIT_TIME_OUT_EVENT;
1085     } else {
1086         return Event::TIME_OUT_EVENT;
1087     }
1088 }
1089 
IsNeedErrCodeHandle(uint32_t sessionId) const1090 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1091 {
1092     // omit to set sessionId so version_3 should skip to compare sessionid.
1093     if (sessionId == context_->GetRequestSessionId() ||
1094         context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1095         return true;
1096     }
1097     return false;
1098 }
1099 
PushPullDataRequestEvokeErrHandle()1100 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1101 {
1102     // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1103     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1104         SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) {
1105         LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1106         context_->SetTaskErrCode(-E_EKEYREVOKED);
1107         context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1108         SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1109     } else {
1110         context_->SetTaskErrCode(-E_EKEYREVOKED);
1111         SwitchStateAndStep(Event::INNER_ERR_EVENT);
1112     }
1113 }
1114 
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1115 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1116 {
1117     if (IsNeedErrCodeHandle(sessionId)) {
1118         switch (errCode) {
1119             case E_OK:
1120                 break;
1121             case -E_NOT_PERMIT:
1122                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1123                 break;
1124             case -E_RECV_FINISHED:
1125                 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1126                 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1127                 break;
1128             case -E_EKEYREVOKED:
1129                 PushPullDataRequestEvokeErrHandle();
1130                 break;
1131             case -E_BUSY:
1132             case -E_DISTRIBUTED_SCHEMA_CHANGED:
1133             case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1134             case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1135             case -E_FEEDBACK_UNKNOWN_MESSAGE:
1136             case -E_INTERCEPT_DATA_FAIL:
1137             case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1138             case -E_INVALID_QUERY_FIELD:
1139             case -E_INVALID_QUERY_FORMAT:
1140             case -E_MAX_LIMITS:
1141             case -E_NOT_REGISTER:
1142             case -E_NOT_SUPPORT:
1143             case -E_SECURITY_OPTION_CHECK_ERROR:
1144                 context_->SetTaskErrCode(errCode);
1145                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1146                 break;
1147             default:
1148                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1149                 break;
1150         }
1151     }
1152 }
1153 
AbilityMsgSessionIdCheck(const Message * inMsg)1154 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1155 {
1156     if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
1157         return true;
1158     }
1159     LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1160     return false;
1161 }
1162 
GetSyncType(uint32_t messageId) const1163 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1164 {
1165     if (messageId == QUERY_SYNC_MESSAGE) {
1166         return SyncType::QUERY_SYNC_TYPE;
1167     }
1168     return SyncType::MANUAL_FULL_SYNC_TYPE;
1169 }
1170 
DataAckRecvErrCodeHandle(int errCode,bool handleError)1171 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1172 {
1173     switch (errCode) {
1174         case -E_NEED_ABILITY_SYNC:
1175             NeedAbilitySyncHandle();
1176             break;
1177         case -E_NOT_PERMIT:
1178             if (handleError) {
1179                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1180             }
1181             break;
1182         case -E_BUSY:
1183         case -E_DISTRIBUTED_SCHEMA_CHANGED:
1184         case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1185         case -E_EKEYREVOKED:
1186         case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1187         case -E_FEEDBACK_UNKNOWN_MESSAGE:
1188         case -E_INTERCEPT_DATA_FAIL:
1189         case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1190         case -E_INVALID_QUERY_FIELD:
1191         case -E_INVALID_QUERY_FORMAT:
1192         case -E_MAX_LIMITS:
1193         case -E_NOT_REGISTER:
1194         case -E_NOT_SUPPORT:
1195         case -E_SECURITY_OPTION_CHECK_ERROR:
1196             if (handleError) {
1197                 context_->SetTaskErrCode(errCode);
1198             }
1199             break;
1200         default:
1201             break;
1202     }
1203 }
1204 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1205 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1206 {
1207     return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1208 }
1209 
JumpStatusAfterAbilitySync(int mode)1210 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1211 {
1212     if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1213         SwitchStateAndStep(CONTROL_CMD_EVENT);
1214     } else {
1215         SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1216     }
1217 }
1218 
ControlAckRecvErrCodeHandle(int errCode)1219 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1220 {
1221     switch (errCode) {
1222         case -E_NEED_ABILITY_SYNC:
1223             NeedAbilitySyncHandle();
1224             break;
1225         case -E_NO_DATA_SEND:
1226             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1227             break;
1228         case -E_NOT_PERMIT:
1229             context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1230             break;
1231         // other errCode use default
1232         default:
1233             context_->SetTaskErrCode(errCode);
1234             break;
1235     }
1236 }
1237 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)1238 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
1239 {
1240     metadata_->GetLocalWaterMark(deviceId, outValue);
1241 }
1242 
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,bool isAutoLift,uint64_t & outValue)1243 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId,  const DeviceID &deviceId,
1244     bool isAutoLift, uint64_t &outValue)
1245 {
1246     return metadata_->GetSendQueryWaterMark(queryId, deviceId, outValue, isAutoLift);
1247 }
1248 
ResponsePullError(int errCode,bool ignoreInnerErr)1249 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1250 {
1251     Event event = TransformErrCodeToEvent(errCode);
1252     if (event == INNER_ERR_EVENT) {
1253         if (ignoreInnerErr) {
1254             event = RESPONSE_TASK_FINISHED_EVENT;
1255         } else if (context_ != nullptr) {
1256             context_->SetTaskErrCode(errCode);
1257         }
1258     }
1259     SwitchStateAndStep(event);
1260 }
1261 
InnerErrorAbort(uint32_t sessionId)1262 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1263 {
1264     std::lock_guard<std::mutex> lock(stateMachineLock_);
1265     uint32_t requestSessionId = context_->GetRequestSessionId();
1266     if (sessionId != requestSessionId) {
1267         LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1268         return;
1269     }
1270     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1271         SyncStep();
1272     }
1273 }
1274 
NotifyClosing()1275 void SingleVerSyncStateMachine::NotifyClosing()
1276 {
1277     if (timeSync_ != nullptr) {
1278         timeSync_->Close();
1279     }
1280 }
1281 } // namespace DistributedDB
1282