• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_state_machine.h"
17 
18 #include <algorithm>
19 
20 #include "log_print.h"
21 #include "version.h"
22 
23 namespace DistributedDB {
SyncStateMachine()24 SyncStateMachine::SyncStateMachine()
25     : syncContext_(nullptr),
26       storageInterface_(nullptr),
27       communicator_(nullptr),
28       metadata_(nullptr),
29       currentState_(0),
30       watchDogStarted_(false),
31       currentSyncProctolVersion_(SINGLE_VER_SYNC_PROCTOL_V3),
32       saveDataNotifyTimerId_(0),
33       saveDataNotifyCount_(0),
34       getDataNotifyTimerId_(0),
35       getDataNotifyCount_(0)
36 {
37 }
38 
~SyncStateMachine()39 SyncStateMachine::~SyncStateMachine()
40 {
41     syncContext_ = nullptr;
42     storageInterface_ = nullptr;
43     watchDogStarted_ = false;
44     metadata_ = nullptr;
45     if (communicator_ != nullptr) {
46         RefObject::DecObjRef(communicator_);
47         communicator_ = nullptr;
48     }
49 }
50 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)51 int SyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
52     std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
53 {
54     if ((context == nullptr) || (syncInterface == nullptr) || (metadata == nullptr) || (communicator == nullptr)) {
55         return -E_INVALID_ARGS;
56     }
57     syncContext_ = context;
58     storageInterface_ = syncInterface;
59     metadata_ = metadata;
60     RefObject::IncObjRef(communicator);
61     communicator_ = communicator;
62     return E_OK;
63 }
64 
StartSync()65 int SyncStateMachine::StartSync()
66 {
67     int errCode = syncContext_->IncUsedCount();
68     if (errCode != E_OK) {
69         return errCode;
70     }
71     std::lock_guard<std::mutex> lock(stateMachineLock_);
72     errCode = StartSyncInner();
73     syncContext_->SafeExit();
74     return errCode;
75 }
76 
TimeoutCallback(TimerId timerId)77 int SyncStateMachine::TimeoutCallback(TimerId timerId)
78 {
79     RefObject::AutoLock lock(syncContext_);
80     if (syncContext_->IsKilled()) {
81         return -E_OBJ_IS_KILLED;
82     }
83     TimerId timer = syncContext_->GetTimerId();
84     if (timer != timerId) {
85         return -E_UNEXPECTED_DATA;
86     }
87 
88     int retryTime = syncContext_->GetRetryTime();
89     if (retryTime >= syncContext_->GetSyncRetryTimes() || !syncContext_->IsSyncTaskNeedRetry()) {
90         LOGI("[SyncStateMachine][Timeout] TimeoutCallback retryTime:%d", retryTime);
91         syncContext_->UnlockObj();
92         StepToTimeout(timerId);
93         syncContext_->LockObj();
94         return E_OK;
95     }
96     retryTime++;
97     syncContext_->SetRetryTime(retryTime);
98     // the sequenceid will be managed by dataSync slide windows.
99     syncContext_->SetRetryStatus(SyncTaskContext::NEED_RETRY);
100     int timeoutTime = syncContext_->GetSyncRetryTimeout(retryTime);
101     syncContext_->ModifyTimer(timeoutTime);
102     LOGI("[SyncStateMachine][Timeout] Schedule task, timeoutTime = %d, retryTime = %d", timeoutTime, retryTime);
103     SyncStep();
104     return E_OK;
105 }
106 
Abort()107 void SyncStateMachine::Abort()
108 {
109     RefObject::IncObjRef(syncContext_);
110     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
111         this->AbortImmediately();
112         RefObject::DecObjRef(this->syncContext_);
113     });
114     if (errCode != E_OK) {
115         LOGE("[SyncStateMachine][Abort] Abort failed, errCode %d", errCode);
116         RefObject::DecObjRef(syncContext_);
117     }
118 }
119 
AbortImmediately()120 void SyncStateMachine::AbortImmediately()
121 {
122     std::lock_guard<std::mutex> lock(stateMachineLock_);
123     AbortInner();
124     StopWatchDog();
125     currentState_ = 0;
126 }
127 
SwitchMachineState(uint8_t event)128 int SyncStateMachine::SwitchMachineState(uint8_t event)
129 {
130     const std::vector<StateSwitchTable> &tables = GetStateSwitchTables();
131     auto tableIter = std::find_if(tables.begin(), tables.end(),
132         [this](const StateSwitchTable &table) {
133             return table.version <= currentSyncProctolVersion_;
134         });
135     if (tableIter == tables.end()) {
136         LOGE("[SyncStateMachine][SwitchState] Can't find a compatible version by version %u",
137             currentSyncProctolVersion_);
138         return -E_NOT_FOUND;
139     }
140 
141     const std::map<uint8_t, EventToState> &table = (*tableIter).switchTable;
142     auto eventToStateIter = table.find(currentState_);
143     if (eventToStateIter == table.end()) {
144         LOGE("[SyncStateMachine][SwitchState] tableVer:%d, Can't find EventToState with currentSate %u",
145             (*tableIter).version, currentState_);
146         SetCurStateErrStatus();
147         return E_OK;
148     }
149 
150     const EventToState &eventToState = eventToStateIter->second;
151     auto stateIter = eventToState.find(event);
152     if (stateIter == eventToState.end()) {
153         LOGD("[SyncStateMachine][SwitchState] tableVer:%d, Can't find event %u int currentSate %u ignore",
154             (*tableIter).version, event, currentState_);
155         return -E_NOT_FOUND;
156     }
157 
158     currentState_ = stateIter->second;
159     LOGD("[SyncStateMachine][SwitchState] tableVer:%d, from state %u move to state %u with event %u dev %s{private}",
160         (*tableIter).version, eventToStateIter->first, currentState_, event, syncContext_->GetDeviceId().c_str());
161     return E_OK;
162 }
163 
SwitchStateAndStep(uint8_t event)164 void SyncStateMachine::SwitchStateAndStep(uint8_t event)
165 {
166     if (SwitchMachineState(event) == E_OK) {
167         SyncStepInner();
168     }
169 }
170 
ExecNextTask()171 int SyncStateMachine::ExecNextTask()
172 {
173     syncContext_->Clear();
174     while (!syncContext_->IsTargetQueueEmpty()) {
175         int errCode = syncContext_->GetNextTarget(false);
176         if (errCode != E_OK) {
177             continue;
178         }
179         if (syncContext_->IsCurrentSyncTaskCanBeSkipped()) {
180             syncContext_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
181             continue;
182         }
183         errCode = PrepareNextSyncTask();
184         if (errCode != E_OK) {
185             LOGE("[SyncStateMachine] PrepareSync failed");
186             syncContext_->SetOperationStatus(SyncOperation::OP_FAILED);
187         }
188         return errCode;
189     }
190     syncContext_->SetTaskExecStatus(ISyncTaskContext::FINISHED);
191     // no task left
192     LOGD("[SyncStateMachine] All sync task finished!");
193     return -E_NO_SYNC_TASK;
194 }
195 
StartWatchDog()196 int SyncStateMachine::StartWatchDog()
197 {
198     int errCode = syncContext_->StartTimer();
199     if (errCode == E_OK) {
200         watchDogStarted_ = true;
201     }
202     return errCode;
203 }
204 
ResetWatchDog()205 int SyncStateMachine::ResetWatchDog()
206 {
207     if (!watchDogStarted_) {
208         return E_OK;
209     }
210     LOGD("[SyncStateMachine][WatchDog] ResetWatchDog.");
211     syncContext_->StopTimer();
212     syncContext_->SetRetryTime(0);
213     return syncContext_->StartTimer();
214 }
215 
StopWatchDog()216 void SyncStateMachine::StopWatchDog()
217 {
218     watchDogStarted_ = false;
219     LOGD("[SyncStateMachine][WatchDog] StopWatchDog.");
220     syncContext_->StopTimer();
221 }
222 
StartSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)223 bool SyncStateMachine::StartSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
224 {
225     std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
226     if (saveDataNotifyTimerId_ > 0) {
227         saveDataNotifyCount_ = 0;
228         LOGW("[SyncStateMachine][SaveDataNotify] timer has been started!");
229         return false;
230     }
231 
232     // Incref to make sure context still alive before timer stopped.
233     RefObject::IncObjRef(syncContext_);
234     int errCode = RuntimeContext::GetInstance()->SetTimer(
235         DATA_NOTIFY_INTERVAL,
236         [this, sessionId, sequenceId, inMsgId](TimerId timerId) {
237             RefObject::IncObjRef(syncContext_);
238             int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, sequenceId, inMsgId]() {
239                 DoSaveDataNotify(sessionId, sequenceId, inMsgId);
240                 RefObject::DecObjRef(syncContext_);
241             });
242             if (ret != E_OK) {
243                 LOGE("[SyncStateMachine] [DoSaveDataNotify] ScheduleTask failed errCode %d", ret);
244                 RefObject::DecObjRef(syncContext_);
245             }
246             return ret;
247         },
248         [this]() { RefObject::DecObjRef(syncContext_); },
249         saveDataNotifyTimerId_);
250     if (errCode != E_OK) {
251         LOGW("[SyncStateMachine][SaveDataNotify] start timer failed err %d !", errCode);
252         return false;
253     }
254     return true;
255 }
256 
StopSaveDataNotify()257 void SyncStateMachine::StopSaveDataNotify()
258 {
259     std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
260     StopSaveDataNotifyNoLock();
261 }
262 
StopSaveDataNotifyNoLock()263 void SyncStateMachine::StopSaveDataNotifyNoLock()
264 {
265     if (saveDataNotifyTimerId_ == 0) {
266         LOGI("[SyncStateMachine][SaveDataNotify] timer is not started!");
267         return;
268     }
269     RuntimeContext::GetInstance()->RemoveTimer(saveDataNotifyTimerId_);
270     saveDataNotifyTimerId_ = 0;
271     saveDataNotifyCount_ = 0;
272 }
273 
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)274 bool SyncStateMachine::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
275 {
276     if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
277         LOGE("[SyncStateMachine][feedDog] start wrong flag:%d", flag);
278         return false;
279     }
280 
281     uint8_t cnt = GetFeedDogTimeout(time / DATA_NOTIFY_INTERVAL);
282     LOGI("[SyncStateMachine][feedDog] start cnt:%d, flag:%d", cnt, flag);
283 
284     std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
285     watchDogController_[flag].refCount++;
286     LOGD("af incr refCount = %d", watchDogController_[flag].refCount);
287 
288     if (watchDogController_[flag].feedDogTimerId > 0) {
289         // update the upperLimit, if the new cnt is bigger then last upperLimit
290         if (cnt > watchDogController_[flag].feedDogUpperLimit) {
291             LOGD("update feedDogUpperLimit = %d", cnt);
292             watchDogController_[flag].feedDogUpperLimit = cnt;
293         }
294         watchDogController_[flag].feedDogCnt = 0u;
295         LOGW("[SyncStateMachine][feedDog] timer has been started!, flag:%d", flag);
296         return false;
297     }
298 
299     // Incref to make sure context still alive before timer stopped.
300     RefObject::IncObjRef(syncContext_);
301     watchDogController_[flag].feedDogUpperLimit = cnt;
302     int errCode = RuntimeContext::GetInstance()->SetTimer(
303         DATA_NOTIFY_INTERVAL,
304         [this, flag](TimerId timerId) {
305             RefObject::IncObjRef(syncContext_);
306             int ret = RuntimeContext::GetInstance()->ScheduleTask([this, flag]() {
307                 DoFeedDogForSync(flag);
308                 RefObject::DecObjRef(syncContext_);
309             });
310             if (ret != E_OK) {
311                 LOGE("[SyncStateMachine] [DoFeedDogForSync] ScheduleTask failed errCode %d", ret);
312                 RefObject::DecObjRef(syncContext_);
313             }
314             return ret;
315         },
316         [this]() { RefObject::DecObjRef(syncContext_); },
317         watchDogController_[flag].feedDogTimerId);
318     if (errCode != E_OK) {
319         LOGW("[SyncStateMachine][feedDog] start timer failed err %d !", errCode);
320         return false;
321     }
322     return true;
323 }
324 
GetFeedDogTimeout(int timeoutCount) const325 uint8_t SyncStateMachine::GetFeedDogTimeout(int timeoutCount) const
326 {
327     if (timeoutCount > UINT8_MAX) {
328         return UINT8_MAX;
329     }
330     return timeoutCount;
331 }
332 
StopFeedDogForSync(SyncDirectionFlag flag)333 void SyncStateMachine::StopFeedDogForSync(SyncDirectionFlag flag)
334 {
335     if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
336         LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
337         return;
338     }
339     std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
340     StopFeedDogForSyncNoLock(flag);
341 }
342 
StopFeedDogForSyncNoLock(SyncDirectionFlag flag)343 void SyncStateMachine::StopFeedDogForSyncNoLock(SyncDirectionFlag flag)
344 {
345     if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
346         LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
347         return;
348     }
349     if (watchDogController_[flag].feedDogTimerId == 0) {
350         return;
351     }
352     LOGI("[SyncStateMachine][feedDog] stop flag:%d", flag);
353     RuntimeContext::GetInstance()->RemoveTimer(watchDogController_[flag].feedDogTimerId);
354     watchDogController_[flag].feedDogTimerId = 0;
355     watchDogController_[flag].feedDogCnt = 0;
356     watchDogController_[flag].refCount = 0;
357 }
358 
SetCurStateErrStatus()359 void SyncStateMachine::SetCurStateErrStatus()
360 {
361 }
362 
DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)363 void SyncStateMachine::DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)
364 {
365     std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
366     if (watchDogController_[flag].feedDogTimerId == 0) {
367         return;
368     }
369     if (--watchDogController_[flag].refCount <= 0) {
370         LOGD("stop feed dog timer, refcount = %d", watchDogController_[flag].refCount);
371         StopFeedDogForSyncNoLock(flag);
372     }
373     LOGD("af dec refcount = %d", watchDogController_[flag].refCount);
374 }
375 
DoSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)376 void SyncStateMachine::DoSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
377 {
378     {
379         std::lock_guard<std::mutex> lock(stateMachineLock_);
380         (void)ResetWatchDog();
381     }
382     std::lock_guard<std::mutex> innerLock(saveDataNotifyLock_);
383     if (saveDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
384         StopSaveDataNotifyNoLock();
385         return;
386     }
387     SendNotifyPacket(sessionId, sequenceId, inMsgId);
388     saveDataNotifyCount_++;
389 }
390 
DoFeedDogForSync(SyncDirectionFlag flag)391 void SyncStateMachine::DoFeedDogForSync(SyncDirectionFlag flag)
392 {
393     {
394         std::lock_guard<std::mutex> lock(stateMachineLock_);
395         (void)ResetWatchDog();
396     }
397     std::lock_guard<std::mutex> innerLock(feedDogLock_[flag]);
398     if (watchDogController_[flag].feedDogCnt >= watchDogController_[flag].feedDogUpperLimit) {
399         StopFeedDogForSyncNoLock(flag);
400         return;
401     }
402     watchDogController_[flag].feedDogCnt++;
403 }
404 
InnerErrorAbort(uint32_t sessionId)405 void SyncStateMachine::InnerErrorAbort(uint32_t sessionId)
406 {
407     // do nothing
408     (void) sessionId;
409 }
410 
NotifyClosing()411 void SyncStateMachine::NotifyClosing()
412 {
413     // do nothing
414 }
415 
StartFeedDogForGetData(uint32_t sessionId)416 void SyncStateMachine::StartFeedDogForGetData(uint32_t sessionId)
417 {
418     std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
419     if (getDataNotifyTimerId_ > 0) {
420         getDataNotifyCount_ = 0;
421         LOGW("[SyncStateMachine][StartFeedDogForGetData] timer has been started!");
422     }
423 
424     // Incref to make sure context still alive before timer stopped.
425     RefObject::IncObjRef(syncContext_);
426     int errCode = RuntimeContext::GetInstance()->SetTimer(
427         DATA_NOTIFY_INTERVAL,
428         [this, sessionId](TimerId timerId) {
429             RefObject::IncObjRef(syncContext_);
430             int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, timerId]() {
431                 DoGetAndSendDataNotify(sessionId);
432                 int getDataNotifyCount = 0;
433                 {
434                     std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
435                     getDataNotifyCount = getDataNotifyCount_;
436                 }
437                 if (getDataNotifyCount >= MAX_DATA_NOTIFY_COUNT) {
438                     StopFeedDogForGetDataInner(timerId);
439                 }
440                 RefObject::DecObjRef(syncContext_);
441             });
442             if (ret != E_OK) {
443                 LOGE("[SyncStateMachine] [StartFeedDogForGetData] ScheduleTask failed errCode %d", ret);
444                 RefObject::DecObjRef(syncContext_);
445             }
446             return ret;
447         },
448         [this]() { RefObject::DecObjRef(syncContext_); },
449         getDataNotifyTimerId_);
450     if (errCode != E_OK) {
451         LOGW("[SyncStateMachine][StartFeedDogForGetData] start timer failed err %d !", errCode);
452     }
453 }
454 
StopFeedDogForGetData()455 void SyncStateMachine::StopFeedDogForGetData()
456 {
457     TimerId timerId = 0;
458     {
459         std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
460         timerId = getDataNotifyTimerId_;
461     }
462     if (timerId == 0) {
463         return;
464     }
465     StopFeedDogForGetDataInner(timerId);
466 }
467 
DoGetAndSendDataNotify(uint32_t sessionId)468 void SyncStateMachine::DoGetAndSendDataNotify(uint32_t sessionId)
469 {
470     (void)ResetWatchDog();
471     std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
472     if (getDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
473         return;
474     }
475     if (sessionId != 0) {
476         SendNotifyPacket(sessionId, 0, DATA_SYNC_MESSAGE);
477     }
478     getDataNotifyCount_++;
479 }
480 
StopFeedDogForGetDataInner(TimerId timerId)481 void SyncStateMachine::StopFeedDogForGetDataInner(TimerId timerId)
482 {
483     std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
484     if (getDataNotifyTimerId_ == 0 || getDataNotifyTimerId_ != timerId) {
485         return;
486     }
487     RuntimeContext::GetInstance()->RemoveTimer(timerId);
488     getDataNotifyTimerId_ = 0;
489     getDataNotifyCount_ = 0;
490 }
491 } // namespace DistributedDB
492