• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_sync_state_machine.h"
17 
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21 
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33 
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38     // Column indexes of one row in a state switch table
39     const int CURRENT_STATE_INDEX = 0;
40     const int EVENT_INDEX = 1;
41     const int OUTPUT_STATE_INDEX = 2;
42 
43     // The v1 and v2 tables were dropped by an optimization; data sending in all versions uses sliding-window mode.
44     // State switch table v3 has three columns: CurrentState, Event, and OutState
45     const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46         {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47 
48         // In TIME_SYNC state
49         {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50         {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51         {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52 
53         // In ABILITY_SYNC state, compare version num and schema
54         {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
55         {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
56         {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
57         {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
58         {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
59 
60         // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send the first packet of data sync
61         {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
62         {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
63         {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
64         {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
65         {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
66 
67         // In START_PASSIVE_DATA_SYNC state, respond to the pull request, and send the first packet of data sync
68         {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
69         {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
70         {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
71         {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
72         {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
73         {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
74 
75         // In WAIT_FOR_RECEIVE_DATA_FINISH state
76         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
77         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
78         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
79         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
80         {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
81 
82         {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
83         {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
84         {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
85         {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
86 
87         // In SYNC_TASK_FINISHED state
88         {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
89         {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
90 
91         // SYNC_TIME_OUT and INNER_ERR states just do some exception handling
92         {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
93         {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
94     };
95 }
96 
97 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_; // held while the shared switch tables are built
98 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_; // filled by InitStateSwitchTable()
99 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false; // set to true by InitStateSwitchTables()
100 
SingleVerSyncStateMachine()101 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
102     : context_(nullptr),
103       syncInterface_(nullptr),
104       timeSync_(nullptr),
105       abilitySync_(nullptr),
106       dataSync_(nullptr),
107       currentRemoteVersionId_(0)
108 {
109 }
110 
~SingleVerSyncStateMachine()111 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
112 {
113     LOGD("~SingleVerSyncStateMachine");
114     Clear();
115 }
116 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)117 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
118     std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
119 {
120     if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
121         return -E_INVALID_ARGS;
122     }
123 
124     int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
125     if (errCode != E_OK) {
126         return errCode;
127     }
128 
129     timeSync_ = std::make_unique<TimeSync>();
130     dataSync_ = std::make_shared<SingleVerDataSync>();
131     abilitySync_ = std::make_unique<AbilitySync>();
132     if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
133         timeSync_ = nullptr;
134         dataSync_ = nullptr;
135         abilitySync_ = nullptr;
136         return -E_OUT_OF_MEMORY;
137     }
138 
139     errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId());
140     if (errCode != E_OK) {
141         goto ERROR_OUT;
142     }
143     errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
144     if (errCode != E_OK) {
145         goto ERROR_OUT;
146     }
147     errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
148     if (errCode != E_OK) {
149         goto ERROR_OUT;
150     }
151 
152     currentState_ = IDLE;
153     context_ = static_cast<SingleVerSyncTaskContext *>(context);
154     syncInterface_ = static_cast<SingleVerKvDBSyncInterface *>(syncInterface);
155 
156     InitStateSwitchTables();
157     InitStateMapping();
158     return E_OK;
159 
160 ERROR_OUT:
161     Clear();
162     return errCode;
163 }
164 
SyncStep()165 void SingleVerSyncStateMachine::SyncStep()
166 {
167     RefObject::IncObjRef(context_);
168     RefObject::IncObjRef(communicator_);
169     int errCode = RuntimeContext::GetInstance()->ScheduleTask(
170         std::bind(&SingleVerSyncStateMachine::SyncStepInnerLocked, this));
171     if (errCode != E_OK) {
172         LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
173         RefObject::DecObjRef(communicator_);
174         RefObject::DecObjRef(context_);
175     }
176 }
177 
ReceiveMessageCallback(Message * inMsg)178 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
179 {
180     int errCode = MessageCallbackPre(inMsg);
181     if (errCode != E_OK) {
182         return errCode;
183     }
184     switch (inMsg->GetMessageId()) {
185         case TIME_SYNC_MESSAGE:
186             errCode = TimeMarkSyncRecv(inMsg);
187             break;
188         case ABILITY_SYNC_MESSAGE:
189             errCode = AbilitySyncRecv(inMsg);
190             break;
191         case DATA_SYNC_MESSAGE:
192         case QUERY_SYNC_MESSAGE:
193             errCode = DataPktRecv(inMsg);
194             break;
195         case CONTROL_SYNC_MESSAGE:
196             errCode = ControlPktRecv(inMsg);
197             break;
198         default:
199             errCode = -E_NOT_SUPPORT;
200     }
201     return errCode;
202 }
203 
SyncStepInnerLocked()204 void SingleVerSyncStateMachine::SyncStepInnerLocked()
205 {
206     if (context_->IncUsedCount() != E_OK) {
207         goto SYNC_STEP_OUT;
208     }
209     {
210         std::lock_guard<std::mutex> lock(stateMachineLock_);
211         SyncStepInner();
212     }
213     context_->SafeExit();
214 
215 SYNC_STEP_OUT:
216     RefObject::DecObjRef(communicator_);
217     RefObject::DecObjRef(context_);
218 }
219 
SyncStepInner()220 void SingleVerSyncStateMachine::SyncStepInner()
221 {
222     Event event = INNER_ERR_EVENT;
223     do {
224         auto iter = stateMapping_.find(currentState_);
225         if (iter != stateMapping_.end()) {
226             event = static_cast<Event>(iter->second());
227         } else {
228             LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
229                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
230             break;
231         }
232     } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
233 }
234 
SetCurStateErrStatus()235 void SingleVerSyncStateMachine::SetCurStateErrStatus()
236 {
237     currentState_ = State::INNER_ERR;
238 }
239 
StartSyncInner()240 int SingleVerSyncStateMachine::StartSyncInner()
241 {
242     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
243     if (performance != nullptr) {
244         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
245     }
246     int errCode = PrepareNextSyncTask();
247     if (errCode == E_OK) {
248         SwitchStateAndStep(Event::START_SYNC_EVENT);
249     }
250     return errCode;
251 }
252 
AbortInner()253 void SingleVerSyncStateMachine::AbortInner()
254 {
255     LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
256         STR_MASK(context_->GetDeviceId()));
257     if (context_->IsKilled()) {
258         dataSync_->ClearDataMsg();
259     }
260     dataSync_->ClearSyncStatus();
261     ContinueToken token;
262     context_->GetContinueToken(token);
263     if (token != nullptr) {
264         syncInterface_->ReleaseContinueToken(token);
265     }
266     context_->SetContinueToken(nullptr);
267     context_->Clear();
268 }
269 
GetStateSwitchTables() const270 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
271 {
272     return stateSwitchTables_;
273 }
274 
PrepareNextSyncTask()275 int SingleVerSyncStateMachine::PrepareNextSyncTask()
276 {
277     int errCode = StartWatchDog();
278     if (errCode != E_OK) {
279         LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
280         return errCode;
281     }
282 
283     if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
284         LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
285             currentState_, STR_MASK(context_->GetDeviceId()));
286         currentState_ = State::IDLE;
287     }
288     return E_OK;
289 }
290 
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)291 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
292 {
293     dataSync_->SendSaveDataNotifyPacket(context_,
294         std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
295 }
296 
CommErrAbort(uint32_t sessionId)297 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
298 {
299     std::lock_guard<std::mutex> lock(stateMachineLock_);
300     uint32_t requestSessionId = context_->GetRequestSessionId();
301     if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
302         return;
303     }
304     context_->SetCommNormal(false);
305     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
306         SyncStep();
307     }
308 }
309 
InitStateSwitchTables()310 void SingleVerSyncStateMachine::InitStateSwitchTables()
311 {
312     if (isStateSwitchTableInited_) {
313         return;
314     }
315 
316     std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
317     if (isStateSwitchTableInited_) {
318         return;
319     }
320 
321     InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
322     std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
323         [](const auto &tableA, const auto &tableB) {
324             return tableA.version > tableB.version;
325         }); // descending
326     isStateSwitchTableInited_ = true;
327 }
328 
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)329 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
330     const std::vector<std::vector<uint8_t>> &switchTable)
331 {
332     StateSwitchTable table;
333     table.version = version;
334     for (const auto &stateSwitch : switchTable) {
335         if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
336             LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
337             return;
338         }
339         if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
340             EventToState eventToState; // new EventToState
341             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
342             table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
343         } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
344             EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
345             eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
346         }
347     }
348     stateSwitchTables_.push_back(table);
349 }
350 
InitStateMapping()351 void SingleVerSyncStateMachine::InitStateMapping()
352 {
353     stateMapping_[TIME_SYNC] = std::bind(&SingleVerSyncStateMachine::DoTimeSync, this);
354     stateMapping_[ABILITY_SYNC] = std::bind(&SingleVerSyncStateMachine::DoAbilitySync, this);
355     stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = std::bind(&SingleVerSyncStateMachine::DoWaitForDataRecv, this);
356     stateMapping_[SYNC_TASK_FINISHED] = std::bind(&SingleVerSyncStateMachine::DoSyncTaskFinished, this);
357     stateMapping_[SYNC_TIME_OUT] = std::bind(&SingleVerSyncStateMachine::DoTimeout, this);
358     stateMapping_[INNER_ERR] = std::bind(&SingleVerSyncStateMachine::DoInnerErr, this);
359     stateMapping_[START_INITIACTIVE_DATA_SYNC] =
360         std::bind(&SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow, this);
361     stateMapping_[START_PASSIVE_DATA_SYNC] =
362         std::bind(&SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow, this);
363     stateMapping_[SYNC_CONTROL_CMD] = std::bind(&SingleVerSyncStateMachine::DoInitiactiveControlSync, this);
364 }
365 
DoInitiactiveDataSyncWithSlidingWindow()366 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow()
367 {
368     LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
369         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
370     int errCode = E_OK;
371     switch (context_->GetMode()) {
372         case SyncModeType::PUSH:
373         case SyncModeType::QUERY_PUSH:
374             context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
375             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
376             break;
377         case SyncModeType::PULL:
378         case SyncModeType::QUERY_PULL:
379             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
380             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
381             break;
382         case SyncModeType::PUSH_AND_PULL:
383         case SyncModeType::QUERY_PUSH_PULL:
384             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
385             break;
386         case SyncModeType::RESPONSE_PULL:
387             errCode = dataSync_->SyncStart(context_->GetMode(), context_);
388             break;
389         default:
390             errCode = -E_NOT_SUPPORT;
391             break;
392     }
393     if (errCode == E_OK) {
394         return Event::WAIT_ACK_EVENT;
395     }
396     // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
397     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
398         errCode == -E_EKEYREVOKED) {
399         return Event::WAIT_ACK_EVENT;
400     }
401 
402     // when response task step dataSync again while request task is running,  ignore the errCode
403     bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
404     Event event = TransformErrCodeToEvent(errCode);
405     return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
406 }
407 
DoPassiveDataSyncWithSlidingWindow()408 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
409 {
410     {
411         RefObject::AutoLock lock(context_);
412         if (context_->GetRspTargetQueueSize() != 0) {
413             PreStartPullResponse();
414         } else if (context_->GetResponseSessionId() == 0 ||
415             context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
416             return RESPONSE_TASK_FINISHED_EVENT;
417         }
418     }
419     int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
420     if (errCode != E_OK) {
421         LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
422         return RESPONSE_TASK_FINISHED_EVENT;
423     }
424     return Event::WAIT_ACK_EVENT;
425 }
426 
DoInitiactiveControlSync()427 Event SingleVerSyncStateMachine::DoInitiactiveControlSync()
428 {
429     LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
430         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
431     context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
432     int errCode = dataSync_->ControlCmdStart(context_);
433     if (errCode == E_OK) {
434         return Event::WAIT_ACK_EVENT;
435     }
436     context_->SetTaskErrCode(errCode);
437     return TransformErrCodeToEvent(errCode);
438 }
439 
HandleControlAckRecv(const Message * inMsg)440 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
441 {
442     std::lock_guard<std::mutex> lock(stateMachineLock_);
443     if (IsNeedResetWatchdog(inMsg)) {
444         (void)ResetWatchDog();
445     }
446     int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
447     ControlAckRecvErrCodeHandle(errCode);
448     SwitchStateAndStep(TransformErrCodeToEvent(errCode));
449     return E_OK;
450 }
451 
DoWaitForDataRecv() const452 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
453 {
454     if (context_->GetRspTargetQueueSize() != 0) {
455         return START_PULL_RESPONSE_EVENT;
456     }
457     if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
458         return RECV_FINISHED_EVENT;
459     }
460     if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
461         context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
462         context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
463         return RECV_FINISHED_EVENT;
464     }
465     return Event::WAIT_ACK_EVENT;
466 }
467 
DoTimeSync()468 Event SingleVerSyncStateMachine::DoTimeSync()
469 {
470     if (timeSync_->IsNeedSync()) {
471         CommErrHandler handler = nullptr;
472         // Auto sync need do retry don't use errHandler to return.
473         if (!context_->IsAutoSync()) {
474             handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
475                 context_, context_->GetRequestSessionId());
476         }
477         int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
478         if (errCode == E_OK) {
479             return Event::WAIT_ACK_EVENT;
480         }
481         context_->SetTaskErrCode(errCode);
482         return TransformErrCodeToEvent(errCode);
483     }
484 
485     return Event::TIME_SYNC_FINISHED_EVENT;
486 }
487 
DoAbilitySync()488 Event SingleVerSyncStateMachine::DoAbilitySync()
489 {
490     uint16_t remoteCommunicatorVersion = 0;
491     int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
492     if (errCode != E_OK) {
493         LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
494         return Event::INNER_ERR_EVENT;
495     }
496     // Fistr version, not support AbilitySync
497     if (remoteCommunicatorVersion == 0) {
498         context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
499         return GetEventAfterTimeSync(context_->GetMode());
500     }
501     if (context_->GetIsNeedResetAbilitySync()) {
502         abilitySync_->SetAbilitySyncFinishedStatus(false);
503         context_->SetIsNeedResetAbilitySync(false);
504     }
505     if (abilitySync_->GetAbilitySyncFinishedStatus()) {
506         return GetEventAfterTimeSync(context_->GetMode());
507     }
508 
509     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
510         context_, context_->GetRequestSessionId());
511     LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
512         STR_MASK(context_->GetDeviceId()));
513     errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
514         remoteCommunicatorVersion, handler);
515     if (errCode != E_OK) {
516         LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
517         return TransformErrCodeToEvent(errCode);
518     }
519     return Event::WAIT_ACK_EVENT;
520 }
521 
GetEventAfterTimeSync(int mode) const522 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
523 {
524     if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
525         return Event::CONTROL_CMD_EVENT;
526     }
527     return Event::ABILITY_SYNC_FINISHED_EVENT;
528 }
529 
DoSyncTaskFinished()530 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
531 {
532     StopWatchDog();
533     dataSync_->ClearSyncStatus();
534     RefObject::AutoLock lock(syncContext_);
535     int errCode = ExecNextTask();
536     if (errCode == E_OK) {
537         return Event::START_SYNC_EVENT;
538     }
539     return TransformErrCodeToEvent(errCode);
540 }
541 
DoTimeout()542 Event SingleVerSyncStateMachine::DoTimeout()
543 {
544     RefObject::AutoLock lock(context_);
545     if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
546         std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
547         if (subManager != nullptr) {
548             subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
549         }
550     }
551     context_->Abort(SyncOperation::OP_TIMEOUT);
552     context_->Clear();
553     AbortInner();
554     return Event::ANY_EVENT;
555 }
556 
DoInnerErr()557 Event SingleVerSyncStateMachine::DoInnerErr()
558 {
559     RefObject::AutoLock lock(context_);
560     if (!context_->IsCommNormal()) {
561         if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
562             std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
563             if (subManager != nullptr) {
564                 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
565             }
566         }
567         context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
568     } else {
569         int status = GetSyncOperationStatus(context_->GetTaskErrCode());
570         context_->Abort(status);
571     }
572     context_->Clear();
573     AbortInner();
574     return Event::ANY_EVENT;
575 }
576 
AbilitySyncRecv(const Message * inMsg)577 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
578 {
579     if (inMsg->GetMessageType() == TYPE_REQUEST) {
580         return abilitySync_->RequestRecv(inMsg, context_);
581     }
582 
583     if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
584         std::lock_guard<std::mutex> lock(stateMachineLock_);
585         int errCode = abilitySync_->AckRecv(inMsg, context_);
586         (void)ResetWatchDog();
587         if (errCode != E_OK) {
588             LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
589             SwitchStateAndStep(TransformErrCodeToEvent(errCode));
590         } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
591             abilitySync_->SetAbilitySyncFinishedStatus(true);
592             LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
593                 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
594             currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
595             JumpStatusAfterAbilitySync(context_->GetMode());
596         }
597         return E_OK;
598     }
599     if (inMsg->GetMessageType() == TYPE_NOTIFY) {
600             const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
601             if (packet == nullptr) {
602                 return -E_INVALID_ARGS;
603             }
604             int ackCode = packet->GetAckCode();
605             if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
606                 LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
607                 context_->SetTaskErrCode(ackCode);
608                 std::lock_guard<std::mutex> lock(stateMachineLock_);
609                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
610                 return E_OK;
611             }
612             if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
613                 abilitySync_->SetAbilitySyncFinishedStatus(true);
614                 // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
615                 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
616                     dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
617                 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
618                 (static_cast<SingleVerSyncTaskContext *>(context_))->SetIsSchemaSync(true);
619                 std::lock_guard<std::mutex> lock(stateMachineLock_);
620                 (void)ResetWatchDog();
621                 JumpStatusAfterAbilitySync(context_->GetMode());
622             } else if (ackCode != AbilitySync::LAST_NOTIFY) {
623                 abilitySync_->AckNotifyRecv(inMsg, context_);
624             }
625         return E_OK;
626     }
627 
628     LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
629     return -E_NOT_SUPPORT;
630 }
631 
HandleDataRequestRecv(const Message * inMsg)632 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
633 {
634     TimeOffset offset = 0;
635     uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
636     // If message is data sync request, we should check timeoffset.
637     int errCode = timeSync_->GetTimeOffset(offset, timeout);
638     if (errCode != E_OK) {
639         LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
640         return errCode;
641     }
642     context_->SetTimeOffset(offset);
643     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
644     if (performance != nullptr) {
645         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
646     }
647     DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
648     {
649         std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
650         if (IsNeedResetWatchdog(inMsg)) {
651             (void)ResetWatchDog();
652         }
653     }
654 
655     // RequestRecv will save data, it may cost a long time.
656     // So we need to send save data notify to keep remote alive.
657     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
658     WaterMark pullEndWaterkark = 0;
659     errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
660     if (performance != nullptr) {
661         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
662     }
663     if (isNeedStop) {
664         StopSaveDataNotify();
665     }
666     // only higher than 102 version receive this errCode here.
667     // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
668     // so while receive the same secquencid after abiitysync it wouldn't handle.
669     if (errCode == -E_NEED_ABILITY_SYNC) {
670         return errCode;
671     }
672     std::lock_guard<std::mutex> lock(stateMachineLock_);
673     DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
674     if (pullEndWaterkark > 0) {
675         AddPullResponseTarget(inMsg, pullEndWaterkark);
676     }
677     return E_OK;
678 }
679 
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)680 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
681     bool ignoreInnerErr)
682 {
683     if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
684         dataSync_->ClearSyncStatus();
685     }
686     if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
687         int ret = dataSync_->TryContinueSync(context_, inMsg);
688         if (ret == -E_FINISHED) {
689             SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
690             return;
691         } else if (ret == E_OK) { // do nothing and waiting for all ack receive
692             return;
693         }
694         errCode = ret;
695     }
696     ResponsePullError(errCode, ignoreInnerErr);
697 }
698 
NeedAbilitySyncHandle()699 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
700 {
701     // if the remote device version num is overdue,
702     // mean the version num has been reset when syncing data,
703     // there should not clear the new version cache again.
704     if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
705         LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
706         context_->SetRemoteSoftwareVersion(0);
707     } else {
708         currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
709     }
710     abilitySync_->SetAbilitySyncFinishedStatus(false);
711     dataSync_->ClearSyncStatus();
712 }
713 
HandleDataAckRecv(const Message * inMsg)714 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
715 {
716     if (inMsg->GetMessageType() == TYPE_RESPONSE) {
717         DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
718     }
719     std::lock_guard<std::mutex> lock(stateMachineLock_);
720     // Unfortunately we use stateMachineLock_ in many sync process
721     // So a bad ack will check before the lock and wait
722     // And then another process is running, it will get the lock.After this process, the ack became invalid.
723     // If we don't check ack again, it will be delivered to dataSyncer.
724     if (!IsPacketValid(inMsg)) {
725         return -E_INVALID_ARGS;
726     }
727     if (IsNeedResetWatchdog(inMsg)) {
728         (void)ResetWatchDog();
729     }
730     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
731         // packetId not match but sequence id matched scene, means resend map has be rebuilt
732         // this is old ack, shoulb be dropped and wait for the same packetId sequence.
733         return E_OK;
734     }
735     // AckRecv will save meta data, it may cost a long time. if another thread is saving data
736     // So we need to send save data notify to keep remote alive.
737     // eg. remote do pull sync
738     bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
739     int errCode = dataSync_->AckRecv(context_, inMsg);
740     if (isNeedStop) {
741         StopSaveDataNotify();
742     }
743     if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA) {
744         StopFeedDogForSync(SyncDirectionFlag::SEND);
745     } else if (errCode == -E_SAVE_DATA_NOTIFY) {
746         return errCode;
747     }
748     // when this msg is from response task while request task is running,  ignore the errCode
749     bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
750         context_->GetRequestSessionId() != 0;
751     DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
752     HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
753     return errCode;
754 }
755 
DataPktRecv(Message * inMsg)756 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
757 {
758     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
759     int errCode = E_OK;
760     switch (inMsg->GetMessageType()) {
761         case TYPE_REQUEST:
762             ScheduleMsgAndHandle(inMsg);
763             errCode = -E_NOT_NEED_DELETE_MSG;
764             break;
765         case TYPE_RESPONSE:
766         case TYPE_NOTIFY:
767             if (performance != nullptr) {
768                 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
769                 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
770             }
771             errCode = HandleDataAckRecv(inMsg);
772             break;
773         default:
774             errCode = -E_INVALID_ARGS;
775             break;
776     }
777     return errCode;
778 }
779 
ScheduleMsgAndHandle(Message * inMsg)780 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
781 {
782     dataSync_->PutDataMsg(inMsg);
783     while (true) {
784         bool isNeedHandle = true;
785         bool isNeedContinue = true;
786         Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
787         if (!isNeedContinue) {
788             break;
789         }
790         if (msg == nullptr) {
791             if (dataSync_->IsNeedReloadQueue()) {
792                 continue;
793             }
794             break;
795         }
796         bool isNeedClearMap = false;
797         if (isNeedHandle) {
798             int errCode = HandleDataRequestRecv(msg);
799             if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC) {
800                 isNeedClearMap = true;
801             }
802             if (errCode == -E_TIMEOUT) {
803                 isNeedHandle = false;
804             }
805         } else {
806             dataSync_->SendFinishedDataAck(context_, msg);
807         }
808         if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
809             // for lower version, no need to handle map schedule info, just reset schedule working status
810             isNeedHandle = false;
811         }
812         dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
813         delete msg;
814     }
815 }
816 
ControlPktRecv(Message * inMsg)817 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
818 {
819     int errCode = E_OK;
820     switch (inMsg->GetMessageType()) {
821         case TYPE_REQUEST:
822             errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
823             break;
824         case TYPE_RESPONSE:
825             errCode = HandleControlAckRecv(inMsg);
826             break;
827         default:
828             errCode = -E_INVALID_ARGS;
829             break;
830     }
831     return errCode;
832 }
833 
StepToTimeout(TimerId timerId)834 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
835 {
836     std::lock_guard<std::mutex> lock(stateMachineLock_);
837     TimerId timer = syncContext_->GetTimerId();
838     if (timer != timerId) {
839         return;
840     }
841     SwitchStateAndStep(Event::TIME_OUT_EVENT);
842 }
843 
GetSyncOperationStatus(int errCode) const844 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
845 {
846     static const std::map<int, int> statusMap = {
847         { -E_SCHEMA_MISMATCH,                 SyncOperation::OP_SCHEMA_INCOMPATIBLE },
848         { -E_EKEYREVOKED,                     SyncOperation::OP_EKEYREVOKED_FAILURE },
849         { -E_SECURITY_OPTION_CHECK_ERROR,     SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
850         { -E_BUSY,                            SyncOperation::OP_BUSY_FAILURE },
851         { -E_NOT_PERMIT,                      SyncOperation::OP_PERMISSION_CHECK_FAILED },
852         { -E_TIMEOUT,                         SyncOperation::OP_TIMEOUT },
853         { -E_INVALID_QUERY_FORMAT,            SyncOperation::OP_QUERY_FORMAT_FAILURE },
854         { -E_INVALID_QUERY_FIELD,             SyncOperation::OP_QUERY_FIELD_FAILURE },
855         { -E_FEEDBACK_UNKNOWN_MESSAGE,        SyncOperation::OP_NOT_SUPPORT },
856         { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
857         { -E_NOT_SUPPORT,                     SyncOperation::OP_NOT_SUPPORT },
858         { -E_INTERCEPT_DATA_FAIL,             SyncOperation::OP_INTERCEPT_DATA_FAIL },
859         { -E_MAX_LIMITS,                      SyncOperation::OP_MAX_LIMITS },
860         { -E_DISTRIBUTED_SCHEMA_CHANGED,      SyncOperation::OP_SCHEMA_CHANGED },
861         { -E_NOT_REGISTER,                    SyncOperation::OP_NOT_SUPPORT },
862         { -E_DENIED_SQL,                      SyncOperation::OP_DENIED_SQL },
863         { -E_REMOTE_OVER_SIZE,                SyncOperation::OP_MAX_LIMITS },
864         { -E_INVALID_PASSWD_OR_CORRUPTED_DB,  SyncOperation::OP_NOTADB_OR_CORRUPTED },
865     };
866     auto iter = statusMap.find(errCode);
867     if (iter != statusMap.end()) {
868         return iter->second;
869     }
870     return SyncOperation::OP_FAILED;
871 }
872 
TimeMarkSyncRecv(const Message * inMsg)873 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
874 {
875     LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
876         dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
877     {
878         std::lock_guard<std::mutex> lock(stateMachineLock_);
879         (void)ResetWatchDog();
880     }
881     if (inMsg->GetMessageType() == TYPE_REQUEST) {
882         return timeSync_->RequestRecv(inMsg);
883     } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
884         int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
885         if (errCode != E_OK) {
886             LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
887             if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
888                 context_->SetTaskErrCode(errCode);
889                 InnerErrorAbort(inMsg->GetSessionId());
890             }
891             return errCode;
892         }
893         std::lock_guard<std::mutex> lock(stateMachineLock_);
894         SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
895         return E_OK;
896     } else {
897         return -E_INVALID_ARGS;
898     }
899 }
900 
Clear()901 void SingleVerSyncStateMachine::Clear()
902 {
903     dataSync_ = nullptr;
904     timeSync_ = nullptr;
905     abilitySync_ = nullptr;
906     context_ = nullptr;
907     syncInterface_ = nullptr;
908 }
909 
IsPacketValid(const Message * inMsg) const910 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
911 {
912     if (inMsg == nullptr) {
913         return false;
914     }
915 
916     if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
917         LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
918         return false;
919     }
920     // filter invalid ack at first
921     bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
922     if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
923         (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
924         LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
925             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
926         return false;
927     }
928     if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
929         (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
930         (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
931         LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
932             dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
933         return false;
934     }
935 
936     // timeSync and abilitySync don't need to check sequenceId
937     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
938         inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
939         return true;
940     }
941     // sequenceId will be checked in dataSync
942     return true;
943 }
944 
PreStartPullResponse()945 void SingleVerSyncStateMachine::PreStartPullResponse()
946 {
947     SingleVerSyncTarget target;
948     context_->PopResponseTarget(target);
949     context_->SetEndMark(target.GetEndWaterMark());
950     context_->SetResponseSessionId(target.GetResponseSessionId());
951     context_->SetMode(SyncModeType::RESPONSE_PULL);
952     context_->ReSetSequenceId();
953     context_->SetQuerySync(target.IsQuerySync());
954     context_->SetQuery(target.GetQuery());
955 }
956 
CheckIsStartPullResponse() const957 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
958 {
959     // Other state will step to do pull response, only this statem we need to step the statemachine
960     if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
961         return true;
962     }
963     return false;
964 }
965 
MessageCallbackPre(const Message * inMsg)966 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
967 {
968     RefObject::AutoLock lock(context_);
969     if (context_->IsKilled()) {
970         return -E_OBJ_IS_KILLED;
971     }
972 
973     if (!IsPacketValid(inMsg)) {
974         return -E_INVALID_ARGS;
975     }
976     return E_OK;
977 }
978 
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)979 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
980 {
981     int messageType = static_cast<int>(inMsg->GetMessageId());
982     uint32_t sessionId = inMsg->GetSessionId();
983     if (pullEndWatermark == 0) {
984         LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
985         return;
986     }
987     if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
988         LOGI("[StateMachine][AddPullResponseTarget] task is already running");
989         return;
990     }
991     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
992     if (packet == nullptr) {
993         LOGE("[AddPullResponseTarget] get packet object failed");
994         return;
995     }
996     SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
997     if (targetTmp == nullptr) {
998         LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
999         return;
1000     }
1001     targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1002     if (messageType == QUERY_SYNC_MESSAGE) {
1003         targetTmp->SetQuery(packet->GetQuery());
1004         targetTmp->SetQuerySync(true);
1005     }
1006     targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1007     targetTmp->SetEndWaterMark(pullEndWatermark);
1008     targetTmp->SetResponseSessionId(sessionId);
1009     if (context_->AddSyncTarget(targetTmp) != E_OK) {
1010         delete targetTmp;
1011         return;
1012     }
1013     if (CheckIsStartPullResponse()) {
1014         SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1015     }
1016 }
1017 
TransformErrCodeToEvent(int errCode)1018 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode)
1019 {
1020     switch (errCode) {
1021         case -E_TIMEOUT:
1022             return TransforTimeOutErrCodeToEvent();
1023         case -VERSION_NOT_SUPPOR_EVENT:
1024             return Event::VERSION_NOT_SUPPOR_EVENT;
1025         case -E_SEND_DATA:
1026             return Event::SEND_DATA_EVENT;
1027         case -E_NO_DATA_SEND:
1028             return Event::SEND_FINISHED_EVENT;
1029         case -E_RECV_FINISHED:
1030             return Event::RECV_FINISHED_EVENT;
1031         case -E_NEED_ABILITY_SYNC:
1032             return Event::NEED_ABILITY_SYNC_EVENT;
1033         case -E_NO_SYNC_TASK:
1034             return Event::ALL_TASK_FINISHED_EVENT;
1035         case -E_NEED_PULL_REPONSE:
1036             return Event::START_PULL_RESPONSE_EVENT;
1037         case -E_RE_SEND_DATA:
1038             return Event::RE_SEND_DATA_EVENT;
1039         default:
1040             return Event::INNER_ERR_EVENT;
1041     }
1042 }
1043 
IsNeedResetWatchdog(const Message * inMsg) const1044 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1045 {
1046     if (inMsg == nullptr) {
1047         return false;
1048     }
1049 
1050     if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1051         return true;
1052     }
1053 
1054     int msgType = inMsg->GetMessageType();
1055     if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1056         if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1057             // Pull response ack also should reset watchdog
1058             return true;
1059         }
1060     }
1061 
1062     return false;
1063 }
1064 
TransforTimeOutErrCodeToEvent() const1065 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1066 {
1067     if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1068         return Event::WAIT_TIME_OUT_EVENT;
1069     } else {
1070         return Event::TIME_OUT_EVENT;
1071     }
1072 }
1073 
IsNeedErrCodeHandle(uint32_t sessionId) const1074 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1075 {
1076     // omit to set sessionId so version_3 should skip to compare sessionid.
1077     if (sessionId == context_->GetRequestSessionId() ||
1078         context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1079         return true;
1080     }
1081     return false;
1082 }
1083 
PushPullDataRequestEvokeErrHandle()1084 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1085 {
1086     // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1087     if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1088         SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) {
1089         LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1090         context_->SetTaskErrCode(-E_EKEYREVOKED);
1091         context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1092         SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1093     } else {
1094         context_->SetTaskErrCode(-E_EKEYREVOKED);
1095         SwitchStateAndStep(Event::INNER_ERR_EVENT);
1096     }
1097 }
1098 
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1099 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1100 {
1101     if (IsNeedErrCodeHandle(sessionId)) {
1102         switch (errCode) {
1103             case E_OK:
1104                 break;
1105             case -E_RECV_FINISHED:
1106                 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1107                 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1108                 break;
1109             case -E_EKEYREVOKED:
1110                 PushPullDataRequestEvokeErrHandle();
1111                 break;
1112             case -E_BUSY:
1113             case -E_DISTRIBUTED_SCHEMA_CHANGED:
1114             case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1115             case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1116             case -E_FEEDBACK_UNKNOWN_MESSAGE:
1117             case -E_INTERCEPT_DATA_FAIL:
1118             case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1119             case -E_INVALID_QUERY_FIELD:
1120             case -E_INVALID_QUERY_FORMAT:
1121             case -E_MAX_LIMITS:
1122             case -E_NOT_REGISTER:
1123             case -E_NOT_SUPPORT:
1124             case -E_SECURITY_OPTION_CHECK_ERROR:
1125                 context_->SetTaskErrCode(errCode);
1126                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1127                 break;
1128             default:
1129                 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1130                 break;
1131         }
1132     }
1133 }
1134 
AbilityMsgSessionIdCheck(const Message * inMsg)1135 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1136 {
1137     if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
1138         return true;
1139     }
1140     LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1141     return false;
1142 }
1143 
GetSyncType(uint32_t messageId) const1144 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1145 {
1146     if (messageId == QUERY_SYNC_MESSAGE) {
1147         return SyncType::QUERY_SYNC_TYPE;
1148     }
1149     return SyncType::MANUAL_FULL_SYNC_TYPE;
1150 }
1151 
DataAckRecvErrCodeHandle(int errCode,bool handleError)1152 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1153 {
1154     switch (errCode) {
1155         case -E_NEED_ABILITY_SYNC:
1156             NeedAbilitySyncHandle();
1157             break;
1158         case -E_NOT_PERMIT:
1159             if (handleError) {
1160                 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1161             }
1162             break;
1163         case -E_BUSY:
1164         case -E_DISTRIBUTED_SCHEMA_CHANGED:
1165         case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1166         case -E_EKEYREVOKED:
1167         case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1168         case -E_FEEDBACK_UNKNOWN_MESSAGE:
1169         case -E_INTERCEPT_DATA_FAIL:
1170         case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1171         case -E_INVALID_QUERY_FIELD:
1172         case -E_INVALID_QUERY_FORMAT:
1173         case -E_MAX_LIMITS:
1174         case -E_NOT_REGISTER:
1175         case -E_NOT_SUPPORT:
1176         case -E_SECURITY_OPTION_CHECK_ERROR:
1177             if (handleError) {
1178                 context_->SetTaskErrCode(errCode);
1179             }
1180             break;
1181         default:
1182             break;
1183     }
1184 }
1185 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1186 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1187 {
1188     return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1189 }
1190 
JumpStatusAfterAbilitySync(int mode)1191 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1192 {
1193     if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1194         SwitchStateAndStep(CONTROL_CMD_EVENT);
1195     } else {
1196         SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1197     }
1198 }
1199 
ControlAckRecvErrCodeHandle(int errCode)1200 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1201 {
1202     switch (errCode) {
1203         case -E_NEED_ABILITY_SYNC:
1204             NeedAbilitySyncHandle();
1205             break;
1206         case -E_NO_DATA_SEND:
1207             context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1208             break;
1209         case -E_NOT_PERMIT:
1210             context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1211             break;
1212         // other errCode use default
1213         default:
1214             context_->SetTaskErrCode(errCode);
1215             break;
1216     }
1217 }
1218 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)1219 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
1220 {
1221     metadata_->GetLocalWaterMark(deviceId, outValue);
1222 }
1223 
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,bool isAutoLift,uint64_t & outValue)1224 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId,  const DeviceID &deviceId,
1225     bool isAutoLift, uint64_t &outValue)
1226 {
1227     return metadata_->GetSendQueryWaterMark(queryId, deviceId, outValue, isAutoLift);
1228 }
1229 
ResponsePullError(int errCode,bool ignoreInnerErr)1230 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1231 {
1232     Event event = TransformErrCodeToEvent(errCode);
1233     if (event == INNER_ERR_EVENT) {
1234         if (ignoreInnerErr) {
1235             event = RESPONSE_TASK_FINISHED_EVENT;
1236         } else if (context_ != nullptr) {
1237             context_->SetTaskErrCode(errCode);
1238         }
1239     }
1240     SwitchStateAndStep(event);
1241 }
1242 
InnerErrorAbort(uint32_t sessionId)1243 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1244 {
1245     std::lock_guard<std::mutex> lock(stateMachineLock_);
1246     uint32_t requestSessionId = context_->GetRequestSessionId();
1247     if (sessionId != requestSessionId) {
1248         LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1249         return;
1250     }
1251     if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1252         SyncStep();
1253     }
1254 }
1255 
NotifyClosing()1256 void SingleVerSyncStateMachine::NotifyClosing()
1257 {
1258     if (timeSync_ != nullptr) {
1259         timeSync_->Close();
1260     }
1261 }
1262 } // namespace DistributedDB
1263