1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_sync_state_machine.h"
17
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38 // used for state switch table
39 const int CURRENT_STATE_INDEX = 0;
40 const int EVENT_INDEX = 1;
41 const int OUTPUT_STATE_INDEX = 2;
42
43 // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44 // State switch table v3, has three columns, CurrentState, Event, and OutSate
45 const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46 {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47
48 // In TIME_SYNC state
49 {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50 {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51 {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52 {State::TIME_SYNC, Event::NEED_RESYNC_EVENT, State::TIME_SYNC},
53
54 // In ABILITY_SYNC state, compare version num and schema
55 {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
56 {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
57 {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
58 {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
59 {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
60 {State::ABILITY_SYNC, Event::NEED_RESYNC_EVENT, State::ABILITY_SYNC},
61
62 // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
63 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
64 {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
65 {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
66 {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
67 {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
68 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
69 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_RESYNC_EVENT, State::START_INITIACTIVE_DATA_SYNC},
70
71 // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
72 {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
73 {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
74 {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
75 {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
76 {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
77 {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
78 {State::START_PASSIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC},
79 {State::START_PASSIVE_DATA_SYNC, Event::NEED_RESYNC_EVENT, State::START_PASSIVE_DATA_SYNC},
80
81 // In WAIT_FOR_RECEIVE_DATA_FINISH,
82 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
83 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
84 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
85 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
86 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
87 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_RESYNC_EVENT, State::START_PASSIVE_DATA_SYNC},
88
89 {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
90 {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
91 {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
92 {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
93 {State::SYNC_CONTROL_CMD, Event::NEED_RESYNC_EVENT, State::SYNC_CONTROL_CMD},
94
95 // In SYNC_TASK_FINISHED,
96 {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
97 {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
98
99 // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
100 {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
101 {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
102 };
103 }
104
105 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
106 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
107 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
108
SingleVerSyncStateMachine()109 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
110 : context_(nullptr),
111 syncInterface_(nullptr),
112 timeSync_(nullptr),
113 abilitySync_(nullptr),
114 dataSync_(nullptr),
115 currentRemoteVersionId_(0)
116 {
117 }
118
~SingleVerSyncStateMachine()119 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
120 {
121 LOGD("~SingleVerSyncStateMachine");
122 Clear();
123 }
124
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)125 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
126 const std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
127 {
128 if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
129 return -E_INVALID_ARGS;
130 }
131
132 int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
133 if (errCode != E_OK) {
134 return errCode;
135 }
136
137 timeSync_ = std::make_shared<TimeSync>();
138 dataSync_ = std::make_shared<SingleVerDataSync>();
139 abilitySync_ = std::make_unique<AbilitySync>();
140 if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
141 timeSync_ = nullptr;
142 dataSync_ = nullptr;
143 abilitySync_ = nullptr;
144 return -E_OUT_OF_MEMORY;
145 }
146
147 errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId(),
148 context->GetTargetUserId());
149 if (errCode != E_OK) {
150 goto ERROR_OUT;
151 }
152 errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
153 if (errCode != E_OK) {
154 goto ERROR_OUT;
155 }
156 errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
157 if (errCode != E_OK) {
158 goto ERROR_OUT;
159 }
160 abilitySync_->InitAbilitySyncFinishStatus(*context);
161
162 currentState_ = IDLE;
163 context_ = static_cast<SingleVerSyncTaskContext *>(context);
164 syncInterface_ = static_cast<SyncGenericInterface *>(syncInterface);
165
166 InitStateSwitchTables();
167 InitStateMapping();
168 return E_OK;
169
170 ERROR_OUT:
171 Clear();
172 return errCode;
173 }
174
SyncStep()175 void SingleVerSyncStateMachine::SyncStep()
176 {
177 RefObject::IncObjRef(context_);
178 RefObject::IncObjRef(communicator_);
179 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
180 if (errCode != E_OK) {
181 LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
182 RefObject::DecObjRef(communicator_);
183 RefObject::DecObjRef(context_);
184 }
185 }
186
ReceiveMessageCallback(Message * inMsg)187 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
188 {
189 int errCode = MessageCallbackPre(inMsg);
190 if (errCode != E_OK) {
191 LOGE("[StateMachine] message pre check failed");
192 return errCode;
193 }
194 if (context_->IsNeedRetrySync(inMsg->GetErrorNo(), inMsg->GetMessageType())) {
195 SwitchStateAndStep(NEED_RESYNC_EVENT);
196 return E_OK;
197 }
198 switch (inMsg->GetMessageId()) {
199 case TIME_SYNC_MESSAGE:
200 errCode = TimeMarkSyncRecv(inMsg);
201 break;
202 case ABILITY_SYNC_MESSAGE:
203 errCode = AbilitySyncRecv(inMsg);
204 break;
205 case DATA_SYNC_MESSAGE:
206 case QUERY_SYNC_MESSAGE:
207 errCode = DataPktRecv(inMsg);
208 break;
209 case CONTROL_SYNC_MESSAGE:
210 errCode = ControlPktRecv(inMsg);
211 break;
212 default:
213 errCode = -E_NOT_SUPPORT;
214 }
215 return errCode;
216 }
217
SyncStepInnerLocked()218 void SingleVerSyncStateMachine::SyncStepInnerLocked()
219 {
220 if (context_->IncUsedCount() != E_OK) {
221 goto SYNC_STEP_OUT;
222 }
223 {
224 std::lock_guard<std::mutex> lock(stateMachineLock_);
225 SyncStepInner();
226 }
227 context_->SafeExit();
228
229 SYNC_STEP_OUT:
230 RefObject::DecObjRef(communicator_);
231 RefObject::DecObjRef(context_);
232 }
233
SyncStepInner()234 void SingleVerSyncStateMachine::SyncStepInner()
235 {
236 Event event = INNER_ERR_EVENT;
237 do {
238 auto iter = stateMapping_.find(currentState_);
239 if (iter != stateMapping_.end()) {
240 event = static_cast<Event>(iter->second());
241 } else {
242 LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
243 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
244 break;
245 }
246 } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
247 }
248
SetCurStateErrStatus()249 void SingleVerSyncStateMachine::SetCurStateErrStatus()
250 {
251 currentState_ = State::INNER_ERR;
252 }
253
StartSyncInner()254 int SingleVerSyncStateMachine::StartSyncInner()
255 {
256 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
257 if (performance != nullptr) {
258 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
259 }
260 int errCode = PrepareNextSyncTask();
261 if (errCode == E_OK) {
262 SwitchStateAndStep(Event::START_SYNC_EVENT);
263 }
264 return errCode;
265 }
266
AbortInner()267 void SingleVerSyncStateMachine::AbortInner()
268 {
269 LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
270 STR_MASK(context_->GetDeviceId()));
271 if (context_->IsKilled()) {
272 dataSync_->ClearDataMsg();
273 }
274 dataSync_->ClearSyncStatus();
275 ContinueToken token;
276 context_->GetContinueToken(token);
277 if (token != nullptr) {
278 syncInterface_->ReleaseContinueToken(token);
279 }
280 context_->SetContinueToken(nullptr);
281 context_->Clear();
282 }
283
GetStateSwitchTables() const284 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
285 {
286 return stateSwitchTables_;
287 }
288
PrepareNextSyncTask()289 int SingleVerSyncStateMachine::PrepareNextSyncTask()
290 {
291 int errCode = StartWatchDog();
292 if (errCode != E_OK && errCode != -E_UNEXPECTED_DATA) {
293 LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
294 return errCode;
295 }
296 if (errCode == -E_UNEXPECTED_DATA) {
297 LOGI("[PrepareNextSyncTask] timer already exists, reset the timer.");
298 (void)ResetWatchDog();
299 }
300
301 if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
302 LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
303 currentState_, STR_MASK(context_->GetDeviceId()));
304 currentState_ = State::IDLE;
305 }
306 return E_OK;
307 }
308
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)309 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
310 {
311 dataSync_->SendSaveDataNotifyPacket(context_,
312 std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
313 }
314
CommErrAbort(uint32_t sessionId)315 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
316 {
317 std::lock_guard<std::mutex> lock(stateMachineLock_);
318 uint32_t requestSessionId = context_->GetRequestSessionId();
319 if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
320 return;
321 }
322 context_->SetCommNormal(false);
323 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
324 SyncStep();
325 }
326 }
327
InitStateSwitchTables()328 void SingleVerSyncStateMachine::InitStateSwitchTables()
329 {
330 if (isStateSwitchTableInited_) {
331 return;
332 }
333
334 std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
335 if (isStateSwitchTableInited_) {
336 return;
337 }
338
339 InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
340 std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
341 [](const auto &tableA, const auto &tableB) {
342 return tableA.version > tableB.version;
343 }); // descending
344 isStateSwitchTableInited_ = true;
345 }
346
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)347 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
348 const std::vector<std::vector<uint8_t>> &switchTable)
349 {
350 StateSwitchTable table;
351 table.version = version;
352 for (const auto &stateSwitch : switchTable) {
353 if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
354 LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
355 return;
356 }
357 if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
358 EventToState eventToState; // new EventToState
359 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
360 table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
361 } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
362 EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
363 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
364 }
365 }
366 stateSwitchTables_.push_back(table);
367 }
368
InitStateMapping()369 void SingleVerSyncStateMachine::InitStateMapping()
370 {
371 stateMapping_[TIME_SYNC] = [this] { return DoTimeSync(); };
372 stateMapping_[ABILITY_SYNC] = [this] { return DoAbilitySync(); };
373 stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = [this] { return DoWaitForDataRecv(); };
374 stateMapping_[SYNC_TASK_FINISHED] = [this] { return DoSyncTaskFinished(); };
375 stateMapping_[SYNC_TIME_OUT] = [this] { return DoTimeout(); };
376 stateMapping_[INNER_ERR] = [this] { return DoInnerErr(); };
377 stateMapping_[START_INITIACTIVE_DATA_SYNC] = [this] { return DoInitiactiveDataSyncWithSlidingWindow(); };
378 stateMapping_[START_PASSIVE_DATA_SYNC] = [this] { return DoPassiveDataSyncWithSlidingWindow(); };
379 stateMapping_[SYNC_CONTROL_CMD] = [this] { return DoInitiactiveControlSync(); };
380 }
381
DoInitiactiveDataSyncWithSlidingWindow() const382 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow() const
383 {
384 LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
385 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
386 int errCode = E_OK;
387 switch (context_->GetMode()) {
388 case SyncModeType::PUSH:
389 case SyncModeType::QUERY_PUSH:
390 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
391 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
392 break;
393 case SyncModeType::PULL:
394 case SyncModeType::QUERY_PULL:
395 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
396 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
397 break;
398 case SyncModeType::PUSH_AND_PULL:
399 case SyncModeType::QUERY_PUSH_PULL:
400 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
401 break;
402 case SyncModeType::RESPONSE_PULL:
403 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
404 break;
405 default:
406 errCode = -E_NOT_SUPPORT;
407 break;
408 }
409 if (errCode == E_OK) {
410 return Event::WAIT_ACK_EVENT;
411 }
412 // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
413 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
414 errCode == -E_EKEYREVOKED) {
415 return Event::WAIT_ACK_EVENT;
416 }
417
418 // when response task step dataSync again while request task is running, ignore the errCode
419 bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
420 Event event = TransformErrCodeToEvent(errCode);
421 return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
422 }
423
DoPassiveDataSyncWithSlidingWindow()424 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
425 {
426 {
427 RefObject::AutoLock lock(context_);
428 if (context_->GetRspTargetQueueSize() != 0) {
429 PreStartPullResponse();
430 } else if (context_->GetResponseSessionId() == 0 ||
431 context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
432 return RESPONSE_TASK_FINISHED_EVENT;
433 }
434 }
435 int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
436 if (errCode != E_OK) {
437 LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
438 return RESPONSE_TASK_FINISHED_EVENT;
439 }
440 return Event::WAIT_ACK_EVENT;
441 }
442
DoInitiactiveControlSync() const443 Event SingleVerSyncStateMachine::DoInitiactiveControlSync() const
444 {
445 LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
446 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
447 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
448 int errCode = dataSync_->ControlCmdStart(context_);
449 if (errCode == E_OK) {
450 return Event::WAIT_ACK_EVENT;
451 }
452 context_->SetTaskErrCode(errCode);
453 return TransformErrCodeToEvent(errCode);
454 }
455
HandleControlAckRecv(const Message * inMsg)456 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
457 {
458 std::lock_guard<std::mutex> lock(stateMachineLock_);
459 if (IsNeedResetWatchdog(inMsg)) {
460 (void)ResetWatchDog();
461 }
462 int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
463 ControlAckRecvErrCodeHandle(errCode);
464 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
465 return E_OK;
466 }
467
DoWaitForDataRecv() const468 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
469 {
470 if (context_->GetRspTargetQueueSize() != 0) {
471 return START_PULL_RESPONSE_EVENT;
472 }
473 if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
474 return RECV_FINISHED_EVENT;
475 }
476 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
477 context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
478 context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
479 return RECV_FINISHED_EVENT;
480 }
481 return Event::WAIT_ACK_EVENT;
482 }
483
DoTimeSync() const484 Event SingleVerSyncStateMachine::DoTimeSync() const
485 {
486 timeSync_->SetTimeSyncFinishIfNeed();
487 if (timeSync_->IsNeedSync()) {
488 CommErrHandler handler = nullptr;
489 // Auto sync need do retry don't use errHandler to return.
490 if (!context_->IsAutoSync()) {
491 handler = [this, context = context_,
492 requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
493 SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
494 };
495 }
496 int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId(), context_->IsRetryTask());
497 if (errCode == E_OK) {
498 return Event::WAIT_ACK_EVENT;
499 }
500 context_->SetTaskErrCode(errCode);
501 return TransformErrCodeToEvent(errCode);
502 }
503
504 return Event::TIME_SYNC_FINISHED_EVENT;
505 }
506
DoAbilitySync() const507 Event SingleVerSyncStateMachine::DoAbilitySync() const
508 {
509 uint16_t remoteCommunicatorVersion = 0;
510 int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
511 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
512 LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
513 return Event::INNER_ERR_EVENT;
514 }
515 // Fistr version, not support AbilitySync
516 if (remoteCommunicatorVersion == 0 && errCode != -E_NOT_FOUND) {
517 context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
518 return GetEventAfterTimeSync(context_->GetMode());
519 }
520 if (abilitySync_->GetAbilitySyncFinishedStatus()) {
521 return GetEventAfterTimeSync(context_->GetMode());
522 }
523
524 CommErrHandler handler = [this, context = context_,
525 requestSessionId = context_->GetRequestSessionId()](int ret, bool isDirectEnd) {
526 SyncTaskContext::CommErrHandlerFunc(ret, context, requestSessionId, isDirectEnd);
527 };
528 LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
529 STR_MASK(context_->GetDeviceId()));
530 errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
531 remoteCommunicatorVersion, handler, context_);
532 if (errCode != E_OK) {
533 LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
534 context_->SetTaskErrCode(errCode);
535 return TransformErrCodeToEvent(errCode);
536 }
537 return Event::WAIT_ACK_EVENT;
538 }
539
GetEventAfterTimeSync(int mode) const540 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
541 {
542 if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
543 return Event::CONTROL_CMD_EVENT;
544 }
545 return Event::ABILITY_SYNC_FINISHED_EVENT;
546 }
547
DoSyncTaskFinished()548 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
549 {
550 StopWatchDog();
551 context_->ResetResyncTimes();
552 if (dataSync_ == nullptr || communicator_ == nullptr || syncContext_ == nullptr) {
553 LOGE("[SingleVerSyncStateMachine] [DoSyncTaskFinished] dataSync_ or communicator_ or syncContext_ is nullptr.");
554 return TransformErrCodeToEvent(-E_OUT_OF_MEMORY);
555 }
556 dataSync_->ClearSyncStatus();
557 auto timeout = communicator_->GetTimeout(syncContext_->GetDeviceId());
558 RefObject::AutoLock lock(syncContext_);
559 int errCode = ExecNextTask(timeout);
560 if (errCode == E_OK) {
561 return Event::START_SYNC_EVENT;
562 }
563 return TransformErrCodeToEvent(errCode);
564 }
565
DoTimeout()566 Event SingleVerSyncStateMachine::DoTimeout()
567 {
568 RefObject::AutoLock lock(context_);
569 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
570 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
571 if (subManager != nullptr) {
572 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
573 }
574 }
575 context_->Abort(SyncOperation::OP_TIMEOUT);
576 context_->Clear();
577 AbortInner();
578 return Event::ANY_EVENT;
579 }
580
DoInnerErr()581 Event SingleVerSyncStateMachine::DoInnerErr()
582 {
583 RefObject::AutoLock lock(context_);
584 if (!context_->IsCommNormal()) {
585 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
586 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
587 if (subManager != nullptr) {
588 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
589 }
590 }
591 context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
592 } else {
593 int status = GetSyncOperationStatus(context_->GetTaskErrCode());
594 context_->Abort(status);
595 }
596 context_->Clear();
597 AbortInner();
598 return Event::ANY_EVENT;
599 }
600
AbilitySyncRecv(const Message * inMsg)601 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
602 {
603 if (inMsg->GetMessageType() == TYPE_REQUEST) {
604 int errCode = abilitySync_->RequestRecv(inMsg, context_);
605 if (errCode != E_OK && inMsg->GetSessionId() == context_->GetResponseSessionId()) {
606 context_->SetTaskErrCode(errCode);
607 std::lock_guard<std::mutex> lock(stateMachineLock_);
608 SwitchStateAndStep(Event::INNER_ERR_EVENT);
609 }
610 return E_OK;
611 }
612 return AbilitySyncResponseRecv(inMsg);
613 }
614
AbilitySyncResponseRecv(const Message * inMsg)615 int SingleVerSyncStateMachine::AbilitySyncResponseRecv(const Message *inMsg)
616 {
617 if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
618 std::lock_guard<std::mutex> lock(stateMachineLock_);
619 int errCode = abilitySync_->AckRecv(inMsg, context_);
620 (void)ResetWatchDog();
621 if (errCode == -E_ABILITY_SYNC_FINISHED) {
622 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished with both kv,label=%s,dev=%s",
623 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
624 abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
625 JumpStatusAfterAbilitySync(context_->GetMode());
626 } else if (errCode != E_OK) {
627 LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
628 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
629 } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
630 abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
631 LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
632 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
633 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
634 JumpStatusAfterAbilitySync(context_->GetMode());
635 }
636 return E_OK;
637 }
638 if (inMsg->GetMessageType() == TYPE_NOTIFY) {
639 return AbilitySyncNotifyRecv(inMsg);
640 }
641 LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
642 return -E_NOT_SUPPORT;
643 }
644
HandleDataRequestRecv(const Message * inMsg)645 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
646 {
647 TimeOffset offset = 0;
648 auto [systemOffset, senderLocalOffset] = SingleVerDataSyncUtils::GetTimeOffsetFromRequestMsg(inMsg);
649 int errCode = timeSync_->GenerateTimeOffsetIfNeed(systemOffset, senderLocalOffset);
650 if (errCode != E_OK) {
651 (void)dataSync_->SendDataAck(context_, inMsg, errCode, 0);
652 return errCode;
653 }
654 uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
655 // If message is data sync request, we should check timeoffset.
656 errCode = timeSync_->GetTimeOffset(offset, timeout);
657 if (errCode != E_OK) {
658 LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
659 return errCode;
660 }
661 context_->SetTimeOffset(offset);
662 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
663 if (performance != nullptr) {
664 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
665 }
666 DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
667
668 // RequestRecv will save data, it may cost a long time.
669 // So we need to send save data notify to keep remote alive.
670 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
671 {
672 std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
673 if (IsNeedResetWatchdog(inMsg)) {
674 (void)ResetWatchDog();
675 }
676 }
677 WaterMark pullEndWaterkark = 0;
678 errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
679 if (performance != nullptr) {
680 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
681 }
682 // only higher than 102 version receive this errCode here.
683 // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
684 // so while receive the same secquencid after abiitysync it wouldn't handle.
685 if (errCode == -E_NEED_ABILITY_SYNC) {
686 if (isNeedStop) {
687 StopSaveDataNotify();
688 }
689 return errCode;
690 }
691 {
692 std::lock_guard<std::mutex> lock(stateMachineLock_);
693 DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
694 if (pullEndWaterkark > 0) {
695 AddPullResponseTarget(inMsg, pullEndWaterkark);
696 }
697 }
698 if (isNeedStop) {
699 StopSaveDataNotify();
700 }
701 return E_OK;
702 }
703
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)704 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
705 bool ignoreInnerErr)
706 {
707 if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
708 dataSync_->ClearSyncStatus();
709 }
710 if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
711 int ret = dataSync_->TryContinueSync(context_, inMsg);
712 if (ret == -E_FINISHED) {
713 SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
714 return;
715 } else if (ret == E_OK) { // do nothing and waiting for all ack receive
716 return;
717 }
718 errCode = ret;
719 }
720 ResponsePullError(errCode, ignoreInnerErr);
721 }
722
NeedAbilitySyncHandle()723 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
724 {
725 // if the remote device version num is overdue,
726 // mean the version num has been reset when syncing data,
727 // there should not clear the new version cache again.
728 if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
729 LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
730 context_->SetRemoteSoftwareVersion(0);
731 } else {
732 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
733 }
734 abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
735 }
736
HandleDataAckRecv(const Message * inMsg)737 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
738 {
739 if (inMsg->GetMessageType() == TYPE_RESPONSE) {
740 DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
741 }
742 std::lock_guard<std::mutex> lock(stateMachineLock_);
743 // Unfortunately we use stateMachineLock_ in many sync process
744 // So a bad ack will check before the lock and wait
745 // And then another process is running, it will get the lock.After this process, the ack became invalid.
746 // If we don't check ack again, it will be delivered to dataSyncer.
747 if (!IsPacketValid(inMsg)) {
748 return -E_INVALID_ARGS;
749 }
750 if (IsNeedResetWatchdog(inMsg)) {
751 (void)ResetWatchDog();
752 }
753 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
754 // packetId not match but sequence id matched scene, means resend map has be rebuilt
755 // this is old ack, should be dropped and wait for the same packetId sequence.
756 return E_OK;
757 }
758 // AckRecv will save meta data, it may cost a long time. if another thread is saving data
759 // So we need to send save data notify to keep remote alive.
760 // e.g. remote do pull sync
761 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
762 int errCode = dataSync_->AckRecv(context_, inMsg);
763 if (isNeedStop) {
764 StopSaveDataNotify();
765 }
766 if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA || errCode == -E_NEED_TIME_SYNC) {
767 StopFeedDogForSync(SyncDirectionFlag::SEND);
768 dataSync_->ClearSyncStatus();
769 context_->ReSetSequenceId();
770 } else if (errCode == -E_SAVE_DATA_NOTIFY) {
771 return errCode;
772 }
773 // when this msg is from response task while request task is running, ignore the errCode
774 bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
775 context_->GetRequestSessionId() != 0;
776 DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
777 HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
778 return errCode;
779 }
780
DataPktRecv(Message * inMsg)781 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
782 {
783 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
784 int errCode = E_OK;
785 switch (inMsg->GetMessageType()) {
786 case TYPE_REQUEST:
787 ScheduleMsgAndHandle(inMsg);
788 errCode = -E_NOT_NEED_DELETE_MSG;
789 break;
790 case TYPE_RESPONSE:
791 case TYPE_NOTIFY:
792 if (performance != nullptr) {
793 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
794 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
795 }
796 errCode = HandleDataAckRecv(inMsg);
797 break;
798 default:
799 errCode = -E_INVALID_ARGS;
800 break;
801 }
802 return errCode;
803 }
804
ScheduleMsgAndHandle(Message * inMsg)805 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
806 {
807 dataSync_->PutDataMsg(inMsg);
808 while (true) {
809 bool isNeedHandle = true;
810 bool isNeedContinue = true;
811 Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
812 if (!isNeedContinue) {
813 break;
814 }
815 if (msg == nullptr) {
816 if (dataSync_->IsNeedReloadQueue()) {
817 continue;
818 }
819 dataSync_->ScheduleInfoHandle(false, false, nullptr);
820 break;
821 }
822 bool isNeedClearMap = false;
823 if (isNeedHandle) {
824 int errCode = HandleDataRequestRecv(msg);
825 if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC || errCode == -E_NEED_TIME_SYNC) {
826 isNeedClearMap = true;
827 }
828 if (errCode == -E_TIMEOUT) {
829 isNeedHandle = false;
830 }
831 } else {
832 dataSync_->SendFinishedDataAck(context_, msg);
833 }
834 if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
835 // for lower version, no need to handle map schedule info, just reset schedule working status
836 isNeedHandle = false;
837 }
838 dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
839 delete msg;
840 }
841 }
842
ControlPktRecv(Message * inMsg)843 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
844 {
845 int errCode = E_OK;
846 switch (inMsg->GetMessageType()) {
847 case TYPE_REQUEST:
848 errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
849 break;
850 case TYPE_RESPONSE:
851 errCode = HandleControlAckRecv(inMsg);
852 break;
853 default:
854 errCode = -E_INVALID_ARGS;
855 break;
856 }
857 return errCode;
858 }
859
StepToTimeout(TimerId timerId)860 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
861 {
862 std::lock_guard<std::mutex> lock(stateMachineLock_);
863 TimerId timer = syncContext_->GetTimerId();
864 if (timer != timerId) {
865 return;
866 }
867 SwitchStateAndStep(Event::TIME_OUT_EVENT);
868 }
869
870 namespace {
871 struct StateNode {
872 int errCode = 0;
873 SyncOperation::Status status = SyncOperation::OP_WAITING;
874 };
875 }
GetSyncOperationStatus(int errCode) const876 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
877 {
878 static const StateNode stateNodes[] = {
879 { -E_SCHEMA_MISMATCH, SyncOperation::OP_SCHEMA_INCOMPATIBLE },
880 { -E_EKEYREVOKED, SyncOperation::OP_EKEYREVOKED_FAILURE },
881 { -E_SECURITY_OPTION_CHECK_ERROR, SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
882 { -E_BUSY, SyncOperation::OP_BUSY_FAILURE },
883 { -E_NOT_PERMIT, SyncOperation::OP_PERMISSION_CHECK_FAILED },
884 { -E_TIMEOUT, SyncOperation::OP_TIMEOUT },
885 { -E_INVALID_QUERY_FORMAT, SyncOperation::OP_QUERY_FORMAT_FAILURE },
886 { -E_INVALID_QUERY_FIELD, SyncOperation::OP_QUERY_FIELD_FAILURE },
887 { -E_FEEDBACK_UNKNOWN_MESSAGE, SyncOperation::OP_NOT_SUPPORT },
888 { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
889 { -E_NOT_SUPPORT, SyncOperation::OP_NOT_SUPPORT },
890 { -E_INTERCEPT_DATA_FAIL, SyncOperation::OP_INTERCEPT_DATA_FAIL },
891 { -E_MAX_LIMITS, SyncOperation::OP_MAX_LIMITS },
892 { -E_DISTRIBUTED_SCHEMA_CHANGED, SyncOperation::OP_SCHEMA_CHANGED },
893 { -E_NOT_REGISTER, SyncOperation::OP_NOT_SUPPORT },
894 { -E_DENIED_SQL, SyncOperation::OP_DENIED_SQL },
895 { -E_REMOTE_OVER_SIZE, SyncOperation::OP_MAX_LIMITS },
896 { -E_INVALID_PASSWD_OR_CORRUPTED_DB, SyncOperation::OP_NOTADB_OR_CORRUPTED },
897 { -E_DISTRIBUTED_SCHEMA_NOT_FOUND, SyncOperation::OP_SCHEMA_INCOMPATIBLE },
898 { -E_FEEDBACK_DB_CLOSING, SyncOperation::OP_DB_CLOSING },
899 };
900 const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) {
901 return node.errCode == errCode;
902 });
903 return result == std::end(stateNodes) ? SyncOperation::OP_FAILED : static_cast<int>((*result).status);
904 }
905
TimeMarkSyncRecv(const Message * inMsg)906 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
907 {
908 LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
909 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
910 {
911 std::lock_guard<std::mutex> lock(stateMachineLock_);
912 (void)ResetWatchDog();
913 }
914 if (inMsg->GetMessageType() == TYPE_REQUEST) {
915 return timeSync_->RequestRecv(inMsg);
916 } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
917 int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
918 if (errCode != E_OK) {
919 LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
920 if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
921 context_->SetTaskErrCode(errCode);
922 InnerErrorAbort(inMsg->GetSessionId());
923 }
924 return errCode;
925 }
926 std::lock_guard<std::mutex> lock(stateMachineLock_);
927 SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
928 return E_OK;
929 } else {
930 return -E_INVALID_ARGS;
931 }
932 }
933
Clear()934 void SingleVerSyncStateMachine::Clear()
935 {
936 dataSync_ = nullptr;
937 timeSync_ = nullptr;
938 abilitySync_ = nullptr;
939 context_ = nullptr;
940 syncInterface_ = nullptr;
941 }
942
IsPacketValid(const Message * inMsg) const943 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
944 {
945 if (inMsg == nullptr) {
946 return false;
947 }
948
949 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
950 LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
951 return false;
952 }
953 // filter invalid ack at first
954 bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
955 if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
956 (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
957 LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
958 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
959 return false;
960 }
961 if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
962 (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
963 (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
964 LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
965 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
966 return false;
967 }
968
969 // timeSync and abilitySync don't need to check sequenceId
970 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
971 inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
972 return true;
973 }
974 // sequenceId will be checked in dataSync
975 return true;
976 }
977
PreStartPullResponse()978 void SingleVerSyncStateMachine::PreStartPullResponse()
979 {
980 SingleVerSyncTarget target;
981 context_->PopResponseTarget(target);
982 context_->SetEndMark(target.GetEndWaterMark());
983 context_->SetResponseSessionId(target.GetResponseSessionId());
984 context_->SetMode(SyncModeType::RESPONSE_PULL);
985 context_->ReSetSequenceId();
986 context_->SetQuerySync(target.IsQuerySync());
987 context_->SetQuery(target.GetQuery());
988 }
989
CheckIsStartPullResponse() const990 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
991 {
992 // Other state will step to do pull response, only this statem we need to step the statemachine
993 if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
994 return true;
995 }
996 return false;
997 }
998
MessageCallbackPre(const Message * inMsg)999 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
1000 {
1001 RefObject::AutoLock lock(context_);
1002 if (context_->IsKilled()) {
1003 return -E_OBJ_IS_KILLED;
1004 }
1005
1006 if (!IsPacketValid(inMsg)) {
1007 return -E_INVALID_ARGS;
1008 }
1009 return E_OK;
1010 }
1011
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)1012 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
1013 {
1014 int messageType = static_cast<int>(inMsg->GetMessageId());
1015 uint32_t sessionId = inMsg->GetSessionId();
1016 if (pullEndWatermark == 0) {
1017 LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
1018 return;
1019 }
1020 if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
1021 LOGI("[StateMachine][AddPullResponseTarget] task is already running");
1022 return;
1023 }
1024 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
1025 if (packet == nullptr) {
1026 LOGE("[AddPullResponseTarget] get packet object failed");
1027 return;
1028 }
1029 SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
1030 if (targetTmp == nullptr) {
1031 LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
1032 return;
1033 }
1034 targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1035 if (messageType == QUERY_SYNC_MESSAGE) {
1036 targetTmp->SetQuery(packet->GetQuery());
1037 targetTmp->SetQuerySync(true);
1038 }
1039 targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1040 targetTmp->SetEndWaterMark(pullEndWatermark);
1041 targetTmp->SetResponseSessionId(sessionId);
1042 if (context_->AddSyncTarget(targetTmp) != E_OK) {
1043 delete targetTmp;
1044 return;
1045 }
1046 if (CheckIsStartPullResponse()) {
1047 SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1048 }
1049 }
1050
TransformErrCodeToEvent(int errCode) const1051 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode) const
1052 {
1053 switch (errCode) {
1054 case -E_TIMEOUT:
1055 return TransforTimeOutErrCodeToEvent();
1056 case -static_cast<int>(VERSION_NOT_SUPPOR_EVENT):
1057 return Event::VERSION_NOT_SUPPOR_EVENT;
1058 case -E_SEND_DATA:
1059 return Event::SEND_DATA_EVENT;
1060 case -E_NO_DATA_SEND:
1061 return Event::SEND_FINISHED_EVENT;
1062 case -E_RECV_FINISHED:
1063 return Event::RECV_FINISHED_EVENT;
1064 case -E_NEED_ABILITY_SYNC:
1065 return Event::NEED_ABILITY_SYNC_EVENT;
1066 case -E_NO_SYNC_TASK:
1067 return Event::ALL_TASK_FINISHED_EVENT;
1068 case -E_NEED_PULL_REPONSE:
1069 return Event::START_PULL_RESPONSE_EVENT;
1070 case -E_RE_SEND_DATA:
1071 return Event::RE_SEND_DATA_EVENT;
1072 case -E_NEED_TIME_SYNC:
1073 return Event::NEED_TIME_SYNC_EVENT;
1074 default:
1075 return Event::INNER_ERR_EVENT;
1076 }
1077 }
1078
IsNeedResetWatchdog(const Message * inMsg) const1079 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1080 {
1081 if (inMsg == nullptr) {
1082 return false;
1083 }
1084
1085 if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1086 return true;
1087 }
1088
1089 int msgType = inMsg->GetMessageType();
1090 if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1091 if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1092 // Pull response ack also should reset watchdog
1093 return true;
1094 }
1095 }
1096
1097 return false;
1098 }
1099
TransforTimeOutErrCodeToEvent() const1100 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1101 {
1102 if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1103 return Event::WAIT_TIME_OUT_EVENT;
1104 } else {
1105 return Event::TIME_OUT_EVENT;
1106 }
1107 }
1108
IsNeedErrCodeHandle(uint32_t sessionId) const1109 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1110 {
1111 // omit to set sessionId so version_3 should skip to compare sessionid.
1112 if (sessionId == context_->GetRequestSessionId() ||
1113 context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1114 return true;
1115 }
1116 return false;
1117 }
1118
PushPullDataRequestEvokeErrHandle()1119 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1120 {
1121 // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1122 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1123 SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) { // LCOV_EXCL_BR_LINE
1124 LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1125 context_->SetTaskErrCode(-E_EKEYREVOKED);
1126 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1127 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1128 } else {
1129 context_->SetTaskErrCode(-E_EKEYREVOKED);
1130 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1131 }
1132 }
1133
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1134 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1135 {
1136 if (IsNeedErrCodeHandle(sessionId)) {
1137 switch (errCode) {
1138 case E_OK:
1139 break;
1140 case -E_NOT_PERMIT:
1141 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1142 break;
1143 case -E_RECV_FINISHED:
1144 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1145 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1146 break;
1147 case -E_EKEYREVOKED:
1148 PushPullDataRequestEvokeErrHandle();
1149 break;
1150 case -E_BUSY:
1151 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1152 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1153 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1154 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1155 case -E_INTERCEPT_DATA_FAIL:
1156 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1157 case -E_INVALID_QUERY_FIELD:
1158 case -E_INVALID_QUERY_FORMAT:
1159 case -E_MAX_LIMITS:
1160 case -E_NOT_REGISTER:
1161 case -E_NOT_SUPPORT:
1162 case -E_SECURITY_OPTION_CHECK_ERROR:
1163 context_->SetTaskErrCode(errCode);
1164 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1165 break;
1166 default:
1167 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1168 break;
1169 }
1170 }
1171 }
1172
AbilityMsgSessionIdCheck(const Message * inMsg)1173 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1174 {
1175 if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) { // LCOV_EXCL_BR_LINE
1176 return true;
1177 }
1178 LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1179 return false;
1180 }
1181
GetSyncType(uint32_t messageId) const1182 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1183 {
1184 if (messageId == QUERY_SYNC_MESSAGE) { // LCOV_EXCL_BR_LINE
1185 return SyncType::QUERY_SYNC_TYPE;
1186 }
1187 return SyncType::MANUAL_FULL_SYNC_TYPE;
1188 }
1189
DataAckRecvErrCodeHandle(int errCode,bool handleError)1190 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1191 {
1192 switch (errCode) {
1193 case -E_NEED_ABILITY_SYNC:
1194 NeedAbilitySyncHandle();
1195 break;
1196 case -E_NEED_TIME_SYNC:
1197 timeSync_->ClearTimeSyncFinish();
1198 break;
1199 case -E_NOT_PERMIT:
1200 if (handleError) {
1201 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1202 }
1203 break;
1204 case -E_BUSY:
1205 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1206 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1207 case -E_EKEYREVOKED:
1208 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1209 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1210 case -E_INTERCEPT_DATA_FAIL:
1211 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1212 case -E_INVALID_QUERY_FIELD:
1213 case -E_INVALID_QUERY_FORMAT:
1214 case -E_MAX_LIMITS:
1215 case -E_NOT_REGISTER:
1216 case -E_NOT_SUPPORT:
1217 case -E_SECURITY_OPTION_CHECK_ERROR:
1218 if (handleError) {
1219 context_->SetTaskErrCode(errCode);
1220 }
1221 break;
1222 default:
1223 break;
1224 }
1225 }
1226
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1227 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1228 {
1229 return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1230 }
1231
JumpStatusAfterAbilitySync(int mode)1232 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1233 {
1234 if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1235 SwitchStateAndStep(CONTROL_CMD_EVENT);
1236 } else {
1237 SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1238 }
1239 }
1240
ControlAckRecvErrCodeHandle(int errCode)1241 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1242 {
1243 switch (errCode) {
1244 case -E_NEED_ABILITY_SYNC:
1245 NeedAbilitySyncHandle();
1246 break;
1247 case -E_NO_DATA_SEND:
1248 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1249 break;
1250 case -E_NOT_PERMIT:
1251 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1252 break;
1253 // other errCode use default
1254 default:
1255 context_->SetTaskErrCode(errCode);
1256 break;
1257 }
1258 }
1259
GetLocalWaterMark(const DeviceID & deviceId,const DeviceID & userId,uint64_t & outValue)1260 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, const DeviceID &userId, uint64_t &outValue)
1261 {
1262 metadata_->GetLocalWaterMark(deviceId, userId, outValue);
1263 }
1264
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,const DeviceID & userId,bool isAutoLift,uint64_t & outValue)1265 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId,
1266 const DeviceID &userId, bool isAutoLift, uint64_t &outValue)
1267 {
1268 return metadata_->GetSendQueryWaterMark(queryId, deviceId, userId, outValue, isAutoLift);
1269 }
1270
ResponsePullError(int errCode,bool ignoreInnerErr)1271 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1272 {
1273 Event event = TransformErrCodeToEvent(errCode);
1274 if (event == INNER_ERR_EVENT) {
1275 if (ignoreInnerErr) {
1276 event = RESPONSE_TASK_FINISHED_EVENT;
1277 } else if (context_ != nullptr) {
1278 context_->SetTaskErrCode(errCode);
1279 }
1280 }
1281 SwitchStateAndStep(event);
1282 }
1283
InnerErrorAbort(uint32_t sessionId)1284 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1285 {
1286 std::lock_guard<std::mutex> lock(stateMachineLock_);
1287 uint32_t requestSessionId = context_->GetRequestSessionId();
1288 if (sessionId != requestSessionId) {
1289 LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1290 return;
1291 }
1292 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1293 SyncStep();
1294 }
1295 }
1296
NotifyClosing()1297 void SingleVerSyncStateMachine::NotifyClosing()
1298 {
1299 if (timeSync_ != nullptr) {
1300 timeSync_->Close();
1301 }
1302 }
1303
AbilitySyncNotifyRecv(const Message * inMsg)1304 int SingleVerSyncStateMachine::AbilitySyncNotifyRecv(const Message *inMsg)
1305 {
1306 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
1307 if (packet == nullptr) {
1308 return -E_INVALID_ARGS;
1309 }
1310 int ackCode = packet->GetAckCode();
1311 if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
1312 LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
1313 context_->SetTaskErrCode(ackCode);
1314 std::lock_guard<std::mutex> lock(stateMachineLock_);
1315 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1316 return E_OK;
1317 }
1318 if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
1319 abilitySync_->SetAbilitySyncFinishedStatus(true, *context_);
1320 // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
1321 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
1322 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
1323 context_->SetRemoteSoftwareVersion(packet->GetSoftwareVersion());
1324 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
1325 std::lock_guard<std::mutex> lock(stateMachineLock_);
1326 (void)ResetWatchDog();
1327 JumpStatusAfterAbilitySync(context_->GetMode());
1328 } else if (ackCode != AbilitySync::LAST_NOTIFY) {
1329 abilitySync_->AckNotifyRecv(inMsg, context_);
1330 }
1331 return E_OK;
1332 }
1333
SchemaChange()1334 void SingleVerSyncStateMachine::SchemaChange()
1335 {
1336 abilitySync_->SetAbilitySyncFinishedStatus(false, *context_);
1337 }
1338
TimeChange()1339 void SingleVerSyncStateMachine::TimeChange()
1340 {
1341 if (timeSync_ == nullptr) {
1342 LOGW("[SingleVerSyncStateMachine] time sync is null when time change");
1343 return;
1344 }
1345 timeSync_->ClearTimeSyncFinish();
1346 }
1347 } // namespace DistributedDB
1348