1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_sync_state_machine.h"
17
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38 // used for state switch table
39 const int CURRENT_STATE_INDEX = 0;
40 const int EVENT_INDEX = 1;
41 const int OUTPUT_STATE_INDEX = 2;
42
43 // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44 // State switch table v3, has three columns, CurrentState, Event, and OutSate
45 const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46 {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47
48 // In TIME_SYNC state
49 {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50 {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51 {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52
53 // In ABILITY_SYNC state, compare version num and schema
54 {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
55 {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
56 {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
57 {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
58 {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
59
60 // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
61 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
62 {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
63 {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
64 {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
65 {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
66
67 // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
68 {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
69 {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
70 {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
71 {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
72 {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
73 {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
74
75 // In WAIT_FOR_RECEIVE_DATA_FINISH,
76 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
77 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
78 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
79 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
80 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
81
82 {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
83 {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
84 {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
85 {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
86
87 // In SYNC_TASK_FINISHED,
88 {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
89 {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
90
91 // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
92 {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
93 {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
94 };
95 }
96
97 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
98 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
99 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
100
SingleVerSyncStateMachine()101 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
102 : context_(nullptr),
103 syncInterface_(nullptr),
104 timeSync_(nullptr),
105 abilitySync_(nullptr),
106 dataSync_(nullptr),
107 currentRemoteVersionId_(0)
108 {
109 }
110
~SingleVerSyncStateMachine()111 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
112 {
113 LOGD("~SingleVerSyncStateMachine");
114 Clear();
115 }
116
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)117 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
118 const std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
119 {
120 if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
121 return -E_INVALID_ARGS;
122 }
123
124 int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
125 if (errCode != E_OK) {
126 return errCode;
127 }
128
129 timeSync_ = std::make_unique<TimeSync>();
130 dataSync_ = std::make_shared<SingleVerDataSync>();
131 abilitySync_ = std::make_unique<AbilitySync>();
132 if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
133 timeSync_ = nullptr;
134 dataSync_ = nullptr;
135 abilitySync_ = nullptr;
136 return -E_OUT_OF_MEMORY;
137 }
138
139 errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId());
140 if (errCode != E_OK) {
141 goto ERROR_OUT;
142 }
143 errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
144 if (errCode != E_OK) {
145 goto ERROR_OUT;
146 }
147 errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
148 if (errCode != E_OK) {
149 goto ERROR_OUT;
150 }
151
152 currentState_ = IDLE;
153 context_ = static_cast<SingleVerSyncTaskContext *>(context);
154 syncInterface_ = static_cast<SingleVerKvDBSyncInterface *>(syncInterface);
155
156 InitStateSwitchTables();
157 InitStateMapping();
158 return E_OK;
159
160 ERROR_OUT:
161 Clear();
162 return errCode;
163 }
164
SyncStep()165 void SingleVerSyncStateMachine::SyncStep()
166 {
167 RefObject::IncObjRef(context_);
168 RefObject::IncObjRef(communicator_);
169 int errCode = RuntimeContext::GetInstance()->ScheduleTask(
170 std::bind(&SingleVerSyncStateMachine::SyncStepInnerLocked, this));
171 if (errCode != E_OK) {
172 LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
173 RefObject::DecObjRef(communicator_);
174 RefObject::DecObjRef(context_);
175 }
176 }
177
ReceiveMessageCallback(Message * inMsg)178 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
179 {
180 int errCode = MessageCallbackPre(inMsg);
181 if (errCode != E_OK) {
182 LOGE("[StateMachine] message pre check failed");
183 return errCode;
184 }
185 switch (inMsg->GetMessageId()) {
186 case TIME_SYNC_MESSAGE:
187 errCode = TimeMarkSyncRecv(inMsg);
188 break;
189 case ABILITY_SYNC_MESSAGE:
190 errCode = AbilitySyncRecv(inMsg);
191 break;
192 case DATA_SYNC_MESSAGE:
193 case QUERY_SYNC_MESSAGE:
194 errCode = DataPktRecv(inMsg);
195 break;
196 case CONTROL_SYNC_MESSAGE:
197 errCode = ControlPktRecv(inMsg);
198 break;
199 default:
200 errCode = -E_NOT_SUPPORT;
201 }
202 return errCode;
203 }
204
SyncStepInnerLocked()205 void SingleVerSyncStateMachine::SyncStepInnerLocked()
206 {
207 if (context_->IncUsedCount() != E_OK) {
208 goto SYNC_STEP_OUT;
209 }
210 {
211 std::lock_guard<std::mutex> lock(stateMachineLock_);
212 SyncStepInner();
213 }
214 context_->SafeExit();
215
216 SYNC_STEP_OUT:
217 RefObject::DecObjRef(communicator_);
218 RefObject::DecObjRef(context_);
219 }
220
SyncStepInner()221 void SingleVerSyncStateMachine::SyncStepInner()
222 {
223 Event event = INNER_ERR_EVENT;
224 do {
225 auto iter = stateMapping_.find(currentState_);
226 if (iter != stateMapping_.end()) {
227 event = static_cast<Event>(iter->second());
228 } else {
229 LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
230 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
231 break;
232 }
233 } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
234 }
235
SetCurStateErrStatus()236 void SingleVerSyncStateMachine::SetCurStateErrStatus()
237 {
238 currentState_ = State::INNER_ERR;
239 }
240
StartSyncInner()241 int SingleVerSyncStateMachine::StartSyncInner()
242 {
243 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
244 if (performance != nullptr) {
245 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
246 }
247 int errCode = PrepareNextSyncTask();
248 if (errCode == E_OK) {
249 SwitchStateAndStep(Event::START_SYNC_EVENT);
250 }
251 return errCode;
252 }
253
AbortInner()254 void SingleVerSyncStateMachine::AbortInner()
255 {
256 LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
257 STR_MASK(context_->GetDeviceId()));
258 if (context_->IsKilled()) {
259 dataSync_->ClearDataMsg();
260 }
261 dataSync_->ClearSyncStatus();
262 ContinueToken token;
263 context_->GetContinueToken(token);
264 if (token != nullptr) {
265 syncInterface_->ReleaseContinueToken(token);
266 }
267 context_->SetContinueToken(nullptr);
268 context_->Clear();
269 }
270
GetStateSwitchTables() const271 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
272 {
273 return stateSwitchTables_;
274 }
275
PrepareNextSyncTask()276 int SingleVerSyncStateMachine::PrepareNextSyncTask()
277 {
278 int errCode = StartWatchDog();
279 if (errCode != E_OK && errCode != -E_UNEXPECTED_DATA) {
280 LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
281 return errCode;
282 }
283 if (errCode == -E_UNEXPECTED_DATA) {
284 LOGI("[PrepareNextSyncTask] timer already exists, reset the timer.");
285 (void)ResetWatchDog();
286 }
287
288 if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
289 LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
290 currentState_, STR_MASK(context_->GetDeviceId()));
291 currentState_ = State::IDLE;
292 }
293 return E_OK;
294 }
295
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)296 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
297 {
298 dataSync_->SendSaveDataNotifyPacket(context_,
299 std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
300 }
301
CommErrAbort(uint32_t sessionId)302 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
303 {
304 std::lock_guard<std::mutex> lock(stateMachineLock_);
305 uint32_t requestSessionId = context_->GetRequestSessionId();
306 if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
307 return;
308 }
309 context_->SetCommNormal(false);
310 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
311 SyncStep();
312 }
313 }
314
InitStateSwitchTables()315 void SingleVerSyncStateMachine::InitStateSwitchTables()
316 {
317 if (isStateSwitchTableInited_) {
318 return;
319 }
320
321 std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
322 if (isStateSwitchTableInited_) {
323 return;
324 }
325
326 InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
327 std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
328 [](const auto &tableA, const auto &tableB) {
329 return tableA.version > tableB.version;
330 }); // descending
331 isStateSwitchTableInited_ = true;
332 }
333
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)334 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
335 const std::vector<std::vector<uint8_t>> &switchTable)
336 {
337 StateSwitchTable table;
338 table.version = version;
339 for (const auto &stateSwitch : switchTable) {
340 if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
341 LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
342 return;
343 }
344 if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
345 EventToState eventToState; // new EventToState
346 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
347 table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
348 } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
349 EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
350 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
351 }
352 }
353 stateSwitchTables_.push_back(table);
354 }
355
InitStateMapping()356 void SingleVerSyncStateMachine::InitStateMapping()
357 {
358 stateMapping_[TIME_SYNC] = std::bind(&SingleVerSyncStateMachine::DoTimeSync, this);
359 stateMapping_[ABILITY_SYNC] = std::bind(&SingleVerSyncStateMachine::DoAbilitySync, this);
360 stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = std::bind(&SingleVerSyncStateMachine::DoWaitForDataRecv, this);
361 stateMapping_[SYNC_TASK_FINISHED] = std::bind(&SingleVerSyncStateMachine::DoSyncTaskFinished, this);
362 stateMapping_[SYNC_TIME_OUT] = std::bind(&SingleVerSyncStateMachine::DoTimeout, this);
363 stateMapping_[INNER_ERR] = std::bind(&SingleVerSyncStateMachine::DoInnerErr, this);
364 stateMapping_[START_INITIACTIVE_DATA_SYNC] =
365 std::bind(&SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow, this);
366 stateMapping_[START_PASSIVE_DATA_SYNC] =
367 std::bind(&SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow, this);
368 stateMapping_[SYNC_CONTROL_CMD] = std::bind(&SingleVerSyncStateMachine::DoInitiactiveControlSync, this);
369 }
370
DoInitiactiveDataSyncWithSlidingWindow() const371 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow() const
372 {
373 LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
374 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
375 int errCode = E_OK;
376 switch (context_->GetMode()) {
377 case SyncModeType::PUSH:
378 case SyncModeType::QUERY_PUSH:
379 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
380 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
381 break;
382 case SyncModeType::PULL:
383 case SyncModeType::QUERY_PULL:
384 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
385 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
386 break;
387 case SyncModeType::PUSH_AND_PULL:
388 case SyncModeType::QUERY_PUSH_PULL:
389 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
390 break;
391 case SyncModeType::RESPONSE_PULL:
392 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
393 break;
394 default:
395 errCode = -E_NOT_SUPPORT;
396 break;
397 }
398 if (errCode == E_OK) {
399 return Event::WAIT_ACK_EVENT;
400 }
401 // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
402 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
403 errCode == -E_EKEYREVOKED) {
404 return Event::WAIT_ACK_EVENT;
405 }
406
407 // when response task step dataSync again while request task is running, ignore the errCode
408 bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
409 Event event = TransformErrCodeToEvent(errCode);
410 return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
411 }
412
DoPassiveDataSyncWithSlidingWindow()413 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
414 {
415 {
416 RefObject::AutoLock lock(context_);
417 if (context_->GetRspTargetQueueSize() != 0) {
418 PreStartPullResponse();
419 } else if (context_->GetResponseSessionId() == 0 ||
420 context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
421 return RESPONSE_TASK_FINISHED_EVENT;
422 }
423 }
424 int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
425 if (errCode != E_OK) {
426 LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
427 return RESPONSE_TASK_FINISHED_EVENT;
428 }
429 return Event::WAIT_ACK_EVENT;
430 }
431
DoInitiactiveControlSync() const432 Event SingleVerSyncStateMachine::DoInitiactiveControlSync() const
433 {
434 LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
435 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
436 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
437 int errCode = dataSync_->ControlCmdStart(context_);
438 if (errCode == E_OK) {
439 return Event::WAIT_ACK_EVENT;
440 }
441 context_->SetTaskErrCode(errCode);
442 return TransformErrCodeToEvent(errCode);
443 }
444
HandleControlAckRecv(const Message * inMsg)445 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
446 {
447 std::lock_guard<std::mutex> lock(stateMachineLock_);
448 if (IsNeedResetWatchdog(inMsg)) {
449 (void)ResetWatchDog();
450 }
451 int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
452 ControlAckRecvErrCodeHandle(errCode);
453 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
454 return E_OK;
455 }
456
DoWaitForDataRecv() const457 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
458 {
459 if (context_->GetRspTargetQueueSize() != 0) {
460 return START_PULL_RESPONSE_EVENT;
461 }
462 if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
463 return RECV_FINISHED_EVENT;
464 }
465 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
466 context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
467 context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
468 return RECV_FINISHED_EVENT;
469 }
470 return Event::WAIT_ACK_EVENT;
471 }
472
DoTimeSync() const473 Event SingleVerSyncStateMachine::DoTimeSync() const
474 {
475 if (timeSync_->IsNeedSync()) {
476 CommErrHandler handler = nullptr;
477 // Auto sync need do retry don't use errHandler to return.
478 if (!context_->IsAutoSync()) {
479 handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
480 context_, context_->GetRequestSessionId());
481 }
482 int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
483 if (errCode == E_OK) {
484 return Event::WAIT_ACK_EVENT;
485 }
486 context_->SetTaskErrCode(errCode);
487 return TransformErrCodeToEvent(errCode);
488 }
489
490 return Event::TIME_SYNC_FINISHED_EVENT;
491 }
492
DoAbilitySync() const493 Event SingleVerSyncStateMachine::DoAbilitySync() const
494 {
495 uint16_t remoteCommunicatorVersion = 0;
496 int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
497 if (errCode != E_OK) {
498 LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
499 return Event::INNER_ERR_EVENT;
500 }
501 // Fistr version, not support AbilitySync
502 if (remoteCommunicatorVersion == 0) {
503 context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
504 return GetEventAfterTimeSync(context_->GetMode());
505 }
506 if (context_->GetIsNeedResetAbilitySync()) {
507 abilitySync_->SetAbilitySyncFinishedStatus(false);
508 context_->SetIsNeedResetAbilitySync(false);
509 }
510 if (abilitySync_->GetAbilitySyncFinishedStatus()) {
511 return GetEventAfterTimeSync(context_->GetMode());
512 }
513
514 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
515 context_, context_->GetRequestSessionId());
516 LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
517 STR_MASK(context_->GetDeviceId()));
518 errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
519 remoteCommunicatorVersion, handler);
520 if (errCode != E_OK) {
521 LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
522 return TransformErrCodeToEvent(errCode);
523 }
524 return Event::WAIT_ACK_EVENT;
525 }
526
GetEventAfterTimeSync(int mode) const527 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
528 {
529 if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
530 return Event::CONTROL_CMD_EVENT;
531 }
532 return Event::ABILITY_SYNC_FINISHED_EVENT;
533 }
534
DoSyncTaskFinished()535 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
536 {
537 StopWatchDog();
538 dataSync_->ClearSyncStatus();
539 RefObject::AutoLock lock(syncContext_);
540 int errCode = ExecNextTask();
541 if (errCode == E_OK) {
542 return Event::START_SYNC_EVENT;
543 }
544 return TransformErrCodeToEvent(errCode);
545 }
546
DoTimeout()547 Event SingleVerSyncStateMachine::DoTimeout()
548 {
549 RefObject::AutoLock lock(context_);
550 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
551 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
552 if (subManager != nullptr) {
553 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
554 }
555 }
556 context_->Abort(SyncOperation::OP_TIMEOUT);
557 context_->Clear();
558 AbortInner();
559 return Event::ANY_EVENT;
560 }
561
DoInnerErr()562 Event SingleVerSyncStateMachine::DoInnerErr()
563 {
564 RefObject::AutoLock lock(context_);
565 if (!context_->IsCommNormal()) {
566 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
567 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
568 if (subManager != nullptr) {
569 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
570 }
571 }
572 context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
573 } else {
574 int status = GetSyncOperationStatus(context_->GetTaskErrCode());
575 context_->Abort(status);
576 }
577 context_->Clear();
578 AbortInner();
579 return Event::ANY_EVENT;
580 }
581
AbilitySyncRecv(const Message * inMsg)582 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
583 {
584 if (inMsg->GetMessageType() == TYPE_REQUEST) {
585 return abilitySync_->RequestRecv(inMsg, context_);
586 }
587
588 if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
589 std::lock_guard<std::mutex> lock(stateMachineLock_);
590 int errCode = abilitySync_->AckRecv(inMsg, context_);
591 (void)ResetWatchDog();
592 if (errCode != E_OK) {
593 LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
594 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
595 } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
596 abilitySync_->SetAbilitySyncFinishedStatus(true);
597 LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
598 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
599 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
600 JumpStatusAfterAbilitySync(context_->GetMode());
601 }
602 return E_OK;
603 }
604 if (inMsg->GetMessageType() == TYPE_NOTIFY) {
605 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
606 if (packet == nullptr) {
607 return -E_INVALID_ARGS;
608 }
609 int ackCode = packet->GetAckCode();
610 if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
611 LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
612 context_->SetTaskErrCode(ackCode);
613 std::lock_guard<std::mutex> lock(stateMachineLock_);
614 SwitchStateAndStep(Event::INNER_ERR_EVENT);
615 return E_OK;
616 }
617 if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
618 abilitySync_->SetAbilitySyncFinishedStatus(true);
619 // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
620 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
621 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
622 context_->SetRemoteSoftwareVersion(packet->GetSoftwareVersion());
623 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
624 std::lock_guard<std::mutex> lock(stateMachineLock_);
625 (void)ResetWatchDog();
626 JumpStatusAfterAbilitySync(context_->GetMode());
627 } else if (ackCode != AbilitySync::LAST_NOTIFY) {
628 abilitySync_->AckNotifyRecv(inMsg, context_);
629 }
630 return E_OK;
631 }
632
633 LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
634 return -E_NOT_SUPPORT;
635 }
636
HandleDataRequestRecv(const Message * inMsg)637 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
638 {
639 TimeOffset offset = 0;
640 uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
641 // If message is data sync request, we should check timeoffset.
642 int errCode = timeSync_->GetTimeOffset(offset, timeout);
643 if (errCode != E_OK) {
644 LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
645 return errCode;
646 }
647 context_->SetTimeOffset(offset);
648 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
649 if (performance != nullptr) {
650 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
651 }
652 DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
653
654 // RequestRecv will save data, it may cost a long time.
655 // So we need to send save data notify to keep remote alive.
656 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
657 {
658 std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
659 if (IsNeedResetWatchdog(inMsg)) {
660 (void)ResetWatchDog();
661 }
662 }
663 WaterMark pullEndWaterkark = 0;
664 errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
665 if (performance != nullptr) {
666 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
667 }
668 // only higher than 102 version receive this errCode here.
669 // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
670 // so while receive the same secquencid after abiitysync it wouldn't handle.
671 if (errCode == -E_NEED_ABILITY_SYNC) {
672 if (isNeedStop) {
673 StopSaveDataNotify();
674 }
675 return errCode;
676 }
677 {
678 std::lock_guard<std::mutex> lock(stateMachineLock_);
679 DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
680 if (pullEndWaterkark > 0) {
681 AddPullResponseTarget(inMsg, pullEndWaterkark);
682 }
683 }
684 if (isNeedStop) {
685 StopSaveDataNotify();
686 }
687 return E_OK;
688 }
689
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)690 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
691 bool ignoreInnerErr)
692 {
693 if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
694 dataSync_->ClearSyncStatus();
695 }
696 if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
697 int ret = dataSync_->TryContinueSync(context_, inMsg);
698 if (ret == -E_FINISHED) {
699 SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
700 return;
701 } else if (ret == E_OK) { // do nothing and waiting for all ack receive
702 return;
703 }
704 errCode = ret;
705 }
706 ResponsePullError(errCode, ignoreInnerErr);
707 }
708
NeedAbilitySyncHandle()709 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
710 {
711 // if the remote device version num is overdue,
712 // mean the version num has been reset when syncing data,
713 // there should not clear the new version cache again.
714 if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
715 LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
716 context_->SetRemoteSoftwareVersion(0);
717 } else {
718 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
719 }
720 abilitySync_->SetAbilitySyncFinishedStatus(false);
721 dataSync_->ClearSyncStatus();
722 }
723
HandleDataAckRecv(const Message * inMsg)724 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
725 {
726 if (inMsg->GetMessageType() == TYPE_RESPONSE) {
727 DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
728 }
729 std::lock_guard<std::mutex> lock(stateMachineLock_);
730 // Unfortunately we use stateMachineLock_ in many sync process
731 // So a bad ack will check before the lock and wait
732 // And then another process is running, it will get the lock.After this process, the ack became invalid.
733 // If we don't check ack again, it will be delivered to dataSyncer.
734 if (!IsPacketValid(inMsg)) {
735 return -E_INVALID_ARGS;
736 }
737 if (IsNeedResetWatchdog(inMsg)) {
738 (void)ResetWatchDog();
739 }
740 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
741 // packetId not match but sequence id matched scene, means resend map has be rebuilt
742 // this is old ack, should be dropped and wait for the same packetId sequence.
743 return E_OK;
744 }
745 // AckRecv will save meta data, it may cost a long time. if another thread is saving data
746 // So we need to send save data notify to keep remote alive.
747 // e.g. remote do pull sync
748 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
749 int errCode = dataSync_->AckRecv(context_, inMsg);
750 if (isNeedStop) {
751 StopSaveDataNotify();
752 }
753 if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA) {
754 StopFeedDogForSync(SyncDirectionFlag::SEND);
755 } else if (errCode == -E_SAVE_DATA_NOTIFY) {
756 return errCode;
757 }
758 // when this msg is from response task while request task is running, ignore the errCode
759 bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
760 context_->GetRequestSessionId() != 0;
761 DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
762 HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
763 return errCode;
764 }
765
DataPktRecv(Message * inMsg)766 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
767 {
768 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
769 int errCode = E_OK;
770 switch (inMsg->GetMessageType()) {
771 case TYPE_REQUEST:
772 ScheduleMsgAndHandle(inMsg);
773 errCode = -E_NOT_NEED_DELETE_MSG;
774 break;
775 case TYPE_RESPONSE:
776 case TYPE_NOTIFY:
777 if (performance != nullptr) {
778 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
779 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
780 }
781 errCode = HandleDataAckRecv(inMsg);
782 break;
783 default:
784 errCode = -E_INVALID_ARGS;
785 break;
786 }
787 return errCode;
788 }
789
ScheduleMsgAndHandle(Message * inMsg)790 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
791 {
792 dataSync_->PutDataMsg(inMsg);
793 while (true) {
794 bool isNeedHandle = true;
795 bool isNeedContinue = true;
796 Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
797 if (!isNeedContinue) {
798 break;
799 }
800 if (msg == nullptr) {
801 if (dataSync_->IsNeedReloadQueue()) {
802 continue;
803 }
804 dataSync_->ScheduleInfoHandle(false, false, nullptr);
805 break;
806 }
807 bool isNeedClearMap = false;
808 if (isNeedHandle) {
809 int errCode = HandleDataRequestRecv(msg);
810 if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC) {
811 isNeedClearMap = true;
812 }
813 if (errCode == -E_TIMEOUT) {
814 isNeedHandle = false;
815 }
816 } else {
817 dataSync_->SendFinishedDataAck(context_, msg);
818 }
819 if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
820 // for lower version, no need to handle map schedule info, just reset schedule working status
821 isNeedHandle = false;
822 }
823 dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
824 delete msg;
825 }
826 }
827
ControlPktRecv(Message * inMsg)828 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
829 {
830 int errCode = E_OK;
831 switch (inMsg->GetMessageType()) {
832 case TYPE_REQUEST:
833 errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
834 break;
835 case TYPE_RESPONSE:
836 errCode = HandleControlAckRecv(inMsg);
837 break;
838 default:
839 errCode = -E_INVALID_ARGS;
840 break;
841 }
842 return errCode;
843 }
844
StepToTimeout(TimerId timerId)845 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
846 {
847 std::lock_guard<std::mutex> lock(stateMachineLock_);
848 TimerId timer = syncContext_->GetTimerId();
849 if (timer != timerId) {
850 return;
851 }
852 SwitchStateAndStep(Event::TIME_OUT_EVENT);
853 }
854
855 namespace {
856 struct StateNode {
857 int errCode = 0;
858 SyncOperation::Status status = SyncOperation::OP_WAITING;
859 };
860 }
GetSyncOperationStatus(int errCode) const861 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
862 {
863 static const StateNode stateNodes[] = {
864 { -E_SCHEMA_MISMATCH, SyncOperation::OP_SCHEMA_INCOMPATIBLE },
865 { -E_EKEYREVOKED, SyncOperation::OP_EKEYREVOKED_FAILURE },
866 { -E_SECURITY_OPTION_CHECK_ERROR, SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
867 { -E_BUSY, SyncOperation::OP_BUSY_FAILURE },
868 { -E_NOT_PERMIT, SyncOperation::OP_PERMISSION_CHECK_FAILED },
869 { -E_TIMEOUT, SyncOperation::OP_TIMEOUT },
870 { -E_INVALID_QUERY_FORMAT, SyncOperation::OP_QUERY_FORMAT_FAILURE },
871 { -E_INVALID_QUERY_FIELD, SyncOperation::OP_QUERY_FIELD_FAILURE },
872 { -E_FEEDBACK_UNKNOWN_MESSAGE, SyncOperation::OP_NOT_SUPPORT },
873 { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
874 { -E_NOT_SUPPORT, SyncOperation::OP_NOT_SUPPORT },
875 { -E_INTERCEPT_DATA_FAIL, SyncOperation::OP_INTERCEPT_DATA_FAIL },
876 { -E_MAX_LIMITS, SyncOperation::OP_MAX_LIMITS },
877 { -E_DISTRIBUTED_SCHEMA_CHANGED, SyncOperation::OP_SCHEMA_CHANGED },
878 { -E_NOT_REGISTER, SyncOperation::OP_NOT_SUPPORT },
879 { -E_DENIED_SQL, SyncOperation::OP_DENIED_SQL },
880 { -E_REMOTE_OVER_SIZE, SyncOperation::OP_MAX_LIMITS },
881 { -E_INVALID_PASSWD_OR_CORRUPTED_DB, SyncOperation::OP_NOTADB_OR_CORRUPTED }
882 };
883 const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) {
884 return node.errCode == errCode;
885 });
886 return result == std::end(stateNodes) ? SyncOperation::OP_FAILED : static_cast<int>((*result).status);
887 }
888
TimeMarkSyncRecv(const Message * inMsg)889 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
890 {
891 LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
892 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
893 {
894 std::lock_guard<std::mutex> lock(stateMachineLock_);
895 (void)ResetWatchDog();
896 }
897 if (inMsg->GetMessageType() == TYPE_REQUEST) {
898 return timeSync_->RequestRecv(inMsg);
899 } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
900 int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
901 if (errCode != E_OK) {
902 LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
903 if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
904 context_->SetTaskErrCode(errCode);
905 InnerErrorAbort(inMsg->GetSessionId());
906 }
907 return errCode;
908 }
909 std::lock_guard<std::mutex> lock(stateMachineLock_);
910 SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
911 return E_OK;
912 } else {
913 return -E_INVALID_ARGS;
914 }
915 }
916
Clear()917 void SingleVerSyncStateMachine::Clear()
918 {
919 dataSync_ = nullptr;
920 timeSync_ = nullptr;
921 abilitySync_ = nullptr;
922 context_ = nullptr;
923 syncInterface_ = nullptr;
924 }
925
IsPacketValid(const Message * inMsg) const926 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
927 {
928 if (inMsg == nullptr) {
929 return false;
930 }
931
932 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
933 LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
934 return false;
935 }
936 // filter invalid ack at first
937 bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
938 if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
939 (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
940 LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
941 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
942 return false;
943 }
944 if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
945 (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
946 (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
947 LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
948 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
949 return false;
950 }
951
952 // timeSync and abilitySync don't need to check sequenceId
953 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
954 inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
955 return true;
956 }
957 // sequenceId will be checked in dataSync
958 return true;
959 }
960
PreStartPullResponse()961 void SingleVerSyncStateMachine::PreStartPullResponse()
962 {
963 SingleVerSyncTarget target;
964 context_->PopResponseTarget(target);
965 context_->SetEndMark(target.GetEndWaterMark());
966 context_->SetResponseSessionId(target.GetResponseSessionId());
967 context_->SetMode(SyncModeType::RESPONSE_PULL);
968 context_->ReSetSequenceId();
969 context_->SetQuerySync(target.IsQuerySync());
970 context_->SetQuery(target.GetQuery());
971 }
972
CheckIsStartPullResponse() const973 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
974 {
975 // Other state will step to do pull response, only this statem we need to step the statemachine
976 if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
977 return true;
978 }
979 return false;
980 }
981
MessageCallbackPre(const Message * inMsg)982 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
983 {
984 RefObject::AutoLock lock(context_);
985 if (context_->IsKilled()) {
986 return -E_OBJ_IS_KILLED;
987 }
988
989 if (!IsPacketValid(inMsg)) {
990 return -E_INVALID_ARGS;
991 }
992 return E_OK;
993 }
994
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)995 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
996 {
997 int messageType = static_cast<int>(inMsg->GetMessageId());
998 uint32_t sessionId = inMsg->GetSessionId();
999 if (pullEndWatermark == 0) {
1000 LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
1001 return;
1002 }
1003 if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
1004 LOGI("[StateMachine][AddPullResponseTarget] task is already running");
1005 return;
1006 }
1007 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
1008 if (packet == nullptr) {
1009 LOGE("[AddPullResponseTarget] get packet object failed");
1010 return;
1011 }
1012 SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
1013 if (targetTmp == nullptr) {
1014 LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
1015 return;
1016 }
1017 targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1018 if (messageType == QUERY_SYNC_MESSAGE) {
1019 targetTmp->SetQuery(packet->GetQuery());
1020 targetTmp->SetQuerySync(true);
1021 }
1022 targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1023 targetTmp->SetEndWaterMark(pullEndWatermark);
1024 targetTmp->SetResponseSessionId(sessionId);
1025 if (context_->AddSyncTarget(targetTmp) != E_OK) {
1026 delete targetTmp;
1027 return;
1028 }
1029 if (CheckIsStartPullResponse()) {
1030 SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1031 }
1032 }
1033
TransformErrCodeToEvent(int errCode) const1034 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode) const
1035 {
1036 switch (errCode) {
1037 case -E_TIMEOUT:
1038 return TransforTimeOutErrCodeToEvent();
1039 case -static_cast<int>(VERSION_NOT_SUPPOR_EVENT):
1040 return Event::VERSION_NOT_SUPPOR_EVENT;
1041 case -E_SEND_DATA:
1042 return Event::SEND_DATA_EVENT;
1043 case -E_NO_DATA_SEND:
1044 return Event::SEND_FINISHED_EVENT;
1045 case -E_RECV_FINISHED:
1046 return Event::RECV_FINISHED_EVENT;
1047 case -E_NEED_ABILITY_SYNC:
1048 return Event::NEED_ABILITY_SYNC_EVENT;
1049 case -E_NO_SYNC_TASK:
1050 return Event::ALL_TASK_FINISHED_EVENT;
1051 case -E_NEED_PULL_REPONSE:
1052 return Event::START_PULL_RESPONSE_EVENT;
1053 case -E_RE_SEND_DATA:
1054 return Event::RE_SEND_DATA_EVENT;
1055 default:
1056 return Event::INNER_ERR_EVENT;
1057 }
1058 }
1059
IsNeedResetWatchdog(const Message * inMsg) const1060 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1061 {
1062 if (inMsg == nullptr) {
1063 return false;
1064 }
1065
1066 if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1067 return true;
1068 }
1069
1070 int msgType = inMsg->GetMessageType();
1071 if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1072 if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1073 // Pull response ack also should reset watchdog
1074 return true;
1075 }
1076 }
1077
1078 return false;
1079 }
1080
TransforTimeOutErrCodeToEvent() const1081 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1082 {
1083 if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1084 return Event::WAIT_TIME_OUT_EVENT;
1085 } else {
1086 return Event::TIME_OUT_EVENT;
1087 }
1088 }
1089
IsNeedErrCodeHandle(uint32_t sessionId) const1090 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1091 {
1092 // omit to set sessionId so version_3 should skip to compare sessionid.
1093 if (sessionId == context_->GetRequestSessionId() ||
1094 context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1095 return true;
1096 }
1097 return false;
1098 }
1099
PushPullDataRequestEvokeErrHandle()1100 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1101 {
1102 // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1103 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1104 SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) {
1105 LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1106 context_->SetTaskErrCode(-E_EKEYREVOKED);
1107 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1108 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1109 } else {
1110 context_->SetTaskErrCode(-E_EKEYREVOKED);
1111 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1112 }
1113 }
1114
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1115 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1116 {
1117 if (IsNeedErrCodeHandle(sessionId)) {
1118 switch (errCode) {
1119 case E_OK:
1120 break;
1121 case -E_NOT_PERMIT:
1122 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1123 break;
1124 case -E_RECV_FINISHED:
1125 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1126 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1127 break;
1128 case -E_EKEYREVOKED:
1129 PushPullDataRequestEvokeErrHandle();
1130 break;
1131 case -E_BUSY:
1132 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1133 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1134 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1135 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1136 case -E_INTERCEPT_DATA_FAIL:
1137 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1138 case -E_INVALID_QUERY_FIELD:
1139 case -E_INVALID_QUERY_FORMAT:
1140 case -E_MAX_LIMITS:
1141 case -E_NOT_REGISTER:
1142 case -E_NOT_SUPPORT:
1143 case -E_SECURITY_OPTION_CHECK_ERROR:
1144 context_->SetTaskErrCode(errCode);
1145 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1146 break;
1147 default:
1148 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1149 break;
1150 }
1151 }
1152 }
1153
AbilityMsgSessionIdCheck(const Message * inMsg)1154 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1155 {
1156 if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
1157 return true;
1158 }
1159 LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1160 return false;
1161 }
1162
GetSyncType(uint32_t messageId) const1163 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1164 {
1165 if (messageId == QUERY_SYNC_MESSAGE) {
1166 return SyncType::QUERY_SYNC_TYPE;
1167 }
1168 return SyncType::MANUAL_FULL_SYNC_TYPE;
1169 }
1170
DataAckRecvErrCodeHandle(int errCode,bool handleError)1171 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1172 {
1173 switch (errCode) {
1174 case -E_NEED_ABILITY_SYNC:
1175 NeedAbilitySyncHandle();
1176 break;
1177 case -E_NOT_PERMIT:
1178 if (handleError) {
1179 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1180 }
1181 break;
1182 case -E_BUSY:
1183 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1184 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1185 case -E_EKEYREVOKED:
1186 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1187 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1188 case -E_INTERCEPT_DATA_FAIL:
1189 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1190 case -E_INVALID_QUERY_FIELD:
1191 case -E_INVALID_QUERY_FORMAT:
1192 case -E_MAX_LIMITS:
1193 case -E_NOT_REGISTER:
1194 case -E_NOT_SUPPORT:
1195 case -E_SECURITY_OPTION_CHECK_ERROR:
1196 if (handleError) {
1197 context_->SetTaskErrCode(errCode);
1198 }
1199 break;
1200 default:
1201 break;
1202 }
1203 }
1204
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)1205 bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
1206 {
1207 return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
1208 }
1209
JumpStatusAfterAbilitySync(int mode)1210 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1211 {
1212 if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1213 SwitchStateAndStep(CONTROL_CMD_EVENT);
1214 } else {
1215 SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1216 }
1217 }
1218
ControlAckRecvErrCodeHandle(int errCode)1219 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1220 {
1221 switch (errCode) {
1222 case -E_NEED_ABILITY_SYNC:
1223 NeedAbilitySyncHandle();
1224 break;
1225 case -E_NO_DATA_SEND:
1226 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1227 break;
1228 case -E_NOT_PERMIT:
1229 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1230 break;
1231 // other errCode use default
1232 default:
1233 context_->SetTaskErrCode(errCode);
1234 break;
1235 }
1236 }
1237
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)1238 void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
1239 {
1240 metadata_->GetLocalWaterMark(deviceId, outValue);
1241 }
1242
GetSendQueryWaterMark(const std::string & queryId,const DeviceID & deviceId,bool isAutoLift,uint64_t & outValue)1243 int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId,
1244 bool isAutoLift, uint64_t &outValue)
1245 {
1246 return metadata_->GetSendQueryWaterMark(queryId, deviceId, outValue, isAutoLift);
1247 }
1248
ResponsePullError(int errCode,bool ignoreInnerErr)1249 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1250 {
1251 Event event = TransformErrCodeToEvent(errCode);
1252 if (event == INNER_ERR_EVENT) {
1253 if (ignoreInnerErr) {
1254 event = RESPONSE_TASK_FINISHED_EVENT;
1255 } else if (context_ != nullptr) {
1256 context_->SetTaskErrCode(errCode);
1257 }
1258 }
1259 SwitchStateAndStep(event);
1260 }
1261
InnerErrorAbort(uint32_t sessionId)1262 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1263 {
1264 std::lock_guard<std::mutex> lock(stateMachineLock_);
1265 uint32_t requestSessionId = context_->GetRequestSessionId();
1266 if (sessionId != requestSessionId) {
1267 LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1268 return;
1269 }
1270 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1271 SyncStep();
1272 }
1273 }
1274
NotifyClosing()1275 void SingleVerSyncStateMachine::NotifyClosing()
1276 {
1277 if (timeSync_ != nullptr) {
1278 timeSync_->Close();
1279 }
1280 }
1281 } // namespace DistributedDB
1282