1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_sync_state_machine.h"
17
18 #include <cmath>
19 #include <climits>
20 #include <algorithm>
21
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "sync_operation.h"
25 #include "message_transform.h"
26 #include "sync_types.h"
27 #include "db_common.h"
28 #include "runtime_context.h"
29 #include "performance_analysis.h"
30 #include "single_ver_sync_target.h"
31 #include "single_ver_data_sync.h"
32 #include "single_ver_data_sync_utils.h"
33
34 namespace DistributedDB {
35 using Event = SingleVerSyncStateMachine::Event;
36 using State = SingleVerSyncStateMachine::State;
37 namespace {
38 // used for state switch table
39 const int CURRENT_STATE_INDEX = 0;
40 const int EVENT_INDEX = 1;
41 const int OUTPUT_STATE_INDEX = 2;
42
43 // drop v1 and v2 table by one optimize, dataSend mode in all version go with slide window mode.
44 // State switch table v3, has three columns, CurrentState, Event, and OutSate
45 const std::vector<std::vector<uint8_t>> STATE_SWITCH_TABLE_V3 = {
46 {State::IDLE, Event::START_SYNC_EVENT, State::TIME_SYNC},
47
48 // In TIME_SYNC state
49 {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC},
50 {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
51 {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
52
53 // In ABILITY_SYNC state, compare version num and schema
54 {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR},
55 {State::ABILITY_SYNC, Event::ABILITY_SYNC_FINISHED_EVENT, State::START_INITIACTIVE_DATA_SYNC},
56 {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
57 {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
58 {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD},
59
60 // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync
61 {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
62 {State::START_INITIACTIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
63 {State::START_INITIACTIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
64 {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
65 {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC},
66
67 // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync
68 {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC},
69 {State::START_PASSIVE_DATA_SYNC, Event::RESPONSE_TASK_FINISHED_EVENT, State::WAIT_FOR_RECEIVE_DATA_FINISH},
70 {State::START_PASSIVE_DATA_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
71 {State::START_PASSIVE_DATA_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR},
72 {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
73 {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC},
74
75 // In WAIT_FOR_RECEIVE_DATA_FINISH,
76 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
77 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::START_PULL_RESPONSE_EVENT, State::START_PASSIVE_DATA_SYNC},
78 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
79 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR},
80 {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
81
82 {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED},
83 {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT},
84 {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR},
85 {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC},
86
87 // In SYNC_TASK_FINISHED,
88 {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE},
89 {State::SYNC_TASK_FINISHED, Event::START_SYNC_EVENT, State::TIME_SYNC},
90
91 // SYNC_TIME_OUT and INNE_ERR state, just do some exception resolve
92 {State::SYNC_TIME_OUT, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
93 {State::INNER_ERR, Event::ANY_EVENT, State::SYNC_TASK_FINISHED},
94 };
95 }
96
97 std::mutex SingleVerSyncStateMachine::stateSwitchTableLock_;
98 std::vector<StateSwitchTable> SingleVerSyncStateMachine::stateSwitchTables_;
99 bool SingleVerSyncStateMachine::isStateSwitchTableInited_ = false;
100
SingleVerSyncStateMachine()101 SingleVerSyncStateMachine::SingleVerSyncStateMachine()
102 : context_(nullptr),
103 syncInterface_(nullptr),
104 timeSync_(nullptr),
105 abilitySync_(nullptr),
106 dataSync_(nullptr),
107 currentRemoteVersionId_(0)
108 {
109 }
110
~SingleVerSyncStateMachine()111 SingleVerSyncStateMachine::~SingleVerSyncStateMachine()
112 {
113 LOGD("~SingleVerSyncStateMachine");
114 Clear();
115 }
116
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metaData,ICommunicator * communicator)117 int SingleVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
118 std::shared_ptr<Metadata> &metaData, ICommunicator *communicator)
119 {
120 if ((context == nullptr) || (syncInterface == nullptr) || (metaData == nullptr) || (communicator == nullptr)) {
121 return -E_INVALID_ARGS;
122 }
123
124 int errCode = SyncStateMachine::Initialize(context, syncInterface, metaData, communicator);
125 if (errCode != E_OK) {
126 return errCode;
127 }
128
129 timeSync_ = std::make_unique<TimeSync>();
130 dataSync_ = std::make_shared<SingleVerDataSync>();
131 abilitySync_ = std::make_unique<AbilitySync>();
132 if ((timeSync_ == nullptr) || (dataSync_ == nullptr) || (abilitySync_ == nullptr)) {
133 timeSync_ = nullptr;
134 dataSync_ = nullptr;
135 abilitySync_ = nullptr;
136 return -E_OUT_OF_MEMORY;
137 }
138
139 errCode = timeSync_->Initialize(communicator, metaData, syncInterface, context->GetDeviceId());
140 if (errCode != E_OK) {
141 goto ERROR_OUT;
142 }
143 errCode = dataSync_->Initialize(syncInterface, communicator, metaData, context->GetDeviceId());
144 if (errCode != E_OK) {
145 goto ERROR_OUT;
146 }
147 errCode = abilitySync_->Initialize(communicator, syncInterface, metaData, context->GetDeviceId());
148 if (errCode != E_OK) {
149 goto ERROR_OUT;
150 }
151
152 currentState_ = IDLE;
153 context_ = static_cast<SingleVerSyncTaskContext *>(context);
154 syncInterface_ = static_cast<SingleVerKvDBSyncInterface *>(syncInterface);
155
156 InitStateSwitchTables();
157 InitStateMapping();
158 return E_OK;
159
160 ERROR_OUT:
161 Clear();
162 return errCode;
163 }
164
SyncStep()165 void SingleVerSyncStateMachine::SyncStep()
166 {
167 RefObject::IncObjRef(context_);
168 RefObject::IncObjRef(communicator_);
169 int errCode = RuntimeContext::GetInstance()->ScheduleTask(
170 std::bind(&SingleVerSyncStateMachine::SyncStepInnerLocked, this));
171 if (errCode != E_OK) {
172 LOGE("[StateMachine][SyncStep] Schedule SyncStep failed");
173 RefObject::DecObjRef(communicator_);
174 RefObject::DecObjRef(context_);
175 }
176 }
177
ReceiveMessageCallback(Message * inMsg)178 int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
179 {
180 int errCode = MessageCallbackPre(inMsg);
181 if (errCode != E_OK) {
182 return errCode;
183 }
184 switch (inMsg->GetMessageId()) {
185 case TIME_SYNC_MESSAGE:
186 errCode = TimeMarkSyncRecv(inMsg);
187 break;
188 case ABILITY_SYNC_MESSAGE:
189 errCode = AbilitySyncRecv(inMsg);
190 break;
191 case DATA_SYNC_MESSAGE:
192 case QUERY_SYNC_MESSAGE:
193 errCode = DataPktRecv(inMsg);
194 break;
195 case CONTROL_SYNC_MESSAGE:
196 errCode = ControlPktRecv(inMsg);
197 break;
198 default:
199 errCode = -E_NOT_SUPPORT;
200 }
201 return errCode;
202 }
203
SyncStepInnerLocked()204 void SingleVerSyncStateMachine::SyncStepInnerLocked()
205 {
206 if (context_->IncUsedCount() != E_OK) {
207 goto SYNC_STEP_OUT;
208 }
209 {
210 std::lock_guard<std::mutex> lock(stateMachineLock_);
211 SyncStepInner();
212 }
213 context_->SafeExit();
214
215 SYNC_STEP_OUT:
216 RefObject::DecObjRef(communicator_);
217 RefObject::DecObjRef(context_);
218 }
219
SyncStepInner()220 void SingleVerSyncStateMachine::SyncStepInner()
221 {
222 Event event = INNER_ERR_EVENT;
223 do {
224 auto iter = stateMapping_.find(currentState_);
225 if (iter != stateMapping_.end()) {
226 event = static_cast<Event>(iter->second());
227 } else {
228 LOGE("[StateMachine][SyncStepInner] can not find state=%d,label=%s,dev=%s", currentState_,
229 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
230 break;
231 }
232 } while (event != Event::WAIT_ACK_EVENT && SwitchMachineState(event) == E_OK && currentState_ != IDLE);
233 }
234
SetCurStateErrStatus()235 void SingleVerSyncStateMachine::SetCurStateErrStatus()
236 {
237 currentState_ = State::INNER_ERR;
238 }
239
StartSyncInner()240 int SingleVerSyncStateMachine::StartSyncInner()
241 {
242 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
243 if (performance != nullptr) {
244 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
245 }
246 int errCode = PrepareNextSyncTask();
247 if (errCode == E_OK) {
248 SwitchStateAndStep(Event::START_SYNC_EVENT);
249 }
250 return errCode;
251 }
252
AbortInner()253 void SingleVerSyncStateMachine::AbortInner()
254 {
255 LOGE("[StateMachine][AbortInner] error occurred,abort,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
256 STR_MASK(context_->GetDeviceId()));
257 if (context_->IsKilled()) {
258 dataSync_->ClearDataMsg();
259 }
260 dataSync_->ClearSyncStatus();
261 ContinueToken token;
262 context_->GetContinueToken(token);
263 if (token != nullptr) {
264 syncInterface_->ReleaseContinueToken(token);
265 }
266 context_->SetContinueToken(nullptr);
267 context_->Clear();
268 }
269
GetStateSwitchTables() const270 const std::vector<StateSwitchTable> &SingleVerSyncStateMachine::GetStateSwitchTables() const
271 {
272 return stateSwitchTables_;
273 }
274
PrepareNextSyncTask()275 int SingleVerSyncStateMachine::PrepareNextSyncTask()
276 {
277 int errCode = StartWatchDog();
278 if (errCode != E_OK) {
279 LOGE("[StateMachine][PrepareNextSyncTask] WatchDog start failed,err=%d", errCode);
280 return errCode;
281 }
282
283 if (currentState_ != State::IDLE && currentState_ != State::SYNC_TASK_FINISHED) {
284 LOGW("[StateMachine][PrepareNextSyncTask] PreSync may get an err, state=%" PRIu8 ",dev=%s",
285 currentState_, STR_MASK(context_->GetDeviceId()));
286 currentState_ = State::IDLE;
287 }
288 return E_OK;
289 }
290
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)291 void SingleVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
292 {
293 dataSync_->SendSaveDataNotifyPacket(context_,
294 std::min(context_->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT), sessionId, sequenceId, inMsgId);
295 }
296
CommErrAbort(uint32_t sessionId)297 void SingleVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
298 {
299 std::lock_guard<std::mutex> lock(stateMachineLock_);
300 uint32_t requestSessionId = context_->GetRequestSessionId();
301 if ((sessionId != 0) && ((requestSessionId == 0) || (sessionId != requestSessionId))) {
302 return;
303 }
304 context_->SetCommNormal(false);
305 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
306 SyncStep();
307 }
308 }
309
InitStateSwitchTables()310 void SingleVerSyncStateMachine::InitStateSwitchTables()
311 {
312 if (isStateSwitchTableInited_) {
313 return;
314 }
315
316 std::lock_guard<std::mutex> lock(stateSwitchTableLock_);
317 if (isStateSwitchTableInited_) {
318 return;
319 }
320
321 InitStateSwitchTable(SINGLE_VER_SYNC_PROCTOL_V3, STATE_SWITCH_TABLE_V3);
322 std::sort(stateSwitchTables_.begin(), stateSwitchTables_.end(),
323 [](const auto &tableA, const auto &tableB) {
324 return tableA.version > tableB.version;
325 }); // descending
326 isStateSwitchTableInited_ = true;
327 }
328
InitStateSwitchTable(uint32_t version,const std::vector<std::vector<uint8_t>> & switchTable)329 void SingleVerSyncStateMachine::InitStateSwitchTable(uint32_t version,
330 const std::vector<std::vector<uint8_t>> &switchTable)
331 {
332 StateSwitchTable table;
333 table.version = version;
334 for (const auto &stateSwitch : switchTable) {
335 if (stateSwitch.size() <= OUTPUT_STATE_INDEX) {
336 LOGE("[StateMachine][InitSwitchTable] stateSwitch size err,size=%zu", stateSwitch.size());
337 return;
338 }
339 if (table.switchTable.count(stateSwitch[CURRENT_STATE_INDEX]) == 0) {
340 EventToState eventToState; // new EventToState
341 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
342 table.switchTable[stateSwitch[CURRENT_STATE_INDEX]] = eventToState;
343 } else { // key stateSwitch[CURRENT_STATE_INDEX] already has EventToState
344 EventToState &eventToState = table.switchTable[stateSwitch[CURRENT_STATE_INDEX]];
345 eventToState[stateSwitch[EVENT_INDEX]] = stateSwitch[OUTPUT_STATE_INDEX];
346 }
347 }
348 stateSwitchTables_.push_back(table);
349 }
350
InitStateMapping()351 void SingleVerSyncStateMachine::InitStateMapping()
352 {
353 stateMapping_[TIME_SYNC] = std::bind(&SingleVerSyncStateMachine::DoTimeSync, this);
354 stateMapping_[ABILITY_SYNC] = std::bind(&SingleVerSyncStateMachine::DoAbilitySync, this);
355 stateMapping_[WAIT_FOR_RECEIVE_DATA_FINISH] = std::bind(&SingleVerSyncStateMachine::DoWaitForDataRecv, this);
356 stateMapping_[SYNC_TASK_FINISHED] = std::bind(&SingleVerSyncStateMachine::DoSyncTaskFinished, this);
357 stateMapping_[SYNC_TIME_OUT] = std::bind(&SingleVerSyncStateMachine::DoTimeout, this);
358 stateMapping_[INNER_ERR] = std::bind(&SingleVerSyncStateMachine::DoInnerErr, this);
359 stateMapping_[START_INITIACTIVE_DATA_SYNC] =
360 std::bind(&SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow, this);
361 stateMapping_[START_PASSIVE_DATA_SYNC] =
362 std::bind(&SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow, this);
363 stateMapping_[SYNC_CONTROL_CMD] = std::bind(&SingleVerSyncStateMachine::DoInitiactiveControlSync, this);
364 }
365
DoInitiactiveDataSyncWithSlidingWindow()366 Event SingleVerSyncStateMachine::DoInitiactiveDataSyncWithSlidingWindow()
367 {
368 LOGD("[StateMachine][activeDataSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
369 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
370 int errCode = E_OK;
371 switch (context_->GetMode()) {
372 case SyncModeType::PUSH:
373 case SyncModeType::QUERY_PUSH:
374 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
375 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
376 break;
377 case SyncModeType::PULL:
378 case SyncModeType::QUERY_PULL:
379 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
380 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
381 break;
382 case SyncModeType::PUSH_AND_PULL:
383 case SyncModeType::QUERY_PUSH_PULL:
384 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
385 break;
386 case SyncModeType::RESPONSE_PULL:
387 errCode = dataSync_->SyncStart(context_->GetMode(), context_);
388 break;
389 default:
390 errCode = -E_NOT_SUPPORT;
391 break;
392 }
393 if (errCode == E_OK) {
394 return Event::WAIT_ACK_EVENT;
395 }
396 // once E_EKEYREVOKED error occurred, PUSH_AND_PULL mode should wait for ack to pull remote data.
397 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
398 errCode == -E_EKEYREVOKED) {
399 return Event::WAIT_ACK_EVENT;
400 }
401
402 // when response task step dataSync again while request task is running, ignore the errCode
403 bool ignoreInnerErr = (context_->GetResponseSessionId() != 0 && context_->GetRequestSessionId() != 0);
404 Event event = TransformErrCodeToEvent(errCode);
405 return (ignoreInnerErr && event == INNER_ERR_EVENT) ? SEND_FINISHED_EVENT : event;
406 }
407
DoPassiveDataSyncWithSlidingWindow()408 Event SingleVerSyncStateMachine::DoPassiveDataSyncWithSlidingWindow()
409 {
410 {
411 RefObject::AutoLock lock(context_);
412 if (context_->GetRspTargetQueueSize() != 0) {
413 PreStartPullResponse();
414 } else if (context_->GetResponseSessionId() == 0 ||
415 context_->GetRetryStatus() == static_cast<int>(SyncTaskContext::NO_NEED_RETRY)) {
416 return RESPONSE_TASK_FINISHED_EVENT;
417 }
418 }
419 int errCode = dataSync_->SyncStart(SyncModeType::RESPONSE_PULL, context_);
420 if (errCode != E_OK) {
421 LOGE("[SingleVerSyncStateMachine][DoPassiveDataSyncWithSlidingWindow] response pull send failed[%d]", errCode);
422 return RESPONSE_TASK_FINISHED_EVENT;
423 }
424 return Event::WAIT_ACK_EVENT;
425 }
426
DoInitiactiveControlSync()427 Event SingleVerSyncStateMachine::DoInitiactiveControlSync()
428 {
429 LOGD("[StateMachine][activeControlSync] mode=%d,label=%s,dev=%s", context_->GetMode(),
430 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
431 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
432 int errCode = dataSync_->ControlCmdStart(context_);
433 if (errCode == E_OK) {
434 return Event::WAIT_ACK_EVENT;
435 }
436 context_->SetTaskErrCode(errCode);
437 return TransformErrCodeToEvent(errCode);
438 }
439
HandleControlAckRecv(const Message * inMsg)440 int SingleVerSyncStateMachine::HandleControlAckRecv(const Message *inMsg)
441 {
442 std::lock_guard<std::mutex> lock(stateMachineLock_);
443 if (IsNeedResetWatchdog(inMsg)) {
444 (void)ResetWatchDog();
445 }
446 int errCode = dataSync_->ControlCmdAckRecv(context_, inMsg);
447 ControlAckRecvErrCodeHandle(errCode);
448 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
449 return E_OK;
450 }
451
DoWaitForDataRecv() const452 Event SingleVerSyncStateMachine::DoWaitForDataRecv() const
453 {
454 if (context_->GetRspTargetQueueSize() != 0) {
455 return START_PULL_RESPONSE_EVENT;
456 }
457 if (context_->GetOperationStatus() == SyncOperation::OP_FINISHED_ALL) {
458 return RECV_FINISHED_EVENT;
459 }
460 if (SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL &&
461 context_->GetOperationStatus() == SyncOperation::OP_EKEYREVOKED_FAILURE &&
462 context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
463 return RECV_FINISHED_EVENT;
464 }
465 return Event::WAIT_ACK_EVENT;
466 }
467
DoTimeSync()468 Event SingleVerSyncStateMachine::DoTimeSync()
469 {
470 if (timeSync_->IsNeedSync()) {
471 CommErrHandler handler = nullptr;
472 // Auto sync need do retry don't use errHandler to return.
473 if (!context_->IsAutoSync()) {
474 handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
475 context_, context_->GetRequestSessionId());
476 }
477 int errCode = timeSync_->SyncStart(handler, context_->GetRequestSessionId());
478 if (errCode == E_OK) {
479 return Event::WAIT_ACK_EVENT;
480 }
481 context_->SetTaskErrCode(errCode);
482 return TransformErrCodeToEvent(errCode);
483 }
484
485 return Event::TIME_SYNC_FINISHED_EVENT;
486 }
487
DoAbilitySync()488 Event SingleVerSyncStateMachine::DoAbilitySync()
489 {
490 uint16_t remoteCommunicatorVersion = 0;
491 int errCode = communicator_->GetRemoteCommunicatorVersion(context_->GetDeviceId(), remoteCommunicatorVersion);
492 if (errCode != E_OK) {
493 LOGE("[StateMachine][DoAbilitySync] Get RemoteCommunicatorVersion errCode=%d", errCode);
494 return Event::INNER_ERR_EVENT;
495 }
496 // Fistr version, not support AbilitySync
497 if (remoteCommunicatorVersion == 0) {
498 context_->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
499 return GetEventAfterTimeSync(context_->GetMode());
500 }
501 if (context_->GetIsNeedResetAbilitySync()) {
502 abilitySync_->SetAbilitySyncFinishedStatus(false);
503 context_->SetIsNeedResetAbilitySync(false);
504 }
505 if (abilitySync_->GetAbilitySyncFinishedStatus()) {
506 return GetEventAfterTimeSync(context_->GetMode());
507 }
508
509 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
510 context_, context_->GetRequestSessionId());
511 LOGI("[StateMachine][AbilitySync] start abilitySync,label=%s,dev=%s", dataSync_->GetLabel().c_str(),
512 STR_MASK(context_->GetDeviceId()));
513 errCode = abilitySync_->SyncStart(context_->GetRequestSessionId(), context_->GetSequenceId(),
514 remoteCommunicatorVersion, handler);
515 if (errCode != E_OK) {
516 LOGE("[StateMachine][DoAbilitySync] ability sync start failed,errCode=%d", errCode);
517 return TransformErrCodeToEvent(errCode);
518 }
519 return Event::WAIT_ACK_EVENT;
520 }
521
GetEventAfterTimeSync(int mode) const522 Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const
523 {
524 if (mode == SyncModeType::SUBSCRIBE_QUERY || mode == SyncModeType::UNSUBSCRIBE_QUERY) {
525 return Event::CONTROL_CMD_EVENT;
526 }
527 return Event::ABILITY_SYNC_FINISHED_EVENT;
528 }
529
DoSyncTaskFinished()530 Event SingleVerSyncStateMachine::DoSyncTaskFinished()
531 {
532 StopWatchDog();
533 dataSync_->ClearSyncStatus();
534 RefObject::AutoLock lock(syncContext_);
535 int errCode = ExecNextTask();
536 if (errCode == E_OK) {
537 return Event::START_SYNC_EVENT;
538 }
539 return TransformErrCodeToEvent(errCode);
540 }
541
DoTimeout()542 Event SingleVerSyncStateMachine::DoTimeout()
543 {
544 RefObject::AutoLock lock(context_);
545 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
546 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
547 if (subManager != nullptr) {
548 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
549 }
550 }
551 context_->Abort(SyncOperation::OP_TIMEOUT);
552 context_->Clear();
553 AbortInner();
554 return Event::ANY_EVENT;
555 }
556
DoInnerErr()557 Event SingleVerSyncStateMachine::DoInnerErr()
558 {
559 RefObject::AutoLock lock(context_);
560 if (!context_->IsCommNormal()) {
561 if (context_->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
562 std::shared_ptr<SubscribeManager> subManager = context_->GetSubscribeManager();
563 if (subManager != nullptr) {
564 subManager->DeleteLocalSubscribeQuery(context_->GetDeviceId(), context_->GetQuery());
565 }
566 }
567 context_->Abort(SyncOperation::OP_COMM_ABNORMAL);
568 } else {
569 int status = GetSyncOperationStatus(context_->GetTaskErrCode());
570 context_->Abort(status);
571 }
572 context_->Clear();
573 AbortInner();
574 return Event::ANY_EVENT;
575 }
576
AbilitySyncRecv(const Message * inMsg)577 int SingleVerSyncStateMachine::AbilitySyncRecv(const Message *inMsg)
578 {
579 if (inMsg->GetMessageType() == TYPE_REQUEST) {
580 return abilitySync_->RequestRecv(inMsg, context_);
581 }
582
583 if (inMsg->GetMessageType() == TYPE_RESPONSE && AbilityMsgSessionIdCheck(inMsg)) {
584 std::lock_guard<std::mutex> lock(stateMachineLock_);
585 int errCode = abilitySync_->AckRecv(inMsg, context_);
586 (void)ResetWatchDog();
587 if (errCode != E_OK) {
588 LOGE("[StateMachine][AbilitySyncRecv] handle ackRecv failed,errCode=%d", errCode);
589 SwitchStateAndStep(TransformErrCodeToEvent(errCode));
590 } else if (context_->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_2_0) {
591 abilitySync_->SetAbilitySyncFinishedStatus(true);
592 LOGI("[StateMachine][AbilitySyncRecv] ability Sync Finished,label=%s,dev=%s",
593 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
594 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
595 JumpStatusAfterAbilitySync(context_->GetMode());
596 }
597 return E_OK;
598 }
599 if (inMsg->GetMessageType() == TYPE_NOTIFY) {
600 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
601 if (packet == nullptr) {
602 return -E_INVALID_ARGS;
603 }
604 int ackCode = packet->GetAckCode();
605 if (ackCode != AbilitySync::CHECK_SUCCESS && ackCode != AbilitySync::LAST_NOTIFY) {
606 LOGE("[StateMachine][AbilitySyncRecv] ackCode check failed,ackCode=%d", ackCode);
607 context_->SetTaskErrCode(ackCode);
608 std::lock_guard<std::mutex> lock(stateMachineLock_);
609 SwitchStateAndStep(Event::INNER_ERR_EVENT);
610 return E_OK;
611 }
612 if (ackCode == AbilitySync::LAST_NOTIFY && AbilityMsgSessionIdCheck(inMsg)) {
613 abilitySync_->SetAbilitySyncFinishedStatus(true);
614 // while recv last notify means ability sync finished,it is better to reset watchDog to avoid timeout.
615 LOGI("[StateMachine][AbilitySyncRecv] ability sync finished,label=%s,dev=%s",
616 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
617 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
618 (static_cast<SingleVerSyncTaskContext *>(context_))->SetIsSchemaSync(true);
619 std::lock_guard<std::mutex> lock(stateMachineLock_);
620 (void)ResetWatchDog();
621 JumpStatusAfterAbilitySync(context_->GetMode());
622 } else if (ackCode != AbilitySync::LAST_NOTIFY) {
623 abilitySync_->AckNotifyRecv(inMsg, context_);
624 }
625 return E_OK;
626 }
627
628 LOGE("[StateMachine][AbilitySyncRecv] msg type invalid");
629 return -E_NOT_SUPPORT;
630 }
631
HandleDataRequestRecv(const Message * inMsg)632 int SingleVerSyncStateMachine::HandleDataRequestRecv(const Message *inMsg)
633 {
634 TimeOffset offset = 0;
635 uint32_t timeout = communicator_->GetTimeout(context_->GetDeviceId());
636 // If message is data sync request, we should check timeoffset.
637 int errCode = timeSync_->GetTimeOffset(offset, timeout);
638 if (errCode != E_OK) {
639 LOGE("[StateMachine][HandleDataRequestRecv] GetTimeOffset err! errCode=%d", errCode);
640 return errCode;
641 }
642 context_->SetTimeOffset(offset);
643 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
644 if (performance != nullptr) {
645 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
646 }
647 DecRefCountOfFeedDogTimer(SyncDirectionFlag::RECEIVE);
648 {
649 std::lock_guard<std::mutex> lockWatchDog(stateMachineLock_);
650 if (IsNeedResetWatchdog(inMsg)) {
651 (void)ResetWatchDog();
652 }
653 }
654
655 // RequestRecv will save data, it may cost a long time.
656 // So we need to send save data notify to keep remote alive.
657 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
658 WaterMark pullEndWaterkark = 0;
659 errCode = dataSync_->DataRequestRecv(context_, inMsg, pullEndWaterkark);
660 if (performance != nullptr) {
661 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_REQUEST_RECV_TO_SEND_ACK);
662 }
663 if (isNeedStop) {
664 StopSaveDataNotify();
665 }
666 // only higher than 102 version receive this errCode here.
667 // while both RequestSessionId is not equal,but get this errCode;slwr would seem to handle first secquencid.
668 // so while receive the same secquencid after abiitysync it wouldn't handle.
669 if (errCode == -E_NEED_ABILITY_SYNC) {
670 return errCode;
671 }
672 std::lock_guard<std::mutex> lock(stateMachineLock_);
673 DataRecvErrCodeHandle(inMsg->GetSessionId(), errCode);
674 if (pullEndWaterkark > 0) {
675 AddPullResponseTarget(inMsg, pullEndWaterkark);
676 }
677 return E_OK;
678 }
679
HandleDataAckRecvWithSlidingWindow(int errCode,const Message * inMsg,bool ignoreInnerErr)680 void SingleVerSyncStateMachine::HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg,
681 bool ignoreInnerErr)
682 {
683 if (errCode == -E_RE_SEND_DATA) { // LOCAL_WATER_MARK_NOT_INIT
684 dataSync_->ClearSyncStatus();
685 }
686 if (errCode == -E_NO_DATA_SEND || errCode == -E_SEND_DATA) {
687 int ret = dataSync_->TryContinueSync(context_, inMsg);
688 if (ret == -E_FINISHED) {
689 SwitchStateAndStep(Event::SEND_FINISHED_EVENT);
690 return;
691 } else if (ret == E_OK) { // do nothing and waiting for all ack receive
692 return;
693 }
694 errCode = ret;
695 }
696 ResponsePullError(errCode, ignoreInnerErr);
697 }
698
NeedAbilitySyncHandle()699 void SingleVerSyncStateMachine::NeedAbilitySyncHandle()
700 {
701 // if the remote device version num is overdue,
702 // mean the version num has been reset when syncing data,
703 // there should not clear the new version cache again.
704 if (currentRemoteVersionId_ == context_->GetRemoteSoftwareVersionId()) {
705 LOGI("[StateMachine] set remote version 0, currentRemoteVersionId_ = %" PRIu64, currentRemoteVersionId_);
706 context_->SetRemoteSoftwareVersion(0);
707 } else {
708 currentRemoteVersionId_ = context_->GetRemoteSoftwareVersionId();
709 }
710 abilitySync_->SetAbilitySyncFinishedStatus(false);
711 dataSync_->ClearSyncStatus();
712 }
713
HandleDataAckRecv(const Message * inMsg)714 int SingleVerSyncStateMachine::HandleDataAckRecv(const Message *inMsg)
715 {
716 if (inMsg->GetMessageType() == TYPE_RESPONSE) {
717 DecRefCountOfFeedDogTimer(SyncDirectionFlag::SEND);
718 }
719 std::lock_guard<std::mutex> lock(stateMachineLock_);
720 // Unfortunately we use stateMachineLock_ in many sync process
721 // So a bad ack will check before the lock and wait
722 // And then another process is running, it will get the lock.After this process, the ack became invalid.
723 // If we don't check ack again, it will be delivered to dataSyncer.
724 if (!IsPacketValid(inMsg)) {
725 return -E_INVALID_ARGS;
726 }
727 if (IsNeedResetWatchdog(inMsg)) {
728 (void)ResetWatchDog();
729 }
730 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && !dataSync_->AckPacketIdCheck(inMsg)) {
731 // packetId not match but sequence id matched scene, means resend map has be rebuilt
732 // this is old ack, shoulb be dropped and wait for the same packetId sequence.
733 return E_OK;
734 }
735 // AckRecv will save meta data, it may cost a long time. if another thread is saving data
736 // So we need to send save data notify to keep remote alive.
737 // eg. remote do pull sync
738 bool isNeedStop = StartSaveDataNotify(inMsg->GetSessionId(), inMsg->GetSequenceId(), inMsg->GetMessageId());
739 int errCode = dataSync_->AckRecv(context_, inMsg);
740 if (isNeedStop) {
741 StopSaveDataNotify();
742 }
743 if (errCode == -E_NEED_ABILITY_SYNC || errCode == -E_RE_SEND_DATA) {
744 StopFeedDogForSync(SyncDirectionFlag::SEND);
745 } else if (errCode == -E_SAVE_DATA_NOTIFY) {
746 return errCode;
747 }
748 // when this msg is from response task while request task is running, ignore the errCode
749 bool ignoreInnerErr = inMsg->GetSessionId() == context_->GetResponseSessionId() &&
750 context_->GetRequestSessionId() != 0;
751 DataAckRecvErrCodeHandle(errCode, !ignoreInnerErr);
752 HandleDataAckRecvWithSlidingWindow(errCode, inMsg, ignoreInnerErr);
753 return errCode;
754 }
755
DataPktRecv(Message * inMsg)756 int SingleVerSyncStateMachine::DataPktRecv(Message *inMsg)
757 {
758 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
759 int errCode = E_OK;
760 switch (inMsg->GetMessageType()) {
761 case TYPE_REQUEST:
762 ScheduleMsgAndHandle(inMsg);
763 errCode = -E_NOT_NEED_DELETE_MSG;
764 break;
765 case TYPE_RESPONSE:
766 case TYPE_NOTIFY:
767 if (performance != nullptr) {
768 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
769 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
770 }
771 errCode = HandleDataAckRecv(inMsg);
772 break;
773 default:
774 errCode = -E_INVALID_ARGS;
775 break;
776 }
777 return errCode;
778 }
779
ScheduleMsgAndHandle(Message * inMsg)780 void SingleVerSyncStateMachine::ScheduleMsgAndHandle(Message *inMsg)
781 {
782 dataSync_->PutDataMsg(inMsg);
783 while (true) {
784 bool isNeedHandle = true;
785 bool isNeedContinue = true;
786 Message *msg = dataSync_->MoveNextDataMsg(context_, isNeedHandle, isNeedContinue);
787 if (!isNeedContinue) {
788 break;
789 }
790 if (msg == nullptr) {
791 if (dataSync_->IsNeedReloadQueue()) {
792 continue;
793 }
794 break;
795 }
796 bool isNeedClearMap = false;
797 if (isNeedHandle) {
798 int errCode = HandleDataRequestRecv(msg);
799 if (context_->IsReceiveWaterMarkErr() || errCode == -E_NEED_ABILITY_SYNC) {
800 isNeedClearMap = true;
801 }
802 if (errCode == -E_TIMEOUT) {
803 isNeedHandle = false;
804 }
805 } else {
806 dataSync_->SendFinishedDataAck(context_, msg);
807 }
808 if (context_->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
809 // for lower version, no need to handle map schedule info, just reset schedule working status
810 isNeedHandle = false;
811 }
812 dataSync_->ScheduleInfoHandle(isNeedHandle, isNeedClearMap, msg);
813 delete msg;
814 }
815 }
816
ControlPktRecv(Message * inMsg)817 int SingleVerSyncStateMachine::ControlPktRecv(Message *inMsg)
818 {
819 int errCode = E_OK;
820 switch (inMsg->GetMessageType()) {
821 case TYPE_REQUEST:
822 errCode = dataSync_->ControlCmdRequestRecv(context_, inMsg);
823 break;
824 case TYPE_RESPONSE:
825 errCode = HandleControlAckRecv(inMsg);
826 break;
827 default:
828 errCode = -E_INVALID_ARGS;
829 break;
830 }
831 return errCode;
832 }
833
StepToTimeout(TimerId timerId)834 void SingleVerSyncStateMachine::StepToTimeout(TimerId timerId)
835 {
836 std::lock_guard<std::mutex> lock(stateMachineLock_);
837 TimerId timer = syncContext_->GetTimerId();
838 if (timer != timerId) {
839 return;
840 }
841 SwitchStateAndStep(Event::TIME_OUT_EVENT);
842 }
843
GetSyncOperationStatus(int errCode) const844 int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const
845 {
846 static const std::map<int, int> statusMap = {
847 { -E_SCHEMA_MISMATCH, SyncOperation::OP_SCHEMA_INCOMPATIBLE },
848 { -E_EKEYREVOKED, SyncOperation::OP_EKEYREVOKED_FAILURE },
849 { -E_SECURITY_OPTION_CHECK_ERROR, SyncOperation::OP_SECURITY_OPTION_CHECK_FAILURE },
850 { -E_BUSY, SyncOperation::OP_BUSY_FAILURE },
851 { -E_NOT_PERMIT, SyncOperation::OP_PERMISSION_CHECK_FAILED },
852 { -E_TIMEOUT, SyncOperation::OP_TIMEOUT },
853 { -E_INVALID_QUERY_FORMAT, SyncOperation::OP_QUERY_FORMAT_FAILURE },
854 { -E_INVALID_QUERY_FIELD, SyncOperation::OP_QUERY_FIELD_FAILURE },
855 { -E_FEEDBACK_UNKNOWN_MESSAGE, SyncOperation::OP_NOT_SUPPORT },
856 { -E_FEEDBACK_COMMUNICATOR_NOT_FOUND, SyncOperation::OP_COMM_ABNORMAL },
857 { -E_NOT_SUPPORT, SyncOperation::OP_NOT_SUPPORT },
858 { -E_INTERCEPT_DATA_FAIL, SyncOperation::OP_INTERCEPT_DATA_FAIL },
859 { -E_MAX_LIMITS, SyncOperation::OP_MAX_LIMITS },
860 { -E_DISTRIBUTED_SCHEMA_CHANGED, SyncOperation::OP_SCHEMA_CHANGED },
861 { -E_NOT_REGISTER, SyncOperation::OP_NOT_SUPPORT },
862 { -E_DENIED_SQL, SyncOperation::OP_DENIED_SQL },
863 { -E_REMOTE_OVER_SIZE, SyncOperation::OP_MAX_LIMITS },
864 { -E_INVALID_PASSWD_OR_CORRUPTED_DB, SyncOperation::OP_NOTADB_OR_CORRUPTED },
865 };
866 auto iter = statusMap.find(errCode);
867 if (iter != statusMap.end()) {
868 return iter->second;
869 }
870 return SyncOperation::OP_FAILED;
871 }
872
TimeMarkSyncRecv(const Message * inMsg)873 int SingleVerSyncStateMachine::TimeMarkSyncRecv(const Message *inMsg)
874 {
875 LOGD("[StateMachine][TimeMarkSyncRecv] type=%d,label=%s,dev=%s", inMsg->GetMessageType(),
876 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
877 {
878 std::lock_guard<std::mutex> lock(stateMachineLock_);
879 (void)ResetWatchDog();
880 }
881 if (inMsg->GetMessageType() == TYPE_REQUEST) {
882 return timeSync_->RequestRecv(inMsg);
883 } else if (inMsg->GetMessageType() == TYPE_RESPONSE) {
884 int errCode = timeSync_->AckRecv(inMsg, context_->GetRequestSessionId());
885 if (errCode != E_OK) {
886 LOGE("[StateMachine][TimeMarkSyncRecv] AckRecv failed errCode=%d", errCode);
887 if (inMsg->GetSessionId() != 0 && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
888 context_->SetTaskErrCode(errCode);
889 InnerErrorAbort(inMsg->GetSessionId());
890 }
891 return errCode;
892 }
893 std::lock_guard<std::mutex> lock(stateMachineLock_);
894 SwitchStateAndStep(TIME_SYNC_FINISHED_EVENT);
895 return E_OK;
896 } else {
897 return -E_INVALID_ARGS;
898 }
899 }
900
// Resets the state machine's references to its collaborators (data sync, time
// sync, ability sync, task context and storage interface) to nullptr.
// NOTE(review): whether this also releases the referenced objects depends on the
// member types (raw vs. smart pointer), which are not visible here - confirm.
void SingleVerSyncStateMachine::Clear()
{
    dataSync_ = nullptr;
    timeSync_ = nullptr;
    abilitySync_ = nullptr;
    context_ = nullptr;
    syncInterface_ = nullptr;
}
909
IsPacketValid(const Message * inMsg) const910 bool SingleVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
911 {
912 if (inMsg == nullptr) {
913 return false;
914 }
915
916 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() >= UNKNOW_MESSAGE)) {
917 LOGE("[StateMachine][IsPacketValid] Message is invalid, id=%d", inMsg->GetMessageId());
918 return false;
919 }
920 // filter invalid ack at first
921 bool isResponseType = (inMsg->GetMessageType() == TYPE_RESPONSE);
922 if (isResponseType && (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) &&
923 (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
924 LOGE("[StateMachine][IsPacketValid] Control Message is invalid, label=%s, dev=%s",
925 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
926 return false;
927 }
928 if (isResponseType && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) &&
929 (inMsg->GetSessionId() != context_->GetRequestSessionId()) &&
930 (inMsg->GetSessionId() != context_->GetResponseSessionId())) {
931 LOGE("[StateMachine][IsPacketValid] Data Message is invalid, label=%s, dev=%s",
932 dataSync_->GetLabel().c_str(), STR_MASK(context_->GetDeviceId()));
933 return false;
934 }
935
936 // timeSync and abilitySync don't need to check sequenceId
937 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE || inMsg->GetMessageId() == ABILITY_SYNC_MESSAGE ||
938 inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
939 return true;
940 }
941 // sequenceId will be checked in dataSync
942 return true;
943 }
944
PreStartPullResponse()945 void SingleVerSyncStateMachine::PreStartPullResponse()
946 {
947 SingleVerSyncTarget target;
948 context_->PopResponseTarget(target);
949 context_->SetEndMark(target.GetEndWaterMark());
950 context_->SetResponseSessionId(target.GetResponseSessionId());
951 context_->SetMode(SyncModeType::RESPONSE_PULL);
952 context_->ReSetSequenceId();
953 context_->SetQuerySync(target.IsQuerySync());
954 context_->SetQuery(target.GetQuery());
955 }
956
CheckIsStartPullResponse() const957 bool SingleVerSyncStateMachine::CheckIsStartPullResponse() const
958 {
959 // Other state will step to do pull response, only this statem we need to step the statemachine
960 if (currentState_ == WAIT_FOR_RECEIVE_DATA_FINISH) {
961 return true;
962 }
963 return false;
964 }
965
MessageCallbackPre(const Message * inMsg)966 int SingleVerSyncStateMachine::MessageCallbackPre(const Message *inMsg)
967 {
968 RefObject::AutoLock lock(context_);
969 if (context_->IsKilled()) {
970 return -E_OBJ_IS_KILLED;
971 }
972
973 if (!IsPacketValid(inMsg)) {
974 return -E_INVALID_ARGS;
975 }
976 return E_OK;
977 }
978
AddPullResponseTarget(const Message * inMsg,WaterMark pullEndWatermark)979 void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark)
980 {
981 int messageType = static_cast<int>(inMsg->GetMessageId());
982 uint32_t sessionId = inMsg->GetSessionId();
983 if (pullEndWatermark == 0) {
984 LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!");
985 return;
986 }
987 if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) {
988 LOGI("[StateMachine][AddPullResponseTarget] task is already running");
989 return;
990 }
991 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
992 if (packet == nullptr) {
993 LOGE("[AddPullResponseTarget] get packet object failed");
994 return;
995 }
996 SingleVerSyncTarget *targetTmp = new (std::nothrow) SingleVerSyncTarget;
997 if (targetTmp == nullptr) {
998 LOGE("[StateMachine][AddPullResponseTarget] add failed, may oom");
999 return;
1000 }
1001 targetTmp->SetTaskType(ISyncTarget::RESPONSE);
1002 if (messageType == QUERY_SYNC_MESSAGE) {
1003 targetTmp->SetQuery(packet->GetQuery());
1004 targetTmp->SetQuerySync(true);
1005 }
1006 targetTmp->SetMode(SyncModeType::RESPONSE_PULL);
1007 targetTmp->SetEndWaterMark(pullEndWatermark);
1008 targetTmp->SetResponseSessionId(sessionId);
1009 if (context_->AddSyncTarget(targetTmp) != E_OK) {
1010 delete targetTmp;
1011 return;
1012 }
1013 if (CheckIsStartPullResponse()) {
1014 SwitchStateAndStep(TransformErrCodeToEvent(-E_NEED_PULL_REPONSE));
1015 }
1016 }
1017
TransformErrCodeToEvent(int errCode)1018 Event SingleVerSyncStateMachine::TransformErrCodeToEvent(int errCode)
1019 {
1020 switch (errCode) {
1021 case -E_TIMEOUT:
1022 return TransforTimeOutErrCodeToEvent();
1023 case -VERSION_NOT_SUPPOR_EVENT:
1024 return Event::VERSION_NOT_SUPPOR_EVENT;
1025 case -E_SEND_DATA:
1026 return Event::SEND_DATA_EVENT;
1027 case -E_NO_DATA_SEND:
1028 return Event::SEND_FINISHED_EVENT;
1029 case -E_RECV_FINISHED:
1030 return Event::RECV_FINISHED_EVENT;
1031 case -E_NEED_ABILITY_SYNC:
1032 return Event::NEED_ABILITY_SYNC_EVENT;
1033 case -E_NO_SYNC_TASK:
1034 return Event::ALL_TASK_FINISHED_EVENT;
1035 case -E_NEED_PULL_REPONSE:
1036 return Event::START_PULL_RESPONSE_EVENT;
1037 case -E_RE_SEND_DATA:
1038 return Event::RE_SEND_DATA_EVENT;
1039 default:
1040 return Event::INNER_ERR_EVENT;
1041 }
1042 }
1043
IsNeedResetWatchdog(const Message * inMsg) const1044 bool SingleVerSyncStateMachine::IsNeedResetWatchdog(const Message *inMsg) const
1045 {
1046 if (inMsg == nullptr) {
1047 return false;
1048 }
1049
1050 if (IsNeedErrCodeHandle(inMsg->GetSessionId())) {
1051 return true;
1052 }
1053
1054 int msgType = inMsg->GetMessageType();
1055 if (msgType == TYPE_RESPONSE || msgType == TYPE_NOTIFY) {
1056 if (inMsg->GetSessionId() == context_->GetResponseSessionId()) {
1057 // Pull response ack also should reset watchdog
1058 return true;
1059 }
1060 }
1061
1062 return false;
1063 }
1064
TransforTimeOutErrCodeToEvent() const1065 Event SingleVerSyncStateMachine::TransforTimeOutErrCodeToEvent() const
1066 {
1067 if (syncContext_->IsSyncTaskNeedRetry() && (syncContext_->GetRetryTime() < syncContext_->GetSyncRetryTimes())) {
1068 return Event::WAIT_TIME_OUT_EVENT;
1069 } else {
1070 return Event::TIME_OUT_EVENT;
1071 }
1072 }
1073
IsNeedErrCodeHandle(uint32_t sessionId) const1074 bool SingleVerSyncStateMachine::IsNeedErrCodeHandle(uint32_t sessionId) const
1075 {
1076 // omit to set sessionId so version_3 should skip to compare sessionid.
1077 if (sessionId == context_->GetRequestSessionId() ||
1078 context_->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_RELEASE_2_0) {
1079 return true;
1080 }
1081 return false;
1082 }
1083
PushPullDataRequestEvokeErrHandle()1084 void SingleVerSyncStateMachine::PushPullDataRequestEvokeErrHandle()
1085 {
1086 // the pushpull sync task should wait for send finished after remote dev get data occur E_EKEYREVOKED error.
1087 if (context_->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 &&
1088 SyncOperation::TransferSyncMode(context_->GetMode()) == SyncModeType::PUSH_AND_PULL) {
1089 LOGI("data request errCode = %d, wait for send finished", -E_EKEYREVOKED);
1090 context_->SetTaskErrCode(-E_EKEYREVOKED);
1091 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1092 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1093 } else {
1094 context_->SetTaskErrCode(-E_EKEYREVOKED);
1095 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1096 }
1097 }
1098
DataRecvErrCodeHandle(uint32_t sessionId,int errCode)1099 void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int errCode)
1100 {
1101 if (IsNeedErrCodeHandle(sessionId)) {
1102 switch (errCode) {
1103 case E_OK:
1104 break;
1105 case -E_RECV_FINISHED:
1106 context_->SetOperationStatus(SyncOperation::OP_RECV_FINISHED);
1107 SwitchStateAndStep(Event::RECV_FINISHED_EVENT);
1108 break;
1109 case -E_EKEYREVOKED:
1110 PushPullDataRequestEvokeErrHandle();
1111 break;
1112 case -E_BUSY:
1113 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1114 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1115 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1116 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1117 case -E_INTERCEPT_DATA_FAIL:
1118 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1119 case -E_INVALID_QUERY_FIELD:
1120 case -E_INVALID_QUERY_FORMAT:
1121 case -E_MAX_LIMITS:
1122 case -E_NOT_REGISTER:
1123 case -E_NOT_SUPPORT:
1124 case -E_SECURITY_OPTION_CHECK_ERROR:
1125 context_->SetTaskErrCode(errCode);
1126 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1127 break;
1128 default:
1129 SwitchStateAndStep(Event::INNER_ERR_EVENT);
1130 break;
1131 }
1132 }
1133 }
1134
AbilityMsgSessionIdCheck(const Message * inMsg)1135 bool SingleVerSyncStateMachine::AbilityMsgSessionIdCheck(const Message *inMsg)
1136 {
1137 if (inMsg != nullptr && inMsg->GetSessionId() == context_->GetRequestSessionId()) {
1138 return true;
1139 }
1140 LOGE("[AbilitySync] session check failed,dev=%s", STR_MASK(context_->GetDeviceId()));
1141 return false;
1142 }
1143
GetSyncType(uint32_t messageId) const1144 SyncType SingleVerSyncStateMachine::GetSyncType(uint32_t messageId) const
1145 {
1146 if (messageId == QUERY_SYNC_MESSAGE) {
1147 return SyncType::QUERY_SYNC_TYPE;
1148 }
1149 return SyncType::MANUAL_FULL_SYNC_TYPE;
1150 }
1151
DataAckRecvErrCodeHandle(int errCode,bool handleError)1152 void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handleError)
1153 {
1154 switch (errCode) {
1155 case -E_NEED_ABILITY_SYNC:
1156 NeedAbilitySyncHandle();
1157 break;
1158 case -E_NOT_PERMIT:
1159 if (handleError) {
1160 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1161 }
1162 break;
1163 case -E_BUSY:
1164 case -E_DISTRIBUTED_SCHEMA_CHANGED:
1165 case -E_DISTRIBUTED_SCHEMA_NOT_FOUND:
1166 case -E_EKEYREVOKED:
1167 case -E_FEEDBACK_COMMUNICATOR_NOT_FOUND:
1168 case -E_FEEDBACK_UNKNOWN_MESSAGE:
1169 case -E_INTERCEPT_DATA_FAIL:
1170 case -E_INVALID_PASSWD_OR_CORRUPTED_DB:
1171 case -E_INVALID_QUERY_FIELD:
1172 case -E_INVALID_QUERY_FORMAT:
1173 case -E_MAX_LIMITS:
1174 case -E_NOT_REGISTER:
1175 case -E_NOT_SUPPORT:
1176 case -E_SECURITY_OPTION_CHECK_ERROR:
1177 if (handleError) {
1178 context_->SetTaskErrCode(errCode);
1179 }
1180 break;
1181 default:
1182 break;
1183 }
1184 }
1185
// Thin forwarder: returns the result of
// SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync for the given message.
// query is passed by non-const reference to the utility.
// NOTE(review): presumably query is filled in as an output when the result is
// true - confirm against SingleVerDataSyncUtils.
bool SingleVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
{
    return SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(inMsg, query);
}
1190
JumpStatusAfterAbilitySync(int mode)1191 void SingleVerSyncStateMachine::JumpStatusAfterAbilitySync(int mode)
1192 {
1193 if ((mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
1194 SwitchStateAndStep(CONTROL_CMD_EVENT);
1195 } else {
1196 SwitchStateAndStep(ABILITY_SYNC_FINISHED_EVENT);
1197 }
1198 }
1199
ControlAckRecvErrCodeHandle(int errCode)1200 void SingleVerSyncStateMachine::ControlAckRecvErrCodeHandle(int errCode)
1201 {
1202 switch (errCode) {
1203 case -E_NEED_ABILITY_SYNC:
1204 NeedAbilitySyncHandle();
1205 break;
1206 case -E_NO_DATA_SEND:
1207 context_->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
1208 break;
1209 case -E_NOT_PERMIT:
1210 context_->SetOperationStatus(SyncOperation::OP_PERMISSION_CHECK_FAILED);
1211 break;
1212 // other errCode use default
1213 default:
1214 context_->SetTaskErrCode(errCode);
1215 break;
1216 }
1217 }
1218
// Thin forwarder to metadata_->GetLocalWaterMark for the given device; the value
// is delivered through outValue.
// NOTE(review): if the metadata call returns a status it is discarded here -
// confirm that callers do not need it.
void SingleVerSyncStateMachine::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
{
    metadata_->GetLocalWaterMark(deviceId, outValue);
}
1223
// Thin forwarder to metadata_->GetSendQueryWaterMark; returns its result
// unchanged and delivers the water mark through outValue.
// Note the argument order differs: this function takes (isAutoLift, outValue)
// while the metadata call takes (outValue, isAutoLift).
int SingleVerSyncStateMachine::GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId,
    bool isAutoLift, uint64_t &outValue)
{
    return metadata_->GetSendQueryWaterMark(queryId, deviceId, outValue, isAutoLift);
}
1229
ResponsePullError(int errCode,bool ignoreInnerErr)1230 void SingleVerSyncStateMachine::ResponsePullError(int errCode, bool ignoreInnerErr)
1231 {
1232 Event event = TransformErrCodeToEvent(errCode);
1233 if (event == INNER_ERR_EVENT) {
1234 if (ignoreInnerErr) {
1235 event = RESPONSE_TASK_FINISHED_EVENT;
1236 } else if (context_ != nullptr) {
1237 context_->SetTaskErrCode(errCode);
1238 }
1239 }
1240 SwitchStateAndStep(event);
1241 }
1242
InnerErrorAbort(uint32_t sessionId)1243 void SingleVerSyncStateMachine::InnerErrorAbort(uint32_t sessionId)
1244 {
1245 std::lock_guard<std::mutex> lock(stateMachineLock_);
1246 uint32_t requestSessionId = context_->GetRequestSessionId();
1247 if (sessionId != requestSessionId) {
1248 LOGD("[SingleVerSyncStateMachine][InnerErrorAbort] Ignore abort by different sessionId");
1249 return;
1250 }
1251 if (SwitchMachineState(Event::INNER_ERR_EVENT) == E_OK) {
1252 SyncStep();
1253 }
1254 }
1255
NotifyClosing()1256 void SingleVerSyncStateMachine::NotifyClosing()
1257 {
1258 if (timeSync_ != nullptr) {
1259 timeSync_->Close();
1260 }
1261 }
1262 } // namespace DistributedDB
1263