• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_sync_state_machine.h"
17 
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21 
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33 
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38     // used for state switch table
39     const int CURRENT_STATE_INDEX = 0;
40     const int EVENT_INDEX = 1;
41     const int OUTPUT_STATE_INDEX = 2;
42 
43     // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44     // State switch table v3, has three columns, CurrentState, Event, and OutSate
45     const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46         {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47 
48         // In TIME_SYNC state
49         {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50         {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51         {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52         {State::TIME_SYNC, Event::NEED_RESYNC_EVENT, State::TIME_SYNC},
53 
54         // In ABILITY_SYNC state, compare version num and schema
55         {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
56         {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
57         {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
58         {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
59         {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
60         {State::ABILITY_SYNC, Event::NEED_RESYNC_EVENT, State::ABILITY_SYNC},
61 
62         // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
63         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
64         {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
65         {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
66         {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
67         {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
68         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
69         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_RESYNC_EVENT, State::START_INITIACTIVE_DATA_SYNC},
70 
71         // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
72         {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
73         {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
74         {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
75         {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
76         {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
77         {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
78         {State::START_PASSIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
79         {State::START_PASSIVE_DATA_SYNC, Event::NEED_RESYNC_EVENT, State::START_PASSIVE_DATA_SYNC},
80 
81         // In WAIT_FOR_RECEIVE_DATA_FINISH,
82         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
83         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
84         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
85         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
86         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
87         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_RESYNC_EVENT, State::START_PASSIVE_DATA_SYNC},
88 
89         {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
90         {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
91         {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
92         {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
93         {State::SYNC_CONTROL_CMD, Event::NEED_RESYNC_EVENT, State::SYNC_CONTROL_CMD},
94 
95         // In SYNC_TASK_FINISHED,
96         {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
97         {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
98 
99         // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
100         {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
101         {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
102     };
103 }
104 
105 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
106 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
107 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
108 
SingleVerSyncStateMachine()109 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
110     : context_(nullptr),
111       syncInterface_(nullptr),
112       timeSync_(nullptr),
113       abilitySync_(nullptr),
114       dataSync_(nullptr),
115       currentRemoteVersionId_(0)
116 {
117 }
118 
~SingleVerSyncStateMachine()119 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
120 {
121     LOGD("~SingleVerSyncStateMachine");
122     Clear();
123 }
124 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)125 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
126     const std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
127 {
128     if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
129         return -E_INVALID_ARGS;
130     }
131 
132     int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
133     if (errCode != E_OK) {
134         return errCode;
135     }
136 
137     timeSync_ = std::make_shared<TimeSync>();
138     dataSync_ = std::make_shared<SingleVerDataSync>();
139     abilitySync_ = std::make_unique<AbilitySync>();
140     if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
141         timeSync_ = nullptr;
142         dataSync_ = nullptr;
143         abilitySync_ = nullptr;
144         return -E_OUT_OF_MEMORY;
145     }
146 
147     errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId(),
148         context->GetTargetUserId());
149     if (errCode != E_OK) {
150         goto ERROR_OUT;
151     }
152     errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
153     if (errCode != E_OK) {
154         goto ERROR_OUT;
155     }
156     errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
157     if (errCode != E_OK) {
158         goto ERROR_OUT;
159     }
160     abilitySync_->InitAbilitySyncFinishStatus(*context);
161 
162     currentState_ = IDLE;
163     context_ = static_cast<SingleVerSyncTaskContext *>(context);
164     syncInterface_ = static_cast<SyncGenericInterface *>(syncInterface);
165 
166     InitStateSwitchTables();
167     InitStateMapping();
168     return E_OK;
169 
170 ERROR_OUT:
171     Clear();
172     return errCode;
173 }
174 
SyncStep()175 void SingleVerSyncStateMachine::SyncStep()
176 {
177     RefObject::IncObjRef(context_);
178     RefObject::IncObjRef(communicator_);
179     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
180     if (errCode != E_OK) {
181         LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
182         RefObject::DecObjRef(communicator_);
183         RefObject::DecObjRef(context_);
184     }
185 }
186 
ReceiveMessageCallback(Message * inMsg)187 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
188 {
189     int errCode = MessageCallbackPre(inMsg);
190     if (errCode != E_OK) {
191         LOGE("[StateMachine] message pre check failed");
192         return errCode;
193     }
194     if (context_->IsNeedRetrySync(inMsg->GetErrorNo(), inMsg->GetMessageType())) {
195         SwitchStateAndStep(NEED_RESYNC_EVENT);
196         return E_OK;
197     }
198     switch (inMsg->GetMessageId()) {
199         case TIME_SYNC_MESSAGE:
200             errCode = TimeMarkSyncRecv(inMsg);
201             break;
202         case ABILITY_SYNC_MESSAGE:
203             errCode = AbilitySyncRecv(inMsg);
204             break;
205         case DATA_SYNC_MESSAGE:
206         case QUERY_SYNC_MESSAGE:
207             errCode = DataPktRecv(inMsg);
208             break;
209         case CONTROL_SYNC_MESSAGE:
210             errCode = ControlPktRecv(inMsg);
211             break;
212         default:
213             errCode = -E_NOT_SUPPORT;
214     }
215     return errCode;
216 }
217 
SyncStepInnerLocked()218 void SingleVerSyncStateMachine::SyncStepInnerLocked()
219 {
220     if (context_->IncUsedCount() != E_OK) {
221         goto SYNC_STEP_OUT;
222     }
223     {
224         std::lock_guard<std::mutex> lock(stateMachineLock_);
225         SyncStepInner();
226     }
227     context_->SafeExit();
228 
229 SYNC_STEP_OUT:
230     RefObject::DecObjRef(communicator_);
231     RefObject::DecObjRef(context_);
232 }
233 
SyncStepInner()234 void SingleVerSyncStateMachine::SyncStepInner()
235 {
236     Event event = INNER_ERR_EVENT;
237     do {
238         auto iter = stateMapping_.find(currentState_);
239         if (iter != stateMapping_.end()) {
240             event = static_cast<Event>(iter->second());
241         } else {
242             LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
243                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
244             break;
245         }
246     } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
247 }
248 
SetCurStateErrStatus()249 void SingleVerSyncStateMachine::SetCurStateErrStatus()
250 {
251     currentState_ = State::INNER_ERR;
252 }
253 
StartSyncInner()254 int SingleVerSyncStateMachine::StartSyncInner()
255 {
256     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
257     if (performance != nullptr) {
258         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
259     }
260     int errCode = PrepareNextSyncTask();
261     if (errCode == E_OK) {
262         SwitchStateAndStep(Event::START_SYNC_EVENT);
263     }
264     return errCode;
265 }
266 
AbortInner()267 void SingleVerSyncStateMachine::AbortInner()
268 {
269     LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
270         STR_MASK(context_->GetDeviceId()));
271     if (context_->IsKilled()) {
272         dataSync_->ClearDataMsg();
273     }
274     dataSync_->ClearSyncStatus();
275     ContinueToken token;
276     context_->GetContinueToken(token);
277     if (token != nullptr) {
278         syncInterface_->ReleaseContinueToken(token);
279     }
280     context_->SetContinueToken(nullptr);
281     context_->Clear();
282 }
283 
GetStateSwitchTables() const284 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
285 {
286     return stateSwitchTables_;
287 }
288 
PrepareNextSyncTask()289 int SingleVerSyncStateMachine::PrepareNextSyncTask()
290 {
291     int errCode = StartWatchDog();
292     if (errCode != E_OK && errCode != -E_UNEXPECTED_DATA) {
293         LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
294         return errCode;
295     }
296     if (errCode == -E_UNEXPECTED_DATA) {
297         LOGI("[PrepareNextSyncTask] timer already exists, reset the timer.");
298         (void)ResetWatchDog();
299     }
300 
301     if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
302         LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
303             currentState_, STR_MASK(context_->GetDeviceId()));
304         currentState_ = State::IDLE;
305     }
306     return E_OK;
307 }
308 
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)309 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
310 {
311     dataSync_->SendSaveDataNotifyPacket(context_,
312         std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
313 }
314 
CommErrAbort(uint32_t sessionId)315 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
316 {
317     std::lock_guard<std::mutex> lock(stateMachineLock_);
318     uint32_t requestSessionId = context_->GetRequestSessionId();
319     if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
320         return;
321     }
322     context_->SetCommNormal(false);
323     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
324         SyncStep();
325     }
326 }
327 
InitStateSwitchTables()328 void SingleVerSyncStateMachine::InitStateSwitchTables()
329 {
330     if (isStateSwitchTableInited_) {
331         return;
332     }
333 
334     std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
335     if (isStateSwitchTableInited_) {
336         return;
337     }
338 
339     InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
340     std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
341         [](const auto &tableA, const auto &tableB) {
342             return tableA.version > tableB.version;
343         }); // descending
344     isStateSwitchTableInited_ = true;
345 }
346 
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)347 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
348     const std::vector<std::vector<uint8_t>> &switchTable)
349 {
350     StateSwitchTable table;
351     table.version = version;
352     for (const auto &stateSwitch : switchTable) {
353         if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
354             LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
355             return;
356         }
357         if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
358             EventToState eventToState; // new EventToState
359             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
360             table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
361         } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
362             EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
363             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
364         }
365     }
366     stateSwitchTables_.push_back(table);
367 }
368 
InitStateMapping()369 void SingleVerSyncStateMachine::InitStateMapping()
370 {
371     stateMapping_[TIME_SYNC] = [this] { return DoTimeSync(); };
372     stateMapping_[ABILITY_SYNC] = [this] { return DoAbilitySync(); };
373     stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = [this] { return DoWaitForDataRecv(); };
374     stateMapping_[SYNC_TASK_FINISHED] = [this] { return DoSyncTaskFinished(); };
375     stateMapping_[SYNC_TIME_OUT] = [this] { return DoTimeout(); };
376     stateMapping_[INNER_ERR] = [this] { return DoInnerErr(); };
377     stateMapping_[START_INITIACTIVE_DATA_SYNC] = [this] { return DoInitiactiveDataSyncWithSlidingWindow(); };
378     stateMapping_[START_PASSIVE_DATA_SYNC] = [this] { return DoPassiveDataSyncWithSlidingWindow(); };
379     stateMapping_[SYNC_CONTROL_CMD] = [this] { return DoInitiactiveControlSync(); };
380 }
381 
DoInitiactiveDataSyncWithSlidingWindow() const382 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow() const
383 {
384     LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
385         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
386     int errCode = E_OK;
387     switch (context_->GetMode()) {
388         case SyncModeType::PUSH:
389         case SyncModeType::QUERY_PUSH:
390             context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
391             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
392             break;
393         case SyncModeType::PULL:
394         case SyncModeType::QUERY_PULL:
395             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
396             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
397             break;
398         case SyncModeType::PUSH_AND_PULL:
399         case SyncModeType::QUERY_PUSH_PULL:
400             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
401             break;
402         case SyncModeType::RESPONSE_PULL:
403             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
404             break;
405         default:
406             errCode = -E_NOT_SUPPORT;
407             break;
408     }
409     if (errCode == E_OK) {
410         return Event::WAIT_ACK_EVENT;
411     }
412     // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
413     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
414         errCode == -E_EKEYREVOKED) {
415         return Event::WAIT_ACK_EVENT;
416     }
417 
418     // when response task step dataSync again while request task is running,  ignore the errCode
419     bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
420     Event event = TransformErrCodeToEvent(errCode);
421     return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
422 }
423 
DoPassiveDataSyncWithSlidingWindow()424 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
425 {
426     {
427         RefObject::AutoLock lock(context_);
428         if (context_->GetRspTargetQueueSize() != 0) {
429             PreStartPullResponse();
430         } else if (context_->GetResponseSessionId() == 0 ||
431             context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
432             return RESPONSE_TASK_FINISHED_EVENT;
433         }
434     }
435     int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
436     if (errCode != E_OK) {
437         LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
438         return RESPONSE_TASK_FINISHED_EVENT;
439     }
440     return Event::WAIT_ACK_EVENT;
441 }
442 
DoInitiactiveControlSync() const443 Event SingleVerSyncStateMachine::DoInitiactiveControlSync() const
444 {
445     LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
446         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
447     context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
448     int errCode = dataSync_->ControlCmdStart(context_);
449     if (errCode == E_OK) {
450         return Event::WAIT_ACK_EVENT;
451     }
452     context_->SetTaskErrCode(errCode);
453     return TransformErrCodeToEvent(errCode);
454 }
455 
HandleControlAckRecv(const Message * inMsg)456 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
457 {
458     std::lock_guard<std::mutex> lock(stateMachineLock_);
459     if (IsNeedResetWatchdog(inMsg)) {
460         (void)ResetWatchDog();
461     }
462     int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
463     ControlAckRecvErrCodeHandle(errCode);
464     SwitchStateAndStep(TransformErrCodeToEvent(errCode));
465     return E_OK;
466 }
467 
DoWaitForDataRecv() const468 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
469 {
470     if (context_->GetRspTargetQueueSize() != 0) {
471         return START_PULL_RESPONSE_EVENT;
472     }
473     if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
474         return RECV_FINISHED_EVENT;
475     }
476     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
477         context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
478         context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
479         return RECV_FINISHED_EVENT;
480     }
481     return Event::WAIT_ACK_EVENT;
482 }
483 
DoTimeSync() const484 Event SingleVerSyncStateMachine::DoTimeSync() const
485 {
486     timeSync_->SetTimeSyncFinishIfNeed();
487     if (timeSync_->IsNeedSync()) {
488         CommErrHandler handler = nullptr;
489         // Auto sync need do retry don't use errHandler to return.
490         if (!context_->IsAutoSync()) {
491             handler = [this, context = context_,
492                 requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
493                 SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
494             };
495         }
496         int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId(), context_->IsRetryTask());
497         if (errCode == E_OK) {
498             return Event::WAIT_ACK_EVENT;
499         }
500         context_->SetTaskErrCode(errCode);
501         return TransformErrCodeToEvent(errCode);
502     }
503 
504     return Event::TIME_SYNC_FINISHED_EVENT;
505 }
506 
DoAbilitySync() const507 Event SingleVerSyncStateMachine::DoAbilitySync() const
508 {
509     uint16_t remoteCommunicatorVersion = 0;
510     int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
511     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
512         LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
513         return Event::INNER_ERR_EVENT;
514     }
515     // Fistr version, not support AbilitySync
516     if (remoteCommunicatorVersion == 0 && errCode != -E_NOT_FOUND) {
517         context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
518         return GetEventAfterTimeSync(context_->GetMode());
519     }
520     if (abilitySync_->GetAbilitySyncFinishedStatus()) {
521         return GetEventAfterTimeSync(context_->GetMode());
522     }
523 
524     CommErrHandler handler = [this, context = context_,
525         requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
526         SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
527     };
528     LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
529         STR_MASK(context_->GetDeviceId()));
530     errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
531         remoteCommunicatorVersion, handler, context_);
532     if (errCode != E_OK) {
533         LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
534         context_->SetTaskErrCode(errCode);
535         return TransformErrCodeToEvent(errCode);
536     }
537     return Event::WAIT_ACK_EVENT;
538 }
539 
GetEventAfterTimeSync(int mode) const540 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
541 {
542     if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
543         return Event::CONTROL_CMD_EVENT;
544     }
545     return Event::ABILITY_SYNC_FINISHED_EVENT;
546 }
547 
DoSyncTaskFinished()548 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
549 {
550     StopWatchDog();
551     context_->ResetResyncTimes();
552     if (dataSync_ == nullptr || communicator_ == nullptr || syncContext_ == nullptr) {
553         LOGE("[SingleVerSyncStateMachine] [DoSyncTaskFinished] dataSync_ or communicator_ or syncContext_ is nullptr.");
554         return TransformErrCodeToEvent(-E_OUT_OF_MEMORY);
555     }
556     dataSync_->ClearSyncStatus();
557     auto timeout = communicator_->GetTimeout(syncContext_->GetDeviceId());
558     RefObject::AutoLock lock(syncContext_);
559     int errCode = ExecNextTask(timeout);
560     if (errCode == E_OK) {
561         return Event::START_SYNC_EVENT;
562     }
563     return TransformErrCodeToEvent(errCode);
564 }
565 
DoTimeout()566 Event SingleVerSyncStateMachine::DoTimeout()
567 {
568     RefObject::AutoLock lock(context_);
569     if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
570         std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
571         if (subManager != nullptr) {
572             subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
573         }
574     }
575     context_->Abort(SyncOperation::OP_TIMEOUT);
576     context_->Clear();
577     AbortInner();
578     return Event::ANY_EVENT;
579 }
580 
DoInnerErr()581 Event SingleVerSyncStateMachine::DoInnerErr()
582 {
583     RefObject::AutoLock lock(context_);
584     if (!context_->IsCommNormal()) {
585         if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
586             std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
587             if (subManager != nullptr) {
588                 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
589             }
590         }
591         context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
592     } else {
593         int status = GetSyncOperationStatus(context_->GetTaskErrCode());
594         context_->Abort(status);
595     }
596     context_->Clear();
597     AbortInner();
598     return Event::ANY_EVENT;
599 }
600 
AbilitySyncRecv(const Message * inMsg)601 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
602 {
603     if (inMsg->GetMessageType() == TYPE_REQUEST) {
604         int errCode = abilitySync_->RequestRecv(inMsg, context_);
605         if (errCode != E_OK && inMsg->GetSessionId() == context_->GetResponseSessionId()) {
606             context_->SetTaskErrCode(errCode);
607             std::lock_guard<std::mutex> lock(stateMachineLock_);
608             SwitchStateAndStep(Event::INNER_ERR_EVENT);
609         }
610         return E_OK;
611     }
612     return AbilitySyncResponseRecv(inMsg);
613 }
614 
AbilitySyncResponseRecv(const Message * inMsg)615 int SingleVerSyncStateMachine::AbilitySyncResponseRecv(const Message *inMsg)
616 {
617     if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
618         std::lock_guard<std::mutex> lock(stateMachineLock_);
619         int errCode = abilitySync_->AckRecv(inMsg, context_);
620         (void)ResetWatchDog();
621         if (errCode == -E_ABILITY_SYNC_FINISHED) {
622             LOGI("[StateMachine][AbilitySyncRecv] ability sync finished with both kv,label=%s,dev=%s",
623                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
624             abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
625             JumpStatusAfterAbilitySync(context_->GetMode());
626         } else if (errCode != E_OK) {
627             LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
628             SwitchStateAndStep(TransformErrCodeToEvent(errCode));
629         } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
630             abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
631             LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
632                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
633             currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
634             JumpStatusAfterAbilitySync(context_->GetMode());
635         }
636         return E_OK;
637     }
638     if (inMsg->GetMessageType() == TYPE_NOTIFY) {
639         return AbilitySyncNotifyRecv(inMsg);
640     }
641     LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
642     return -E_NOT_SUPPORT;
643 }
644 
HandleDataRequestRecv(const Message * inMsg)645 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
646 {
647     TimeOffset offset = 0;
648     auto [systemOffset, senderLocalOffset] = SingleVerDataSyncUtils::GetTimeOffsetFromRequestMsg(inMsg);
649     int errCode = timeSync_->GenerateTimeOffsetIfNeed(systemOffset, senderLocalOffset);
650     if (errCode != E_OK) {
651         (void)dataSync_->SendDataAck(context_, inMsg, errCode, 0);
652         return errCode;
653     }
654     uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
655     // If message is data sync request, we should check timeoffset.
656     errCode = timeSync_->GetTimeOffset(offset, timeout);
657     if (errCode != E_OK) {
658         LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
659         return errCode;
660     }
661     context_->SetTimeOffset(offset);
662     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
663     if (performance != nullptr) {
664         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
665     }
666     DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
667 
668     // RequestRecv will save data, it may cost a long time.
669     // So we need to send save data notify to keep remote alive.
670     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
671     {
672         std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
673         if (IsNeedResetWatchdog(inMsg)) {
674             (void)ResetWatchDog();
675         }
676     }
677     WaterMark pullEndWaterkark = 0;
678     errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
679     if (performance != nullptr) {
680         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
681     }
682     // only higher than 102 version receive this errCode here.
683     // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
684     // so while receive the same secquencid after abiitysync it wouldn't handle.
685     if (errCode == -E_NEED_ABILITY_SYNC) {
686         if (isNeedStop) {
687             StopSaveDataNotify();
688         }
689         return errCode;
690     }
691     {
692         std::lock_guard<std::mutex> lock(stateMachineLock_);
693         DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
694         if (pullEndWaterkark > 0) {
695             AddPullResponseTarget(inMsg, pullEndWaterkark);
696         }
697     }
698     if (isNeedStop) {
699         StopSaveDataNotify();
700     }
701     return E_OK;
702 }
703 
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)704 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
705     bool ignoreInnerErr)
706 {
707     if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
708         dataSync_->ClearSyncStatus();
709     }
710     if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
711         int ret = dataSync_->TryContinueSync(context_, inMsg);
712         if (ret == -E_FINISHED) {
713             SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
714             return;
715         } else if (ret == E_OK) { // do nothing and waiting for all ack receive
716             return;
717         }
718         errCode = ret;
719     }
720     ResponsePullError(errCode, ignoreInnerErr);
721 }
722 
NeedAbilitySyncHandle()723 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
724 {
725     // if the remote device version num is overdue,
726     // mean the version num has been reset when syncing data,
727     // there should not clear the new version cache again.
728     if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
729         LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
730         context_->SetRemoteSoftwareVersion(0);
731     } else {
732         currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
733     }
734     abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
735 }
736 
HandleDataAckRecv(const Message * inMsg)737 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
738 {
739     if (inMsg->GetMessageType() == TYPE_RESPONSE) {
740         DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
741     }
742     std::lock_guard<std::mutex> lock(stateMachineLock_);
743     // Unfortunately we use stateMachineLock_ in many sync process
744     // So a bad ack will check before the lock and wait
745     // And then another process is running, it will get the lock.After this process, the ack became invalid.
746     // If we don't check ack again, it will be delivered to dataSyncer.
747     if (!IsPacketValid(inMsg)) {
748         return -E_INVALID_ARGS;
749     }
750     if (IsNeedResetWatchdog(inMsg)) {
751         (void)ResetWatchDog();
752     }
753     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
754         // packetId not match but sequence id matched scene, means resend map has be rebuilt
755         // this is old ack, should be dropped and wait for the same packetId sequence.
756         return E_OK;
757     }
758     // AckRecv will save meta data, it may cost a long time. if another thread is saving data
759     // So we need to send save data notify to keep remote alive.
760     // e.g. remote do pull sync
761     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
762     int errCode = dataSync_->AckRecv(context_, inMsg);
763     if (isNeedStop) {
764         StopSaveDataNotify();
765     }
766     if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA || errCode == -E_NEED_TIME_SYNC) {
767         StopFeedDogForSync(SyncDirectionFlag::SEND);
768         dataSync_->ClearSyncStatus();
769         context_->ReSetSequenceId();
770     } else if (errCode == -E_SAVE_DATA_NOTIFY) {
771         return errCode;
772     }
773     // when this msg is from response task while request task is running,  ignore the errCode
774     bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
775         context_->GetRequestSessionId() != 0;
776     DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
777     HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
778     return errCode;
779 }
780 
DataPktRecv(Message * inMsg)781 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
782 {
783     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
784     int errCode = E_OK;
785     switch (inMsg->GetMessageType()) {
786         case TYPE_REQUEST:
787             ScheduleMsgAndHandle(inMsg);
788             errCode = -E_NOT_NEED_DELETE_MSG;
789             break;
790         case TYPE_RESPONSE:
791         case TYPE_NOTIFY:
792             if (performance != nullptr) {
793                 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
794                 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
795             }
796             errCode = HandleDataAckRecv(inMsg);
797             break;
798         default:
799             errCode = -E_INVALID_ARGS;
800             break;
801     }
802     return errCode;
803 }
804 
ScheduleMsgAndHandle(Message * inMsg)805 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
806 {
807     dataSync_->PutDataMsg(inMsg);
808     while (true) {
809         bool isNeedHandle = true;
810         bool isNeedContinue = true;
811         Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
812         if (!isNeedContinue) {
813             break;
814         }
815         if (msg == nullptr) {
816             if (dataSync_->IsNeedReloadQueue()) {
817                 continue;
818             }
819             dataSync_->ScheduleInfoHandle(false, false, nullptr);
820             break;
821         }
822         bool isNeedClearMap = false;
823         if (isNeedHandle) {
824             int errCode = HandleDataRequestRecv(msg);
825             if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC || errCode == -E_NEED_TIME_SYNC) {
826                 isNeedClearMap = true;
827             }
828             if (errCode == -E_TIMEOUT) {
829                 isNeedHandle = false;
830             }
831         } else {
832             dataSync_->SendFinishedDataAck(context_, msg);
833         }
834         if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
835             // for lower version, no need to handle map schedule info, just reset schedule working status
836             isNeedHandle = false;
837         }
838         dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
839         delete msg;
840     }
841 }
842 
ControlPktRecv(Message * inMsg)843 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
844 {
845     int errCode = E_OK;
846     switch (inMsg->GetMessageType()) {
847         case TYPE_REQUEST:
848             errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
849             break;
850         case TYPE_RESPONSE:
851             errCode = HandleControlAckRecv(inMsg);
852             break;
853         default:
854             errCode = -E_INVALID_ARGS;
855             break;
856     }
857     return errCode;
858 }
859 
StepToTimeout(TimerId timerId)860 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
861 {
862     std::lock_guard<std::mutex> lock(stateMachineLock_);
863     TimerId timer = syncContext_->GetTimerId();
864     if (timer != timerId) {
865         return;
866     }
867     SwitchStateAndStep(Event::TIME_OUT_EVENT);
868 }
869 
870 namespace {
871 struct StateNode {
872     int errCode = 0;
873     SyncOperation::Status status = SyncOperation::OP_WAITING;
874 };
875 }
GetSyncOperationStatus(int errCode) const876 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
877 {
878     static const StateNode stateNodes[] = {
879         { -E_SCHEMA_MISMATCH,                 SyncOperation::OP_SCHEMA_INCOMPATIBLE },
880         { -E_EKEYREVOKED,                     SyncOperation::OP_EKEYREVOKED_FAILURE },
881         { -E_SECURITY_OPTION_CHECK_ERROR,     SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
882         { -E_BUSY,                            SyncOperation::OP_BUSY_FAILURE },
883         { -E_NOT_PERMIT,                      SyncOperation::OP_PERMISSION_CHECK_FAILED },
884         { -E_TIMEOUT,                         SyncOperation::OP_TIMEOUT },
885         { -E_INVALID_QUERY_FORMAT,            SyncOperation::OP_QUERY_FORMAT_FAILURE },
886         { -E_INVALID_QUERY_FIELD,             SyncOperation::OP_QUERY_FIELD_FAILURE },
887         { -E_FEEDBACK_UNKNOWN_MESSAGE,        SyncOperation::OP_NOT_SUPPORT },
888         { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
889         { -E_NOT_SUPPORT,                     SyncOperation::OP_NOT_SUPPORT },
890         { -E_INTERCEPT_DATA_FAIL,             SyncOperation::OP_INTERCEPT_DATA_FAIL },
891         { -E_MAX_LIMITS,                      SyncOperation::OP_MAX_LIMITS },
892         { -E_DISTRIBUTED_SCHEMA_CHANGED,      SyncOperation::OP_SCHEMA_CHANGED },
893         { -E_NOT_REGISTER,                    SyncOperation::OP_NOT_SUPPORT },
894         { -E_DENIED_SQL,                      SyncOperation::OP_DENIED_SQL },
895         { -E_REMOTE_OVER_SIZE,                SyncOperation::OP_MAX_LIMITS },
896         { -E_INVALID_PASSWD_OR_CORRUPTED_DB,  SyncOperation::OP_NOTADB_OR_CORRUPTED },
897         { -E_DISTRIBUTED_SCHEMA_NOT_FOUND,    SyncOperation::OP_SCHEMA_INCOMPATIBLE },
898         { -E_FEEDBACK_DB_CLOSING,             SyncOperation::OP_DB_CLOSING },
899     };
900     const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) {
901         return node.errCode == errCode;
902     });
903     return result == std::end(stateNodes) ? SyncOperation::OP_FAILED : static_cast<int>((*result).status);
904 }
905 
TimeMarkSyncRecv(const Message * inMsg)906 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
907 {
908     LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
909         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
910     {
911         std::lock_guard<std::mutex> lock(stateMachineLock_);
912         (void)ResetWatchDog();
913     }
914     if (inMsg->GetMessageType() == TYPE_REQUEST) {
915         return timeSync_->RequestRecv(inMsg);
916     } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
917         int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
918         if (errCode != E_OK) {
919             LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
920             if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
921                 context_->SetTaskErrCode(errCode);
922                 InnerErrorAbort(inMsg->GetSessionId());
923             }
924             return errCode;
925         }
926         std::lock_guard<std::mutex> lock(stateMachineLock_);
927         SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
928         return E_OK;
929     } else {
930         return -E_INVALID_ARGS;
931     }
932 }
933 
Clear()934 void SingleVerSyncStateMachine::Clear()
935 {
936     dataSync_ = nullptr;
937     timeSync_ = nullptr;
938     abilitySync_ = nullptr;
939     context_ = nullptr;
940     syncInterface_ = nullptr;
941 }
942 
IsPacketValid(const Message * inMsg) const943 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
944 {
945     if (inMsg == nullptr) {
946         return false;
947     }
948 
949     if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
950         LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
951         return false;
952     }
953     // filter invalid ack at first
954     bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
955     if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
956         (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
957         LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
958             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
959         return false;
960     }
961     if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
962         (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
963         (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
964         LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
965             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
966         return false;
967     }
968 
969     // timeSync and abilitySync don't need to check sequenceId
970     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
971         inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
972         return true;
973     }
974     // sequenceId will be checked in dataSync
975     return true;
976 }
977 
PreStartPullResponse()978 void SingleVerSyncStateMachine::PreStartPullResponse()
979 {
980     SingleVerSyncTarget target;
981     context_->PopResponseTarget(target);
982     context_->SetEndMark(target.GetEndWaterMark());
983     context_->SetResponseSessionId(target.GetResponseSessionId());
984     context_->SetMode(SyncModeType::RESPONSE_PULL);
985     context_->ReSetSequenceId();
986     context_->SetQuerySync(target.IsQuerySync());
987     context_->SetQuery(target.GetQuery());
988 }
989 
CheckIsStartPullResponse() const990 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
991 {
992     // Other state will step to do pull response, only this statem we need to step the statemachine
993     if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
994         return true;
995     }
996     return false;
997 }
998 
MessageCallbackPre(const Message * inMsg)999 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
1000 {
1001     RefObject::AutoLock lock(context_);
1002     if (context_->IsKilled()) {
1003         return -E_OBJ_IS_KILLED;
1004     }
1005 
1006     if (!IsPacketValid(inMsg)) {
1007         return -E_INVALID_ARGS;
1008     }
1009     return E_OK;
1010 }
1011 
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)1012 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
1013 {
1014     int messageType = static_cast<int>(inMsg->GetMessageId());
1015     uint32_t sessionId = inMsg->GetSessionId();
1016     if (pullEndWatermark == 0) {
1017         LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
1018         return;
1019     }
1020     if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
1021         LOGI("[StateMachine][AddPullResponseTarget] task is already running");
1022         return;
1023     }
1024     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
1025     if (packet == nullptr) {
1026         LOGE("[AddPullResponseTarget] get packet object failed");
1027         return;
1028     }
1029     SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
1030     if (targetTmp == nullptr) {
1031         LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
1032         return;
1033     }
1034     targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1035     if (messageType == QUERY_SYNC_MESSAGE) {
1036         targetTmp->SetQuery(packet->GetQuery());
1037         targetTmp->SetQuerySync(true);
1038     }
1039     targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1040     targetTmp->SetEndWaterMark(pullEndWatermark);
1041     targetTmp->SetResponseSessionId(sessionId);
1042     if (context_->AddSyncTarget(targetTmp) != E_OK) {
1043         delete targetTmp;
1044         return;
1045     }
1046     if (CheckIsStartPullResponse()) {
1047         SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1048     }
1049 }
1050 
TransformErrCodeToEvent(int errCode) const1051 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode) const
1052 {
1053     switch (errCode) {
1054         case -E_TIMEOUT:
1055             return TransforTimeOutErrCodeToEvent();
1056         case -static_cast<int>(VERSION_NOT_SUPPOR_EVENT):
1057             return Event::VERSION_NOT_SUPPOR_EVENT;
1058         case -E_SEND_DATA:
1059             return Event::SEND_DATA_EVENT;
1060         case -E_NO_DATA_SEND:
1061             return Event::SEND_FINISHED_EVENT;
1062         case -E_RECV_FINISHED:
1063             return Event::RECV_FINISHED_EVENT;
1064         case -E_NEED_ABILITY_SYNC:
1065             return Event::NEED_ABILITY_SYNC_EVENT;
1066         case -E_NO_SYNC_TASK:
1067             return Event::ALL_TASK_FINISHED_EVENT;
1068         case -E_NEED_PULL_REPONSE:
1069             return Event::START_PULL_RESPONSE_EVENT;
1070         case -E_RE_SEND_DATA:
1071             return Event::RE_SEND_DATA_EVENT;
1072         case -E_NEED_TIME_SYNC:
1073             return Event::NEED_TIME_SYNC_EVENT;
1074         default:
1075             return Event::INNER_ERR_EVENT;
1076     }
1077 }
1078 
IsNeedResetWatchdog(const Message * inMsg) const1079 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1080 {
1081     if (inMsg == nullptr) {
1082         return false;
1083     }
1084 
1085     if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1086         return true;
1087     }
1088 
1089     int msgType = inMsg->GetMessageType();
1090     if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1091         if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1092             // Pull response ack also should reset watchdog
1093             return true;
1094         }
1095     }
1096 
1097     return false;
1098 }
1099 
TransforTimeOutErrCodeToEvent() const1100 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1101 {
1102     if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1103         return Event::WAIT_TIME_OUT_EVENT;
1104     } else {
1105         return Event::TIME_OUT_EVENT;
1106     }
1107 }
1108 
IsNeedErrCodeHandle(uint32_t sessionId) const1109 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1110 {
1111     // omit to set sessionId so version_3 should skip to compare sessionid.
1112     if (sessionId == context_->GetRequestSessionId() ||
1113         context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1114         return true;
1115     }
1116     return false;
1117 }
1118 
PushPullDataRequestEvokeErrHandle()1119 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1120 {
1121     // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1122     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1123         SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) { // LCOV_EXCL_BR_LINE
1124         LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1125         context_->SetTaskErrCode(-E_EKEYREVOKED);
1126         context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1127         SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1128     } else {
1129         context_->SetTaskErrCode(-E_EKEYREVOKED);
1130         SwitchStateAndStep(Event::INNER_ERR_EVENT);
1131     }
1132 }
1133 
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1134 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1135 {
1136     if (IsNeedErrCodeHandle(sessionId)) {
1137         switch (errCode) {
1138             case E_OK:
1139                 break;
1140             case -E_NOT_PERMIT:
1141                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1142                 break;
1143             case -E_RECV_FINISHED:
1144                 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1145                 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1146                 break;
1147             case -E_EKEYREVOKED:
1148                 PushPullDataRequestEvokeErrHandle();
1149                 break;
1150             case -E_BUSY:
1151             case -E_DISTRIBUTED_SCHEMA_CHANGED:
1152             case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1153             case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1154             case -E_FEEDBACK_UNKNOWN_MESSAGE:
1155             case -E_INTERCEPT_DATA_FAIL:
1156             case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1157             case -E_INVALID_QUERY_FIELD:
1158             case -E_INVALID_QUERY_FORMAT:
1159             case -E_MAX_LIMITS:
1160             case -E_NOT_REGISTER:
1161             case -E_NOT_SUPPORT:
1162             case -E_SECURITY_OPTION_CHECK_ERROR:
1163                 context_->SetTaskErrCode(errCode);
1164                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1165                 break;
1166             default:
1167                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1168                 break;
1169         }
1170     }
1171 }
1172 
AbilityMsgSessionIdCheck(const Message * inMsg)1173 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1174 {
1175     if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) { // LCOV_EXCL_BR_LINE
1176         return true;
1177     }
1178     LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1179     return false;
1180 }
1181 
GetSyncType(uint32_t messageId) const1182 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1183 {
1184     if (messageId == QUERY_SYNC_MESSAGE) { // LCOV_EXCL_BR_LINE
1185         return SyncType::QUERY_SYNC_TYPE;
1186     }
1187     return SyncType::MANUAL_FULL_SYNC_TYPE;
1188 }
1189 
DataAckRecvErrCodeHandle(int errCode,bool handleError)1190 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1191 {
1192     switch (errCode) {
1193         case -E_NEED_ABILITY_SYNC:
1194             NeedAbilitySyncHandle();
1195             break;
1196         case -E_NEED_TIME_SYNC:
1197             timeSync_->ClearTimeSyncFinish();
1198             break;
1199         case -E_NOT_PERMIT:
1200             if (handleError) {
1201                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1202             }
1203             break;
1204         case -E_BUSY:
1205         case -E_DISTRIBUTED_SCHEMA_CHANGED:
1206         case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1207         case -E_EKEYREVOKED:
1208         case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1209         case -E_FEEDBACK_UNKNOWN_MESSAGE:
1210         case -E_INTERCEPT_DATA_FAIL:
1211         case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1212         case -E_INVALID_QUERY_FIELD:
1213         case -E_INVALID_QUERY_FORMAT:
1214         case -E_MAX_LIMITS:
1215         case -E_NOT_REGISTER:
1216         case -E_NOT_SUPPORT:
1217         case -E_SECURITY_OPTION_CHECK_ERROR:
1218             if (handleError) {
1219                 context_->SetTaskErrCode(errCode);
1220             }
1221             break;
1222         default:
1223             break;
1224     }
1225 }
1226 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1227 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1228 {
1229     return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1230 }
1231 
JumpStatusAfterAbilitySync(int mode)1232 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1233 {
1234     if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1235         SwitchStateAndStep(CONTROL_CMD_EVENT);
1236     } else {
1237         SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1238     }
1239 }
1240 
ControlAckRecvErrCodeHandle(int errCode)1241 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1242 {
1243     switch (errCode) {
1244         case -E_NEED_ABILITY_SYNC:
1245             NeedAbilitySyncHandle();
1246             break;
1247         case -E_NO_DATA_SEND:
1248             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1249             break;
1250         case -E_NOT_PERMIT:
1251             context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1252             break;
1253         // other errCode use default
1254         default:
1255             context_->SetTaskErrCode(errCode);
1256             break;
1257     }
1258 }
1259 
GetLocalWaterMark(const DeviceID & deviceId,const DeviceID & userId,uint64_t & outValue)1260 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, const DeviceID &userId, uint64_t &outValue)
1261 {
1262     metadata_->GetLocalWaterMark(deviceId, userId, outValue);
1263 }
1264 
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,const DeviceID & userId,bool isAutoLift,uint64_t & outValue)1265 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId,
1266     const DeviceID &userId, bool isAutoLift, uint64_t &outValue)
1267 {
1268     return metadata_->GetSendQueryWaterMark(queryId, deviceId, userId, outValue, isAutoLift);
1269 }
1270 
ResponsePullError(int errCode,bool ignoreInnerErr)1271 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1272 {
1273     Event event = TransformErrCodeToEvent(errCode);
1274     if (event == INNER_ERR_EVENT) {
1275         if (ignoreInnerErr) {
1276             event = RESPONSE_TASK_FINISHED_EVENT;
1277         } else if (context_ != nullptr) {
1278             context_->SetTaskErrCode(errCode);
1279         }
1280     }
1281     SwitchStateAndStep(event);
1282 }
1283 
InnerErrorAbort(uint32_t sessionId)1284 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1285 {
1286     std::lock_guard<std::mutex> lock(stateMachineLock_);
1287     uint32_t requestSessionId = context_->GetRequestSessionId();
1288     if (sessionId != requestSessionId) {
1289         LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1290         return;
1291     }
1292     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1293         SyncStep();
1294     }
1295 }
1296 
NotifyClosing()1297 void SingleVerSyncStateMachine::NotifyClosing()
1298 {
1299     if (timeSync_ != nullptr) {
1300         timeSync_->Close();
1301     }
1302 }
1303 
AbilitySyncNotifyRecv(const Message * inMsg)1304 int SingleVerSyncStateMachine::AbilitySyncNotifyRecv(const Message *inMsg)
1305 {
1306     const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
1307     if (packet == nullptr) {
1308         return -E_INVALID_ARGS;
1309     }
1310     int ackCode = packet->GetAckCode();
1311     if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
1312         LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
1313         context_->SetTaskErrCode(ackCode);
1314         std::lock_guard<std::mutex> lock(stateMachineLock_);
1315         SwitchStateAndStep(Event::INNER_ERR_EVENT);
1316         return E_OK;
1317     }
1318     if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
1319         abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
1320         // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
1321         LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
1322             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
1323         context_->SetRemoteSoftwareVersion(packet->GetSoftwareVersion());
1324         currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
1325         std::lock_guard<std::mutex> lock(stateMachineLock_);
1326         (void)ResetWatchDog();
1327         JumpStatusAfterAbilitySync(context_->GetMode());
1328     } else if (ackCode != AbilitySync::LAST_NOTIFY) {
1329         abilitySync_->AckNotifyRecv(inMsg, context_);
1330     }
1331     return E_OK;
1332 }
1333 
SchemaChange()1334 void SingleVerSyncStateMachine::SchemaChange()
1335 {
1336     abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
1337 }
1338 
TimeChange()1339 void SingleVerSyncStateMachine::TimeChange()
1340 {
1341     if (timeSync_ == nullptr) {
1342         LOGW("[SingleVerSyncStateMachine] time sync is null when time change");
1343         return;
1344     }
1345     timeSync_->ClearTimeSyncFinish();
1346 }
1347 } // namespace DistributedDB
1348