1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_sync_state_machine.h"
17
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38 // used for state switch table
39 const int CURRENT_STATE_INDEX = 0;
40 const int EVENT_INDEX = 1;
41 const int OUTPUT_STATE_INDEX = 2;
42
43 // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44 // State switch table v3, has three columns, CurrentState, Event, and OutSate
45 const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46 {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47
48 // In TIME_SYNC state
49 {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50 {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51 {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52
53 // In ABILITY_SYNC state, compare version num and schema
54 {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
55 {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
56 {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
57 {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
58 {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
59
60 // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
61 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
62 {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
63 {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
64 {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
65 {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
66 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
67
68 // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
69 {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
70 {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
71 {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
72 {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
73 {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
74 {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
75 {State::START_PASSIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
76
77 // In WAIT_FOR_RECEIVE_DATA_FINISH,
78 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
79 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
80 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
81 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
82 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
83
84 {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
85 {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
86 {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
87 {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
88
89 // In SYNC_TASK_FINISHED,
90 {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
91 {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
92
93 // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
94 {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
95 {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
96 };
97 }
98
// Taken by InitStateSwitchTables() while it builds the class-wide state switch tables.
std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
// Class-wide state switch tables; InitStateSwitchTable() appends to it and GetStateSwitchTables() returns it.
std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
// Set to true by InitStateSwitchTables() once the tables have been built and sorted.
bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
102
SingleVerSyncStateMachine()103 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
104 : context_(nullptr),
105 syncInterface_(nullptr),
106 timeSync_(nullptr),
107 abilitySync_(nullptr),
108 dataSync_(nullptr),
109 currentRemoteVersionId_(0)
110 {
111 }
112
~SingleVerSyncStateMachine()113 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
114 {
115 LOGD("~SingleVerSyncStateMachine");
116 Clear();
117 }
118
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)119 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
120 const std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
121 {
122 if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
123 return -E_INVALID_ARGS;
124 }
125
126 int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
127 if (errCode != E_OK) {
128 return errCode;
129 }
130
131 timeSync_ = std::make_shared<TimeSync>();
132 dataSync_ = std::make_shared<SingleVerDataSync>();
133 abilitySync_ = std::make_unique<AbilitySync>();
134 if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
135 timeSync_ = nullptr;
136 dataSync_ = nullptr;
137 abilitySync_ = nullptr;
138 return -E_OUT_OF_MEMORY;
139 }
140
141 errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId());
142 if (errCode != E_OK) {
143 goto ERROR_OUT;
144 }
145 errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
146 if (errCode != E_OK) {
147 goto ERROR_OUT;
148 }
149 errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
150 if (errCode != E_OK) {
151 goto ERROR_OUT;
152 }
153 abilitySync_->InitAbilitySyncFinishStatus(*context);
154
155 currentState_ = IDLE;
156 context_ = static_cast<SingleVerSyncTaskContext *>(context);
157 syncInterface_ = static_cast<SyncGenericInterface *>(syncInterface);
158
159 InitStateSwitchTables();
160 InitStateMapping();
161 return E_OK;
162
163 ERROR_OUT:
164 Clear();
165 return errCode;
166 }
167
SyncStep()168 void SingleVerSyncStateMachine::SyncStep()
169 {
170 RefObject::IncObjRef(context_);
171 RefObject::IncObjRef(communicator_);
172 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
173 if (errCode != E_OK) {
174 LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
175 RefObject::DecObjRef(communicator_);
176 RefObject::DecObjRef(context_);
177 }
178 }
179
ReceiveMessageCallback(Message * inMsg)180 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
181 {
182 int errCode = MessageCallbackPre(inMsg);
183 if (errCode != E_OK) {
184 LOGE("[StateMachine] message pre check failed");
185 return errCode;
186 }
187 switch (inMsg->GetMessageId()) {
188 case TIME_SYNC_MESSAGE:
189 errCode = TimeMarkSyncRecv(inMsg);
190 break;
191 case ABILITY_SYNC_MESSAGE:
192 errCode = AbilitySyncRecv(inMsg);
193 break;
194 case DATA_SYNC_MESSAGE:
195 case QUERY_SYNC_MESSAGE:
196 errCode = DataPktRecv(inMsg);
197 break;
198 case CONTROL_SYNC_MESSAGE:
199 errCode = ControlPktRecv(inMsg);
200 break;
201 default:
202 errCode = -E_NOT_SUPPORT;
203 }
204 return errCode;
205 }
206
SyncStepInnerLocked()207 void SingleVerSyncStateMachine::SyncStepInnerLocked()
208 {
209 if (context_->IncUsedCount() != E_OK) {
210 goto SYNC_STEP_OUT;
211 }
212 {
213 std::lock_guard<std::mutex> lock(stateMachineLock_);
214 SyncStepInner();
215 }
216 context_->SafeExit();
217
218 SYNC_STEP_OUT:
219 RefObject::DecObjRef(communicator_);
220 RefObject::DecObjRef(context_);
221 }
222
SyncStepInner()223 void SingleVerSyncStateMachine::SyncStepInner()
224 {
225 Event event = INNER_ERR_EVENT;
226 do {
227 auto iter = stateMapping_.find(currentState_);
228 if (iter != stateMapping_.end()) {
229 event = static_cast<Event>(iter->second());
230 } else {
231 LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
232 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
233 break;
234 }
235 } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
236 }
237
SetCurStateErrStatus()238 void SingleVerSyncStateMachine::SetCurStateErrStatus()
239 {
240 currentState_ = State::INNER_ERR;
241 }
242
StartSyncInner()243 int SingleVerSyncStateMachine::StartSyncInner()
244 {
245 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
246 if (performance != nullptr) {
247 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
248 }
249 int errCode = PrepareNextSyncTask();
250 if (errCode == E_OK) {
251 SwitchStateAndStep(Event::START_SYNC_EVENT);
252 }
253 return errCode;
254 }
255
AbortInner()256 void SingleVerSyncStateMachine::AbortInner()
257 {
258 LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
259 STR_MASK(context_->GetDeviceId()));
260 if (context_->IsKilled()) {
261 dataSync_->ClearDataMsg();
262 }
263 dataSync_->ClearSyncStatus();
264 ContinueToken token;
265 context_->GetContinueToken(token);
266 if (token != nullptr) {
267 syncInterface_->ReleaseContinueToken(token);
268 }
269 context_->SetContinueToken(nullptr);
270 context_->Clear();
271 }
272
GetStateSwitchTables() const273 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
274 {
275 return stateSwitchTables_;
276 }
277
PrepareNextSyncTask()278 int SingleVerSyncStateMachine::PrepareNextSyncTask()
279 {
280 int errCode = StartWatchDog();
281 if (errCode != E_OK && errCode != -E_UNEXPECTED_DATA) {
282 LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
283 return errCode;
284 }
285 if (errCode == -E_UNEXPECTED_DATA) {
286 LOGI("[PrepareNextSyncTask] timer already exists, reset the timer.");
287 (void)ResetWatchDog();
288 }
289
290 if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
291 LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
292 currentState_, STR_MASK(context_->GetDeviceId()));
293 currentState_ = State::IDLE;
294 }
295 return E_OK;
296 }
297
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)298 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
299 {
300 dataSync_->SendSaveDataNotifyPacket(context_,
301 std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
302 }
303
CommErrAbort(uint32_t sessionId)304 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
305 {
306 std::lock_guard<std::mutex> lock(stateMachineLock_);
307 uint32_t requestSessionId = context_->GetRequestSessionId();
308 if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
309 return;
310 }
311 context_->SetCommNormal(false);
312 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
313 SyncStep();
314 }
315 }
316
InitStateSwitchTables()317 void SingleVerSyncStateMachine::InitStateSwitchTables()
318 {
319 if (isStateSwitchTableInited_) {
320 return;
321 }
322
323 std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
324 if (isStateSwitchTableInited_) {
325 return;
326 }
327
328 InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
329 std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
330 [](const auto &tableA, const auto &tableB) {
331 return tableA.version > tableB.version;
332 }); // descending
333 isStateSwitchTableInited_ = true;
334 }
335
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)336 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
337 const std::vector<std::vector<uint8_t>> &switchTable)
338 {
339 StateSwitchTable table;
340 table.version = version;
341 for (const auto &stateSwitch : switchTable) {
342 if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
343 LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
344 return;
345 }
346 if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
347 EventToState eventToState; // new EventToState
348 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
349 table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
350 } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
351 EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
352 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
353 }
354 }
355 stateSwitchTables_.push_back(table);
356 }
357
InitStateMapping()358 void SingleVerSyncStateMachine::InitStateMapping()
359 {
360 stateMapping_[TIME_SYNC] = [this] { return DoTimeSync(); };
361 stateMapping_[ABILITY_SYNC] = [this] { return DoAbilitySync(); };
362 stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = [this] { return DoWaitForDataRecv(); };
363 stateMapping_[SYNC_TASK_FINISHED] = [this] { return DoSyncTaskFinished(); };
364 stateMapping_[SYNC_TIME_OUT] = [this] { return DoTimeout(); };
365 stateMapping_[INNER_ERR] = [this] { return DoInnerErr(); };
366 stateMapping_[START_INITIACTIVE_DATA_SYNC] = [this] { return DoInitiactiveDataSyncWithSlidingWindow(); };
367 stateMapping_[START_PASSIVE_DATA_SYNC] = [this] { return DoPassiveDataSyncWithSlidingWindow(); };
368 stateMapping_[SYNC_CONTROL_CMD] = [this] { return DoInitiactiveControlSync(); };
369 }
370
DoInitiactiveDataSyncWithSlidingWindow() const371 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow() const
372 {
373 LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
374 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
375 int errCode = E_OK;
376 switch (context_->GetMode()) {
377 case SyncModeType::PUSH:
378 case SyncModeType::QUERY_PUSH:
379 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
380 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
381 break;
382 case SyncModeType::PULL:
383 case SyncModeType::QUERY_PULL:
384 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
385 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
386 break;
387 case SyncModeType::PUSH_AND_PULL:
388 case SyncModeType::QUERY_PUSH_PULL:
389 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
390 break;
391 case SyncModeType::RESPONSE_PULL:
392 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
393 break;
394 default:
395 errCode = -E_NOT_SUPPORT;
396 break;
397 }
398 if (errCode == E_OK) {
399 return Event::WAIT_ACK_EVENT;
400 }
401 // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
402 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
403 errCode == -E_EKEYREVOKED) {
404 return Event::WAIT_ACK_EVENT;
405 }
406
407 // when response task step dataSync again while request task is running, ignore the errCode
408 bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
409 Event event = TransformErrCodeToEvent(errCode);
410 return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
411 }
412
DoPassiveDataSyncWithSlidingWindow()413 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
414 {
415 {
416 RefObject::AutoLock lock(context_);
417 if (context_->GetRspTargetQueueSize() != 0) {
418 PreStartPullResponse();
419 } else if (context_->GetResponseSessionId() == 0 ||
420 context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
421 return RESPONSE_TASK_FINISHED_EVENT;
422 }
423 }
424 int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
425 if (errCode != E_OK) {
426 LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
427 return RESPONSE_TASK_FINISHED_EVENT;
428 }
429 return Event::WAIT_ACK_EVENT;
430 }
431
DoInitiactiveControlSync() const432 Event SingleVerSyncStateMachine::DoInitiactiveControlSync() const
433 {
434 LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
435 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
436 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
437 int errCode = dataSync_->ControlCmdStart(context_);
438 if (errCode == E_OK) {
439 return Event::WAIT_ACK_EVENT;
440 }
441 context_->SetTaskErrCode(errCode);
442 return TransformErrCodeToEvent(errCode);
443 }
444
HandleControlAckRecv(const Message * inMsg)445 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
446 {
447 std::lock_guard<std::mutex> lock(stateMachineLock_);
448 if (IsNeedResetWatchdog(inMsg)) {
449 (void)ResetWatchDog();
450 }
451 int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
452 ControlAckRecvErrCodeHandle(errCode);
453 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
454 return E_OK;
455 }
456
DoWaitForDataRecv() const457 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
458 {
459 if (context_->GetRspTargetQueueSize() != 0) {
460 return START_PULL_RESPONSE_EVENT;
461 }
462 if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
463 return RECV_FINISHED_EVENT;
464 }
465 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
466 context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
467 context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
468 return RECV_FINISHED_EVENT;
469 }
470 return Event::WAIT_ACK_EVENT;
471 }
472
DoTimeSync() const473 Event SingleVerSyncStateMachine::DoTimeSync() const
474 {
475 timeSync_->SetTimeSyncFinishIfNeed();
476 if (timeSync_->IsNeedSync()) {
477 CommErrHandler handler = nullptr;
478 // Auto sync need do retry don't use errHandler to return.
479 if (!context_->IsAutoSync()) {
480 handler = [this, context = context_,
481 requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
482 SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
483 };
484 }
485 int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
486 if (errCode == E_OK) {
487 return Event::WAIT_ACK_EVENT;
488 }
489 context_->SetTaskErrCode(errCode);
490 return TransformErrCodeToEvent(errCode);
491 }
492
493 return Event::TIME_SYNC_FINISHED_EVENT;
494 }
495
DoAbilitySync() const496 Event SingleVerSyncStateMachine::DoAbilitySync() const
497 {
498 uint16_t remoteCommunicatorVersion = 0;
499 int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
500 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
501 LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
502 return Event::INNER_ERR_EVENT;
503 }
504 // Fistr version, not support AbilitySync
505 if (remoteCommunicatorVersion == 0 && errCode != -E_NOT_FOUND) {
506 context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
507 return GetEventAfterTimeSync(context_->GetMode());
508 }
509 if (abilitySync_->GetAbilitySyncFinishedStatus()) {
510 return GetEventAfterTimeSync(context_->GetMode());
511 }
512
513 CommErrHandler handler = [this, context = context_,
514 requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
515 SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
516 };
517 LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
518 STR_MASK(context_->GetDeviceId()));
519 errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
520 remoteCommunicatorVersion, handler, context_);
521 if (errCode != E_OK) {
522 LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
523 context_->SetTaskErrCode(errCode);
524 return TransformErrCodeToEvent(errCode);
525 }
526 return Event::WAIT_ACK_EVENT;
527 }
528
GetEventAfterTimeSync(int mode) const529 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
530 {
531 if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
532 return Event::CONTROL_CMD_EVENT;
533 }
534 return Event::ABILITY_SYNC_FINISHED_EVENT;
535 }
536
DoSyncTaskFinished()537 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
538 {
539 StopWatchDog();
540 dataSync_->ClearSyncStatus();
541 auto timeout = communicator_->GetTimeout(syncContext_->GetDeviceId());
542 RefObject::AutoLock lock(syncContext_);
543 int errCode = ExecNextTask(timeout);
544 if (errCode == E_OK) {
545 return Event::START_SYNC_EVENT;
546 }
547 return TransformErrCodeToEvent(errCode);
548 }
549
DoTimeout()550 Event SingleVerSyncStateMachine::DoTimeout()
551 {
552 RefObject::AutoLock lock(context_);
553 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
554 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
555 if (subManager != nullptr) {
556 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
557 }
558 }
559 context_->Abort(SyncOperation::OP_TIMEOUT);
560 context_->Clear();
561 AbortInner();
562 return Event::ANY_EVENT;
563 }
564
DoInnerErr()565 Event SingleVerSyncStateMachine::DoInnerErr()
566 {
567 RefObject::AutoLock lock(context_);
568 if (!context_->IsCommNormal()) {
569 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
570 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
571 if (subManager != nullptr) {
572 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
573 }
574 }
575 context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
576 } else {
577 int status = GetSyncOperationStatus(context_->GetTaskErrCode());
578 context_->Abort(status);
579 }
580 context_->Clear();
581 AbortInner();
582 return Event::ANY_EVENT;
583 }
584
AbilitySyncRecv(const Message * inMsg)585 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
586 {
587 if (inMsg->GetMessageType() == TYPE_REQUEST) {
588 int errCode = abilitySync_->RequestRecv(inMsg, context_);
589 if (errCode != E_OK && inMsg->GetSessionId() == context_->GetResponseSessionId()) {
590 context_->SetTaskErrCode(errCode);
591 std::lock_guard<std::mutex> lock(stateMachineLock_);
592 SwitchStateAndStep(Event::INNER_ERR_EVENT);
593 }
594 return E_OK;
595 }
596 return AbilitySyncResponseRecv(inMsg);
597 }
598
AbilitySyncResponseRecv(const Message * inMsg)599 int SingleVerSyncStateMachine::AbilitySyncResponseRecv(const Message *inMsg)
600 {
601 if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
602 std::lock_guard<std::mutex> lock(stateMachineLock_);
603 int errCode = abilitySync_->AckRecv(inMsg, context_);
604 (void)ResetWatchDog();
605 if (errCode == -E_ABILITY_SYNC_FINISHED) {
606 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished with both kv,label=%s,dev=%s",
607 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
608 abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
609 JumpStatusAfterAbilitySync(context_->GetMode());
610 } else if (errCode != E_OK) {
611 LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
612 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
613 } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
614 abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
615 LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
616 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
617 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
618 JumpStatusAfterAbilitySync(context_->GetMode());
619 }
620 return E_OK;
621 }
622 if (inMsg->GetMessageType() == TYPE_NOTIFY) {
623 return AbilitySyncNotifyRecv(inMsg);
624 }
625 LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
626 return -E_NOT_SUPPORT;
627 }
628
HandleDataRequestRecv(const Message * inMsg)629 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
630 {
631 TimeOffset offset = 0;
632 auto [systemOffset, senderLocalOffset] = SingleVerDataSyncUtils::GetTimeOffsetFromRequestMsg(inMsg);
633 int errCode = timeSync_->GenerateTimeOffsetIfNeed(systemOffset, senderLocalOffset);
634 if (errCode != E_OK) {
635 (void)dataSync_->SendDataAck(context_, inMsg, errCode, 0);
636 return errCode;
637 }
638 uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
639 // If message is data sync request, we should check timeoffset.
640 errCode = timeSync_->GetTimeOffset(offset, timeout);
641 if (errCode != E_OK) {
642 LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
643 return errCode;
644 }
645 context_->SetTimeOffset(offset);
646 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
647 if (performance != nullptr) {
648 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
649 }
650 DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
651
652 // RequestRecv will save data, it may cost a long time.
653 // So we need to send save data notify to keep remote alive.
654 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
655 {
656 std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
657 if (IsNeedResetWatchdog(inMsg)) {
658 (void)ResetWatchDog();
659 }
660 }
661 WaterMark pullEndWaterkark = 0;
662 errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
663 if (performance != nullptr) {
664 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
665 }
666 // only higher than 102 version receive this errCode here.
667 // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
668 // so while receive the same secquencid after abiitysync it wouldn't handle.
669 if (errCode == -E_NEED_ABILITY_SYNC) {
670 if (isNeedStop) {
671 StopSaveDataNotify();
672 }
673 return errCode;
674 }
675 {
676 std::lock_guard<std::mutex> lock(stateMachineLock_);
677 DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
678 if (pullEndWaterkark > 0) {
679 AddPullResponseTarget(inMsg, pullEndWaterkark);
680 }
681 }
682 if (isNeedStop) {
683 StopSaveDataNotify();
684 }
685 return E_OK;
686 }
687
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)688 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
689 bool ignoreInnerErr)
690 {
691 if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
692 dataSync_->ClearSyncStatus();
693 }
694 if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
695 int ret = dataSync_->TryContinueSync(context_, inMsg);
696 if (ret == -E_FINISHED) {
697 SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
698 return;
699 } else if (ret == E_OK) { // do nothing and waiting for all ack receive
700 return;
701 }
702 errCode = ret;
703 }
704 ResponsePullError(errCode, ignoreInnerErr);
705 }
706
NeedAbilitySyncHandle()707 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
708 {
709 // if the remote device version num is overdue,
710 // mean the version num has been reset when syncing data,
711 // there should not clear the new version cache again.
712 if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
713 LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
714 context_->SetRemoteSoftwareVersion(0);
715 } else {
716 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
717 }
718 abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
719 }
720
HandleDataAckRecv(const Message * inMsg)721 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
722 {
723 if (inMsg->GetMessageType() == TYPE_RESPONSE) {
724 DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
725 }
726 std::lock_guard<std::mutex> lock(stateMachineLock_);
727 // Unfortunately we use stateMachineLock_ in many sync process
728 // So a bad ack will check before the lock and wait
729 // And then another process is running, it will get the lock.After this process, the ack became invalid.
730 // If we don't check ack again, it will be delivered to dataSyncer.
731 if (!IsPacketValid(inMsg)) {
732 return -E_INVALID_ARGS;
733 }
734 if (IsNeedResetWatchdog(inMsg)) {
735 (void)ResetWatchDog();
736 }
737 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
738 // packetId not match but sequence id matched scene, means resend map has be rebuilt
739 // this is old ack, should be dropped and wait for the same packetId sequence.
740 return E_OK;
741 }
742 // AckRecv will save meta data, it may cost a long time. if another thread is saving data
743 // So we need to send save data notify to keep remote alive.
744 // e.g. remote do pull sync
745 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
746 int errCode = dataSync_->AckRecv(context_, inMsg);
747 if (isNeedStop) {
748 StopSaveDataNotify();
749 }
750 if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA || errCode == -E_NEED_TIME_SYNC) {
751 StopFeedDogForSync(SyncDirectionFlag::SEND);
752 dataSync_->ClearSyncStatus();
753 context_->ReSetSequenceId();
754 } else if (errCode == -E_SAVE_DATA_NOTIFY) {
755 return errCode;
756 }
757 // when this msg is from response task while request task is running, ignore the errCode
758 bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
759 context_->GetRequestSessionId() != 0;
760 DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
761 HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
762 return errCode;
763 }
764
DataPktRecv(Message * inMsg)765 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
766 {
767 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
768 int errCode = E_OK;
769 switch (inMsg->GetMessageType()) {
770 case TYPE_REQUEST:
771 ScheduleMsgAndHandle(inMsg);
772 errCode = -E_NOT_NEED_DELETE_MSG;
773 break;
774 case TYPE_RESPONSE:
775 case TYPE_NOTIFY:
776 if (performance != nullptr) {
777 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
778 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
779 }
780 errCode = HandleDataAckRecv(inMsg);
781 break;
782 default:
783 errCode = -E_INVALID_ARGS;
784 break;
785 }
786 return errCode;
787 }
788
ScheduleMsgAndHandle(Message * inMsg)789 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
790 {
791 dataSync_->PutDataMsg(inMsg);
792 while (true) {
793 bool isNeedHandle = true;
794 bool isNeedContinue = true;
795 Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
796 if (!isNeedContinue) {
797 break;
798 }
799 if (msg == nullptr) {
800 if (dataSync_->IsNeedReloadQueue()) {
801 continue;
802 }
803 dataSync_->ScheduleInfoHandle(false, false, nullptr);
804 break;
805 }
806 bool isNeedClearMap = false;
807 if (isNeedHandle) {
808 int errCode = HandleDataRequestRecv(msg);
809 if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC || errCode == -E_NEED_TIME_SYNC) {
810 isNeedClearMap = true;
811 }
812 if (errCode == -E_TIMEOUT) {
813 isNeedHandle = false;
814 }
815 } else {
816 dataSync_->SendFinishedDataAck(context_, msg);
817 }
818 if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
819 // for lower version, no need to handle map schedule info, just reset schedule working status
820 isNeedHandle = false;
821 }
822 dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
823 delete msg;
824 }
825 }
826
ControlPktRecv(Message * inMsg)827 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
828 {
829 int errCode = E_OK;
830 switch (inMsg->GetMessageType()) {
831 case TYPE_REQUEST:
832 errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
833 break;
834 case TYPE_RESPONSE:
835 errCode = HandleControlAckRecv(inMsg);
836 break;
837 default:
838 errCode = -E_INVALID_ARGS;
839 break;
840 }
841 return errCode;
842 }
843
StepToTimeout(TimerId timerId)844 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
845 {
846 std::lock_guard<std::mutex> lock(stateMachineLock_);
847 TimerId timer = syncContext_->GetTimerId();
848 if (timer != timerId) {
849 return;
850 }
851 SwitchStateAndStep(Event::TIME_OUT_EVENT);
852 }
853
854 namespace {
855 struct StateNode {
856 int errCode = 0;
857 SyncOperation::Status status = SyncOperation::OP_WAITING;
858 };
859 }
GetSyncOperationStatus(int errCode) const860 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
861 {
862 static const StateNode stateNodes[] = {
863 { -E_SCHEMA_MISMATCH, SyncOperation::OP_SCHEMA_INCOMPATIBLE },
864 { -E_EKEYREVOKED, SyncOperation::OP_EKEYREVOKED_FAILURE },
865 { -E_SECURITY_OPTION_CHECK_ERROR, SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
866 { -E_BUSY, SyncOperation::OP_BUSY_FAILURE },
867 { -E_NOT_PERMIT, SyncOperation::OP_PERMISSION_CHECK_FAILED },
868 { -E_TIMEOUT, SyncOperation::OP_TIMEOUT },
869 { -E_INVALID_QUERY_FORMAT, SyncOperation::OP_QUERY_FORMAT_FAILURE },
870 { -E_INVALID_QUERY_FIELD, SyncOperation::OP_QUERY_FIELD_FAILURE },
871 { -E_FEEDBACK_UNKNOWN_MESSAGE, SyncOperation::OP_NOT_SUPPORT },
872 { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
873 { -E_NOT_SUPPORT, SyncOperation::OP_NOT_SUPPORT },
874 { -E_INTERCEPT_DATA_FAIL, SyncOperation::OP_INTERCEPT_DATA_FAIL },
875 { -E_MAX_LIMITS, SyncOperation::OP_MAX_LIMITS },
876 { -E_DISTRIBUTED_SCHEMA_CHANGED, SyncOperation::OP_SCHEMA_CHANGED },
877 { -E_NOT_REGISTER, SyncOperation::OP_NOT_SUPPORT },
878 { -E_DENIED_SQL, SyncOperation::OP_DENIED_SQL },
879 { -E_REMOTE_OVER_SIZE, SyncOperation::OP_MAX_LIMITS },
880 { -E_INVALID_PASSWD_OR_CORRUPTED_DB, SyncOperation::OP_NOTADB_OR_CORRUPTED },
881 { -E_DISTRIBUTED_SCHEMA_NOT_FOUND, SyncOperation::OP_SCHEMA_INCOMPATIBLE }
882 };
883 const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) {
884 return node.errCode == errCode;
885 });
886 return result == std::end(stateNodes) ? SyncOperation::OP_FAILED : static_cast<int>((*result).status);
887 }
888
TimeMarkSyncRecv(const Message * inMsg)889 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
890 {
891 LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
892 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
893 {
894 std::lock_guard<std::mutex> lock(stateMachineLock_);
895 (void)ResetWatchDog();
896 }
897 if (inMsg->GetMessageType() == TYPE_REQUEST) {
898 return timeSync_->RequestRecv(inMsg);
899 } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
900 int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
901 if (errCode != E_OK) {
902 LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
903 if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
904 context_->SetTaskErrCode(errCode);
905 InnerErrorAbort(inMsg->GetSessionId());
906 }
907 return errCode;
908 }
909 std::lock_guard<std::mutex> lock(stateMachineLock_);
910 SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
911 return E_OK;
912 } else {
913 return -E_INVALID_ARGS;
914 }
915 }
916
Clear()917 void SingleVerSyncStateMachine::Clear()
918 {
919 dataSync_ = nullptr;
920 timeSync_ = nullptr;
921 abilitySync_ = nullptr;
922 context_ = nullptr;
923 syncInterface_ = nullptr;
924 }
925
IsPacketValid(const Message * inMsg) const926 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
927 {
928 if (inMsg == nullptr) {
929 return false;
930 }
931
932 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
933 LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
934 return false;
935 }
936 // filter invalid ack at first
937 bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
938 if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
939 (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
940 LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
941 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
942 return false;
943 }
944 if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
945 (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
946 (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
947 LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
948 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
949 return false;
950 }
951
952 // timeSync and abilitySync don't need to check sequenceId
953 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
954 inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
955 return true;
956 }
957 // sequenceId will be checked in dataSync
958 return true;
959 }
960
PreStartPullResponse()961 void SingleVerSyncStateMachine::PreStartPullResponse()
962 {
963 SingleVerSyncTarget target;
964 context_->PopResponseTarget(target);
965 context_->SetEndMark(target.GetEndWaterMark());
966 context_->SetResponseSessionId(target.GetResponseSessionId());
967 context_->SetMode(SyncModeType::RESPONSE_PULL);
968 context_->ReSetSequenceId();
969 context_->SetQuerySync(target.IsQuerySync());
970 context_->SetQuery(target.GetQuery());
971 }
972
CheckIsStartPullResponse() const973 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
974 {
975 // Other state will step to do pull response, only this statem we need to step the statemachine
976 if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
977 return true;
978 }
979 return false;
980 }
981
MessageCallbackPre(const Message * inMsg)982 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
983 {
984 RefObject::AutoLock lock(context_);
985 if (context_->IsKilled()) {
986 return -E_OBJ_IS_KILLED;
987 }
988
989 if (!IsPacketValid(inMsg)) {
990 return -E_INVALID_ARGS;
991 }
992 return E_OK;
993 }
994
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)995 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
996 {
997 int messageType = static_cast<int>(inMsg->GetMessageId());
998 uint32_t sessionId = inMsg->GetSessionId();
999 if (pullEndWatermark == 0) {
1000 LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
1001 return;
1002 }
1003 if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
1004 LOGI("[StateMachine][AddPullResponseTarget] task is already running");
1005 return;
1006 }
1007 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
1008 if (packet == nullptr) {
1009 LOGE("[AddPullResponseTarget] get packet object failed");
1010 return;
1011 }
1012 SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
1013 if (targetTmp == nullptr) {
1014 LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
1015 return;
1016 }
1017 targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1018 if (messageType == QUERY_SYNC_MESSAGE) {
1019 targetTmp->SetQuery(packet->GetQuery());
1020 targetTmp->SetQuerySync(true);
1021 }
1022 targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1023 targetTmp->SetEndWaterMark(pullEndWatermark);
1024 targetTmp->SetResponseSessionId(sessionId);
1025 if (context_->AddSyncTarget(targetTmp) != E_OK) {
1026 delete targetTmp;
1027 return;
1028 }
1029 if (CheckIsStartPullResponse()) {
1030 SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1031 }
1032 }
1033
TransformErrCodeToEvent(int errCode) const1034 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode) const
1035 {
1036 switch (errCode) {
1037 case -E_TIMEOUT:
1038 return TransforTimeOutErrCodeToEvent();
1039 case -static_cast<int>(VERSION_NOT_SUPPOR_EVENT):
1040 return Event::VERSION_NOT_SUPPOR_EVENT;
1041 case -E_SEND_DATA:
1042 return Event::SEND_DATA_EVENT;
1043 case -E_NO_DATA_SEND:
1044 return Event::SEND_FINISHED_EVENT;
1045 case -E_RECV_FINISHED:
1046 return Event::RECV_FINISHED_EVENT;
1047 case -E_NEED_ABILITY_SYNC:
1048 return Event::NEED_ABILITY_SYNC_EVENT;
1049 case -E_NO_SYNC_TASK:
1050 return Event::ALL_TASK_FINISHED_EVENT;
1051 case -E_NEED_PULL_REPONSE:
1052 return Event::START_PULL_RESPONSE_EVENT;
1053 case -E_RE_SEND_DATA:
1054 return Event::RE_SEND_DATA_EVENT;
1055 case -E_NEED_TIME_SYNC:
1056 return Event::NEED_TIME_SYNC_EVENT;
1057 default:
1058 return Event::INNER_ERR_EVENT;
1059 }
1060 }
1061
IsNeedResetWatchdog(const Message * inMsg) const1062 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1063 {
1064 if (inMsg == nullptr) {
1065 return false;
1066 }
1067
1068 if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1069 return true;
1070 }
1071
1072 int msgType = inMsg->GetMessageType();
1073 if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1074 if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1075 // Pull response ack also should reset watchdog
1076 return true;
1077 }
1078 }
1079
1080 return false;
1081 }
1082
TransforTimeOutErrCodeToEvent() const1083 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1084 {
1085 if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1086 return Event::WAIT_TIME_OUT_EVENT;
1087 } else {
1088 return Event::TIME_OUT_EVENT;
1089 }
1090 }
1091
IsNeedErrCodeHandle(uint32_t sessionId) const1092 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1093 {
1094 // omit to set sessionId so version_3 should skip to compare sessionid.
1095 if (sessionId == context_->GetRequestSessionId() ||
1096 context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1097 return true;
1098 }
1099 return false;
1100 }
1101
PushPullDataRequestEvokeErrHandle()1102 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1103 {
1104 // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1105 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1106 SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) { // LCOV_EXCL_BR_LINE
1107 LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1108 context_->SetTaskErrCode(-E_EKEYREVOKED);
1109 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1110 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1111 } else {
1112 context_->SetTaskErrCode(-E_EKEYREVOKED);
1113 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1114 }
1115 }
1116
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1117 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1118 {
1119 if (IsNeedErrCodeHandle(sessionId)) {
1120 switch (errCode) {
1121 case E_OK:
1122 break;
1123 case -E_NOT_PERMIT:
1124 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1125 break;
1126 case -E_RECV_FINISHED:
1127 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1128 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1129 break;
1130 case -E_EKEYREVOKED:
1131 PushPullDataRequestEvokeErrHandle();
1132 break;
1133 case -E_BUSY:
1134 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1135 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1136 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1137 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1138 case -E_INTERCEPT_DATA_FAIL:
1139 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1140 case -E_INVALID_QUERY_FIELD:
1141 case -E_INVALID_QUERY_FORMAT:
1142 case -E_MAX_LIMITS:
1143 case -E_NOT_REGISTER:
1144 case -E_NOT_SUPPORT:
1145 case -E_SECURITY_OPTION_CHECK_ERROR:
1146 context_->SetTaskErrCode(errCode);
1147 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1148 break;
1149 default:
1150 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1151 break;
1152 }
1153 }
1154 }
1155
AbilityMsgSessionIdCheck(const Message * inMsg)1156 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1157 {
1158 if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) { // LCOV_EXCL_BR_LINE
1159 return true;
1160 }
1161 LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1162 return false;
1163 }
1164
GetSyncType(uint32_t messageId) const1165 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1166 {
1167 if (messageId == QUERY_SYNC_MESSAGE) { // LCOV_EXCL_BR_LINE
1168 return SyncType::QUERY_SYNC_TYPE;
1169 }
1170 return SyncType::MANUAL_FULL_SYNC_TYPE;
1171 }
1172
DataAckRecvErrCodeHandle(int errCode,bool handleError)1173 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1174 {
1175 switch (errCode) {
1176 case -E_NEED_ABILITY_SYNC:
1177 NeedAbilitySyncHandle();
1178 break;
1179 case -E_NEED_TIME_SYNC:
1180 timeSync_->ClearTimeSyncFinish();
1181 break;
1182 case -E_NOT_PERMIT:
1183 if (handleError) {
1184 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1185 }
1186 break;
1187 case -E_BUSY:
1188 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1189 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1190 case -E_EKEYREVOKED:
1191 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1192 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1193 case -E_INTERCEPT_DATA_FAIL:
1194 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1195 case -E_INVALID_QUERY_FIELD:
1196 case -E_INVALID_QUERY_FORMAT:
1197 case -E_MAX_LIMITS:
1198 case -E_NOT_REGISTER:
1199 case -E_NOT_SUPPORT:
1200 case -E_SECURITY_OPTION_CHECK_ERROR:
1201 if (handleError) {
1202 context_->SetTaskErrCode(errCode);
1203 }
1204 break;
1205 default:
1206 break;
1207 }
1208 }
1209
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1210 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1211 {
1212 return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1213 }
1214
JumpStatusAfterAbilitySync(int mode)1215 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1216 {
1217 if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1218 SwitchStateAndStep(CONTROL_CMD_EVENT);
1219 } else {
1220 SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1221 }
1222 }
1223
ControlAckRecvErrCodeHandle(int errCode)1224 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1225 {
1226 switch (errCode) {
1227 case -E_NEED_ABILITY_SYNC:
1228 NeedAbilitySyncHandle();
1229 break;
1230 case -E_NO_DATA_SEND:
1231 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1232 break;
1233 case -E_NOT_PERMIT:
1234 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1235 break;
1236 // other errCode use default
1237 default:
1238 context_->SetTaskErrCode(errCode);
1239 break;
1240 }
1241 }
1242
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)1243 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
1244 {
1245 metadata_->GetLocalWaterMark(deviceId, outValue);
1246 }
1247
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,bool isAutoLift,uint64_t & outValue)1248 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId,
1249 bool isAutoLift, uint64_t &outValue)
1250 {
1251 return metadata_->GetSendQueryWaterMark(queryId, deviceId, outValue, isAutoLift);
1252 }
1253
ResponsePullError(int errCode,bool ignoreInnerErr)1254 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1255 {
1256 Event event = TransformErrCodeToEvent(errCode);
1257 if (event == INNER_ERR_EVENT) {
1258 if (ignoreInnerErr) {
1259 event = RESPONSE_TASK_FINISHED_EVENT;
1260 } else if (context_ != nullptr) {
1261 context_->SetTaskErrCode(errCode);
1262 }
1263 }
1264 SwitchStateAndStep(event);
1265 }
1266
InnerErrorAbort(uint32_t sessionId)1267 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1268 {
1269 std::lock_guard<std::mutex> lock(stateMachineLock_);
1270 uint32_t requestSessionId = context_->GetRequestSessionId();
1271 if (sessionId != requestSessionId) {
1272 LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1273 return;
1274 }
1275 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1276 SyncStep();
1277 }
1278 }
1279
NotifyClosing()1280 void SingleVerSyncStateMachine::NotifyClosing()
1281 {
1282 if (timeSync_ != nullptr) {
1283 timeSync_->Close();
1284 }
1285 }
1286
AbilitySyncNotifyRecv(const Message * inMsg)1287 int SingleVerSyncStateMachine::AbilitySyncNotifyRecv(const Message *inMsg)
1288 {
1289 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
1290 if (packet == nullptr) {
1291 return -E_INVALID_ARGS;
1292 }
1293 int ackCode = packet->GetAckCode();
1294 if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
1295 LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
1296 context_->SetTaskErrCode(ackCode);
1297 std::lock_guard<std::mutex> lock(stateMachineLock_);
1298 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1299 return E_OK;
1300 }
1301 if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
1302 abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
1303 // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
1304 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
1305 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
1306 context_->SetRemoteSoftwareVersion(packet->GetSoftwareVersion());
1307 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
1308 std::lock_guard<std::mutex> lock(stateMachineLock_);
1309 (void)ResetWatchDog();
1310 JumpStatusAfterAbilitySync(context_->GetMode());
1311 } else if (ackCode != AbilitySync::LAST_NOTIFY) {
1312 abilitySync_->AckNotifyRecv(inMsg, context_);
1313 }
1314 return E_OK;
1315 }
1316
SchemaChange()1317 void SingleVerSyncStateMachine::SchemaChange()
1318 {
1319 abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
1320 }
1321
TimeChange()1322 void SingleVerSyncStateMachine::TimeChange()
1323 {
1324 if (timeSync_ == nullptr) {
1325 LOGW("[SingleVerSyncStateMachine] time sync is null when time change");
1326 return;
1327 }
1328 timeSync_->ClearTimeSyncFinish();
1329 }
1330 } // namespace DistributedDB
1331