1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_state_machine.h"
17
18 #include <algorithm>
19
20 #include "log_print.h"
21 #include "version.h"
22
23 namespace DistributedDB {
SyncStateMachine()24 SyncStateMachine::SyncStateMachine()
25 : syncContext_(nullptr),
26 storageInterface_(nullptr),
27 communicator_(nullptr),
28 metadata_(nullptr),
29 currentState_(0),
30 watchDogStarted_(false),
31 currentSyncProctolVersion_(SINGLE_VER_SYNC_PROCTOL_V3),
32 saveDataNotifyTimerId_(0),
33 saveDataNotifyCount_(0),
34 getDataNotifyTimerId_(0),
35 getDataNotifyCount_(0)
36 {
37 }
38
~SyncStateMachine()39 SyncStateMachine::~SyncStateMachine()
40 {
41 syncContext_ = nullptr;
42 storageInterface_ = nullptr;
43 watchDogStarted_ = false;
44 metadata_ = nullptr;
45 if (communicator_ != nullptr) {
46 RefObject::DecObjRef(communicator_);
47 communicator_ = nullptr;
48 }
49 }
50
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)51 int SyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
52 std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
53 {
54 if ((context == nullptr) || (syncInterface == nullptr) || (metadata == nullptr) || (communicator == nullptr)) {
55 return -E_INVALID_ARGS;
56 }
57 syncContext_ = context;
58 storageInterface_ = syncInterface;
59 metadata_ = metadata;
60 RefObject::IncObjRef(communicator);
61 communicator_ = communicator;
62 return E_OK;
63 }
64
StartSync()65 int SyncStateMachine::StartSync()
66 {
67 int errCode = syncContext_->IncUsedCount();
68 if (errCode != E_OK) {
69 return errCode;
70 }
71 std::lock_guard<std::mutex> lock(stateMachineLock_);
72 errCode = StartSyncInner();
73 syncContext_->SafeExit();
74 return errCode;
75 }
76
TimeoutCallback(TimerId timerId)77 int SyncStateMachine::TimeoutCallback(TimerId timerId)
78 {
79 RefObject::AutoLock lock(syncContext_);
80 if (syncContext_->IsKilled()) {
81 return -E_OBJ_IS_KILLED;
82 }
83 TimerId timer = syncContext_->GetTimerId();
84 if (timer != timerId) {
85 return -E_UNEXPECTED_DATA;
86 }
87
88 int retryTime = syncContext_->GetRetryTime();
89 if (retryTime >= syncContext_->GetSyncRetryTimes() || !syncContext_->IsSyncTaskNeedRetry()) {
90 LOGI("[SyncStateMachine][Timeout] TimeoutCallback retryTime:%d", retryTime);
91 syncContext_->UnlockObj();
92 StepToTimeout(timerId);
93 syncContext_->LockObj();
94 return E_OK;
95 }
96 retryTime++;
97 syncContext_->SetRetryTime(retryTime);
98 // the sequenceid will be managed by dataSync slide windows.
99 syncContext_->SetRetryStatus(SyncTaskContext::NEED_RETRY);
100 int timeoutTime = syncContext_->GetSyncRetryTimeout(retryTime);
101 syncContext_->ModifyTimer(timeoutTime);
102 LOGI("[SyncStateMachine][Timeout] Schedule task, timeoutTime = %d, retryTime = %d", timeoutTime, retryTime);
103 SyncStep();
104 return E_OK;
105 }
106
Abort()107 void SyncStateMachine::Abort()
108 {
109 RefObject::IncObjRef(syncContext_);
110 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
111 this->AbortImmediately();
112 RefObject::DecObjRef(this->syncContext_);
113 });
114 if (errCode != E_OK) {
115 LOGE("[SyncStateMachine][Abort] Abort failed, errCode %d", errCode);
116 RefObject::DecObjRef(syncContext_);
117 }
118 }
119
AbortImmediately()120 void SyncStateMachine::AbortImmediately()
121 {
122 std::lock_guard<std::mutex> lock(stateMachineLock_);
123 AbortInner();
124 StopWatchDog();
125 currentState_ = 0;
126 }
127
SwitchMachineState(uint8_t event)128 int SyncStateMachine::SwitchMachineState(uint8_t event)
129 {
130 const std::vector<StateSwitchTable> &tables = GetStateSwitchTables();
131 auto tableIter = std::find_if(tables.begin(), tables.end(),
132 [this](const StateSwitchTable &table) {
133 return table.version <= currentSyncProctolVersion_;
134 });
135 if (tableIter == tables.end()) {
136 LOGE("[SyncStateMachine][SwitchState] Can't find a compatible version by version %u",
137 currentSyncProctolVersion_);
138 return -E_NOT_FOUND;
139 }
140
141 const std::map<uint8_t, EventToState> &table = (*tableIter).switchTable;
142 auto eventToStateIter = table.find(currentState_);
143 if (eventToStateIter == table.end()) {
144 LOGE("[SyncStateMachine][SwitchState] tableVer:%d, Can't find EventToState with currentSate %u",
145 (*tableIter).version, currentState_);
146 SetCurStateErrStatus();
147 return E_OK;
148 }
149
150 const EventToState &eventToState = eventToStateIter->second;
151 auto stateIter = eventToState.find(event);
152 if (stateIter == eventToState.end()) {
153 LOGD("[SyncStateMachine][SwitchState] tableVer:%d, Can't find event %u int currentSate %u ignore",
154 (*tableIter).version, event, currentState_);
155 return -E_NOT_FOUND;
156 }
157
158 currentState_ = stateIter->second;
159 LOGD("[SyncStateMachine][SwitchState] tableVer:%d, from state %u move to state %u with event %u dev %s{private}",
160 (*tableIter).version, eventToStateIter->first, currentState_, event, syncContext_->GetDeviceId().c_str());
161 return E_OK;
162 }
163
SwitchStateAndStep(uint8_t event)164 void SyncStateMachine::SwitchStateAndStep(uint8_t event)
165 {
166 if (SwitchMachineState(event) == E_OK) {
167 SyncStepInner();
168 }
169 }
170
ExecNextTask()171 int SyncStateMachine::ExecNextTask()
172 {
173 syncContext_->Clear();
174 while (!syncContext_->IsTargetQueueEmpty()) {
175 int errCode = syncContext_->GetNextTarget(false);
176 if (errCode != E_OK) {
177 continue;
178 }
179 if (syncContext_->IsCurrentSyncTaskCanBeSkipped()) {
180 syncContext_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
181 continue;
182 }
183 errCode = PrepareNextSyncTask();
184 if (errCode != E_OK) {
185 LOGE("[SyncStateMachine] PrepareSync failed");
186 syncContext_->SetOperationStatus(SyncOperation::OP_FAILED);
187 }
188 return errCode;
189 }
190 syncContext_->SetTaskExecStatus(ISyncTaskContext::FINISHED);
191 // no task left
192 LOGD("[SyncStateMachine] All sync task finished!");
193 return -E_NO_SYNC_TASK;
194 }
195
StartWatchDog()196 int SyncStateMachine::StartWatchDog()
197 {
198 int errCode = syncContext_->StartTimer();
199 if (errCode == E_OK) {
200 watchDogStarted_ = true;
201 }
202 return errCode;
203 }
204
ResetWatchDog()205 int SyncStateMachine::ResetWatchDog()
206 {
207 if (!watchDogStarted_) {
208 return E_OK;
209 }
210 LOGD("[SyncStateMachine][WatchDog] ResetWatchDog.");
211 syncContext_->StopTimer();
212 syncContext_->SetRetryTime(0);
213 return syncContext_->StartTimer();
214 }
215
StopWatchDog()216 void SyncStateMachine::StopWatchDog()
217 {
218 watchDogStarted_ = false;
219 LOGD("[SyncStateMachine][WatchDog] StopWatchDog.");
220 syncContext_->StopTimer();
221 }
222
StartSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)223 bool SyncStateMachine::StartSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
224 {
225 std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
226 if (saveDataNotifyTimerId_ > 0) {
227 saveDataNotifyCount_ = 0;
228 LOGW("[SyncStateMachine][SaveDataNotify] timer has been started!");
229 return false;
230 }
231
232 // Incref to make sure context still alive before timer stopped.
233 RefObject::IncObjRef(syncContext_);
234 int errCode = RuntimeContext::GetInstance()->SetTimer(
235 DATA_NOTIFY_INTERVAL,
236 [this, sessionId, sequenceId, inMsgId](TimerId timerId) {
237 RefObject::IncObjRef(syncContext_);
238 int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, sequenceId, inMsgId]() {
239 DoSaveDataNotify(sessionId, sequenceId, inMsgId);
240 RefObject::DecObjRef(syncContext_);
241 });
242 if (ret != E_OK) {
243 LOGE("[SyncStateMachine] [DoSaveDataNotify] ScheduleTask failed errCode %d", ret);
244 RefObject::DecObjRef(syncContext_);
245 }
246 return ret;
247 },
248 [this]() { RefObject::DecObjRef(syncContext_); },
249 saveDataNotifyTimerId_);
250 if (errCode != E_OK) {
251 LOGW("[SyncStateMachine][SaveDataNotify] start timer failed err %d !", errCode);
252 return false;
253 }
254 return true;
255 }
256
StopSaveDataNotify()257 void SyncStateMachine::StopSaveDataNotify()
258 {
259 std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
260 StopSaveDataNotifyNoLock();
261 }
262
StopSaveDataNotifyNoLock()263 void SyncStateMachine::StopSaveDataNotifyNoLock()
264 {
265 if (saveDataNotifyTimerId_ == 0) {
266 LOGI("[SyncStateMachine][SaveDataNotify] timer is not started!");
267 return;
268 }
269 RuntimeContext::GetInstance()->RemoveTimer(saveDataNotifyTimerId_);
270 saveDataNotifyTimerId_ = 0;
271 saveDataNotifyCount_ = 0;
272 }
273
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)274 bool SyncStateMachine::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
275 {
276 if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
277 LOGE("[SyncStateMachine][feedDog] start wrong flag:%d", flag);
278 return false;
279 }
280
281 uint8_t cnt = GetFeedDogTimeout(time / DATA_NOTIFY_INTERVAL);
282 LOGI("[SyncStateMachine][feedDog] start cnt:%d, flag:%d", cnt, flag);
283
284 std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
285 watchDogController_[flag].refCount++;
286 LOGD("af incr refCount = %d", watchDogController_[flag].refCount);
287
288 if (watchDogController_[flag].feedDogTimerId > 0) {
289 // update the upperLimit, if the new cnt is bigger then last upperLimit
290 if (cnt > watchDogController_[flag].feedDogUpperLimit) {
291 LOGD("update feedDogUpperLimit = %d", cnt);
292 watchDogController_[flag].feedDogUpperLimit = cnt;
293 }
294 watchDogController_[flag].feedDogCnt = 0u;
295 LOGW("[SyncStateMachine][feedDog] timer has been started!, flag:%d", flag);
296 return false;
297 }
298
299 // Incref to make sure context still alive before timer stopped.
300 RefObject::IncObjRef(syncContext_);
301 watchDogController_[flag].feedDogUpperLimit = cnt;
302 int errCode = RuntimeContext::GetInstance()->SetTimer(
303 DATA_NOTIFY_INTERVAL,
304 [this, flag](TimerId timerId) {
305 RefObject::IncObjRef(syncContext_);
306 int ret = RuntimeContext::GetInstance()->ScheduleTask([this, flag]() {
307 DoFeedDogForSync(flag);
308 RefObject::DecObjRef(syncContext_);
309 });
310 if (ret != E_OK) {
311 LOGE("[SyncStateMachine] [DoFeedDogForSync] ScheduleTask failed errCode %d", ret);
312 RefObject::DecObjRef(syncContext_);
313 }
314 return ret;
315 },
316 [this]() { RefObject::DecObjRef(syncContext_); },
317 watchDogController_[flag].feedDogTimerId);
318 if (errCode != E_OK) {
319 LOGW("[SyncStateMachine][feedDog] start timer failed err %d !", errCode);
320 return false;
321 }
322 return true;
323 }
324
GetFeedDogTimeout(int timeoutCount) const325 uint8_t SyncStateMachine::GetFeedDogTimeout(int timeoutCount) const
326 {
327 if (timeoutCount > UINT8_MAX) {
328 return UINT8_MAX;
329 }
330 return timeoutCount;
331 }
332
StopFeedDogForSync(SyncDirectionFlag flag)333 void SyncStateMachine::StopFeedDogForSync(SyncDirectionFlag flag)
334 {
335 if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
336 LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
337 return;
338 }
339 std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
340 StopFeedDogForSyncNoLock(flag);
341 }
342
StopFeedDogForSyncNoLock(SyncDirectionFlag flag)343 void SyncStateMachine::StopFeedDogForSyncNoLock(SyncDirectionFlag flag)
344 {
345 if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
346 LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
347 return;
348 }
349 if (watchDogController_[flag].feedDogTimerId == 0) {
350 return;
351 }
352 LOGI("[SyncStateMachine][feedDog] stop flag:%d", flag);
353 RuntimeContext::GetInstance()->RemoveTimer(watchDogController_[flag].feedDogTimerId);
354 watchDogController_[flag].feedDogTimerId = 0;
355 watchDogController_[flag].feedDogCnt = 0;
356 watchDogController_[flag].refCount = 0;
357 }
358
SetCurStateErrStatus()359 void SyncStateMachine::SetCurStateErrStatus()
360 {
361 }
362
DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)363 void SyncStateMachine::DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)
364 {
365 std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
366 if (watchDogController_[flag].feedDogTimerId == 0) {
367 return;
368 }
369 if (--watchDogController_[flag].refCount <= 0) {
370 LOGD("stop feed dog timer, refcount = %d", watchDogController_[flag].refCount);
371 StopFeedDogForSyncNoLock(flag);
372 }
373 LOGD("af dec refcount = %d", watchDogController_[flag].refCount);
374 }
375
DoSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)376 void SyncStateMachine::DoSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
377 {
378 {
379 std::lock_guard<std::mutex> lock(stateMachineLock_);
380 (void)ResetWatchDog();
381 }
382 std::lock_guard<std::mutex> innerLock(saveDataNotifyLock_);
383 if (saveDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
384 StopSaveDataNotifyNoLock();
385 return;
386 }
387 SendNotifyPacket(sessionId, sequenceId, inMsgId);
388 saveDataNotifyCount_++;
389 }
390
DoFeedDogForSync(SyncDirectionFlag flag)391 void SyncStateMachine::DoFeedDogForSync(SyncDirectionFlag flag)
392 {
393 {
394 std::lock_guard<std::mutex> lock(stateMachineLock_);
395 (void)ResetWatchDog();
396 }
397 std::lock_guard<std::mutex> innerLock(feedDogLock_[flag]);
398 if (watchDogController_[flag].feedDogCnt >= watchDogController_[flag].feedDogUpperLimit) {
399 StopFeedDogForSyncNoLock(flag);
400 return;
401 }
402 watchDogController_[flag].feedDogCnt++;
403 }
404
InnerErrorAbort(uint32_t sessionId)405 void SyncStateMachine::InnerErrorAbort(uint32_t sessionId)
406 {
407 // do nothing
408 (void) sessionId;
409 }
410
NotifyClosing()411 void SyncStateMachine::NotifyClosing()
412 {
413 // do nothing
414 }
415
StartFeedDogForGetData(uint32_t sessionId)416 void SyncStateMachine::StartFeedDogForGetData(uint32_t sessionId)
417 {
418 std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
419 if (getDataNotifyTimerId_ > 0) {
420 getDataNotifyCount_ = 0;
421 LOGW("[SyncStateMachine][StartFeedDogForGetData] timer has been started!");
422 }
423
424 // Incref to make sure context still alive before timer stopped.
425 RefObject::IncObjRef(syncContext_);
426 int errCode = RuntimeContext::GetInstance()->SetTimer(
427 DATA_NOTIFY_INTERVAL,
428 [this, sessionId](TimerId timerId) {
429 RefObject::IncObjRef(syncContext_);
430 int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, timerId]() {
431 DoGetAndSendDataNotify(sessionId);
432 int getDataNotifyCount = 0;
433 {
434 std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
435 getDataNotifyCount = getDataNotifyCount_;
436 }
437 if (getDataNotifyCount >= MAX_DATA_NOTIFY_COUNT) {
438 StopFeedDogForGetDataInner(timerId);
439 }
440 RefObject::DecObjRef(syncContext_);
441 });
442 if (ret != E_OK) {
443 LOGE("[SyncStateMachine] [StartFeedDogForGetData] ScheduleTask failed errCode %d", ret);
444 RefObject::DecObjRef(syncContext_);
445 }
446 return ret;
447 },
448 [this]() { RefObject::DecObjRef(syncContext_); },
449 getDataNotifyTimerId_);
450 if (errCode != E_OK) {
451 LOGW("[SyncStateMachine][StartFeedDogForGetData] start timer failed err %d !", errCode);
452 }
453 }
454
StopFeedDogForGetData()455 void SyncStateMachine::StopFeedDogForGetData()
456 {
457 TimerId timerId = 0;
458 {
459 std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
460 timerId = getDataNotifyTimerId_;
461 }
462 if (timerId == 0) {
463 return;
464 }
465 StopFeedDogForGetDataInner(timerId);
466 }
467
DoGetAndSendDataNotify(uint32_t sessionId)468 void SyncStateMachine::DoGetAndSendDataNotify(uint32_t sessionId)
469 {
470 (void)ResetWatchDog();
471 std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
472 if (getDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
473 return;
474 }
475 if (sessionId != 0) {
476 SendNotifyPacket(sessionId, 0, DATA_SYNC_MESSAGE);
477 }
478 getDataNotifyCount_++;
479 }
480
StopFeedDogForGetDataInner(TimerId timerId)481 void SyncStateMachine::StopFeedDogForGetDataInner(TimerId timerId)
482 {
483 std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
484 if (getDataNotifyTimerId_ == 0 || getDataNotifyTimerId_ != timerId) {
485 return;
486 }
487 RuntimeContext::GetInstance()->RemoveTimer(timerId);
488 getDataNotifyTimerId_ = 0;
489 getDataNotifyCount_ = 0;
490 }
491 } // namespace DistributedDB
492