1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_SYNC_STATE_MACHINE_H 17 #define SINGLE_VER_SYNC_STATE_MACHINE_H 18 19 #include <condition_variable> 20 #include <memory> 21 22 #include "ability_sync.h" 23 #include "message.h" 24 #include "meta_data.h" 25 #include "semaphore_utils.h" 26 #include "single_ver_data_sync.h" 27 #include "single_ver_sync_task_context.h" 28 #include "sync_state_machine.h" 29 #include "sync_target.h" 30 31 #include "time_sync.h" 32 #include "time_helper.h" 33 34 namespace DistributedDB { 35 using StateMappingHandler = std::function<uint8_t(void)>; 36 class SingleVerSyncStateMachine : public SyncStateMachine { 37 public: 38 enum State { 39 IDLE = 0, 40 TIME_SYNC, 41 ABILITY_SYNC, 42 WAIT_FOR_RECEIVE_DATA_FINISH, // all data send finished, wait for data revice if has pull request 43 SYNC_TASK_FINISHED, // current sync task finihsed, try to schedule next sync task 44 SYNC_TIME_OUT, 45 INNER_ERR, 46 START_INITIACTIVE_DATA_SYNC, // used to do sync started by local device, use sliding window 47 START_PASSIVE_DATA_SYNC, // used to do pull response, use sliding window 48 SYNC_CONTROL_CMD // used to send control cmd. 49 }; 50 51 enum Event { 52 START_SYNC_EVENT = 1, 53 TIME_SYNC_FINISHED_EVENT, 54 ABILITY_SYNC_FINISHED_EVENT, 55 VERSION_NOT_SUPPOR_EVENT, 56 SEND_DATA_EVENT, 57 SEND_FINISHED_EVENT, 58 RECV_FINISHED_EVENT, 59 NEED_ABILITY_SYNC_EVENT, 60 RESPONSE_TASK_FINISHED_EVENT, 61 START_PULL_RESPONSE_EVENT, 62 WAIT_ACK_EVENT, 63 ALL_TASK_FINISHED_EVENT, 64 TIME_OUT_EVENT, 65 INNER_ERR_EVENT, 66 WAIT_TIME_OUT_EVENT, 67 RE_SEND_DATA_EVENT, 68 CONTROL_CMD_EVENT, 69 ANY_EVENT 70 }; 71 SingleVerSyncStateMachine(); 72 ~SingleVerSyncStateMachine() override; 73 74 // Init the SingleVerSyncStateMachine 75 int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata, 76 ICommunicator *communicator) override; 77 78 // send Message to the StateMachine 79 int ReceiveMessageCallback(Message *inMsg) override; 80 81 // Called by CommErrHandler, used to abort sync when handle err 82 void CommErrAbort(uint32_t sessionId = 0) override; 83 84 int HandleDataRequestRecv(const Message *inMsg); 85 86 bool IsNeedErrCodeHandle(uint32_t sessionId) const; 87 88 void PushPullDataRequestEvokeErrHandle(); 89 90 void DataRecvErrCodeHandle(uint32_t sessionId, int errCode); 91 92 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 93 94 void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue); 95 96 int GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId, bool isAutoLift, 97 uint64_t &outValue); 98 99 void InnerErrorAbort(uint32_t sessionId) override; 100 101 void NotifyClosing() override; 102 protected: 103 // Step the SingleVerSyncStateMachine 104 void SyncStep() override; 105 106 // SyncOperation is timeout, step to timeout state 107 void StepToTimeout(TimerId timerId) override; 108 109 void SyncStepInnerLocked() override; 110 111 // Do state machine step with no lock, for inner use 112 void SyncStepInner() override; 113 114 int StartSyncInner() override; 115 116 void AbortInner() override; 117 118 void SetCurStateErrStatus() override; 119 120 // Used to get instance class' stateSwitchTables 121 const std::vector<StateSwitchTable> &GetStateSwitchTables() const override; 122 123 // Do some init for run a next sync task 124 int PrepareNextSyncTask() override; 125 126 // Called by StartSaveDataNotifyTimer, used to send a save data notify packet 127 void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override; 128 129 int TimeMarkSyncRecv(const Message *inMsg); 130 131 void DataAckRecvErrCodeHandle(int errCode, bool handleError); 132 133 void ResponsePullError(int errCode, bool ignoreInnerErr); 134 135 private: 136 // Used to init sync state machine switchbables 137 static void InitStateSwitchTables(); 138 139 // To generate the statemachine switchtable with the given version 140 static void InitStateSwitchTable(uint32_t version, const std::vector<std::vector<uint8_t>> &switchTable); 141 142 void InitStateMapping(); 143 144 // Do TimeSync, for first sync 145 Event DoTimeSync(); 146 147 // Do AbilitySync, for first sync 148 Event DoAbilitySync(); 149 150 // Waiting for pull data revice finish, if coming a pull request, should goto START_PASSIVE_DATA_SYNC state 151 Event DoWaitForDataRecv() const; 152 153 // Sync task finihsed, should do some data clear and exec next task. 154 Event DoSyncTaskFinished(); 155 156 // Do something when sync timeout. 157 Event DoTimeout(); 158 159 // Do something when sync get some err. 160 Event DoInnerErr(); 161 162 Event DoInitiactiveDataSyncWithSlidingWindow(); 163 164 Event DoPassiveDataSyncWithSlidingWindow(); 165 166 Event DoInitiactiveControlSync(); 167 168 Event GetEventAfterTimeSync(int mode) const; 169 170 int HandleControlAckRecv(const Message *inMsg); 171 172 int GetSyncOperationStatus(int errCode) const; 173 174 int AbilitySyncRecv(const Message *inMsg); 175 176 int DataPktRecv(Message *inMsg); 177 178 void ScheduleMsgAndHandle(Message *inMsg); 179 180 int ControlPktRecv(Message *inMsg); 181 182 void NeedAbilitySyncHandle(); 183 184 int HandleDataAckRecv(const Message *inMsg); 185 186 void HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg, bool ignoreInnerErr); 187 188 void Clear(); 189 190 bool IsPacketValid(const Message *inMsg) const; 191 192 void PreStartPullResponse(); 193 194 bool CheckIsStartPullResponse() const; 195 196 int MessageCallbackPre(const Message *inMsg); 197 198 void AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark); 199 200 Event TransformErrCodeToEvent(int errCode); 201 202 bool IsNeedResetWatchdog(const Message *inMsg) const; 203 204 Event TransforTimeOutErrCodeToEvent() const; 205 206 bool AbilityMsgSessionIdCheck(const Message *inMsg); 207 208 SyncType GetSyncType(uint32_t messageId) const; 209 210 void JumpStatusAfterAbilitySync(int mode); 211 212 void ControlAckRecvErrCodeHandle(int errCode); 213 214 DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncStateMachine); 215 216 static std::mutex stateSwitchTableLock_; 217 static bool isStateSwitchTableInited_; 218 static std::vector<StateSwitchTable> stateSwitchTables_; 219 SingleVerSyncTaskContext *context_; 220 SingleVerKvDBSyncInterface *syncInterface_; 221 std::unique_ptr<TimeSync> timeSync_; 222 std::unique_ptr<AbilitySync> abilitySync_; 223 std::shared_ptr<SingleVerDataSync> dataSync_; 224 uint64_t currentRemoteVersionId_; 225 std::map<uint8_t, StateMappingHandler> stateMapping_; 226 }; 227 } // namespace DistributedDB 228 229 #endif // SINGLE_VER_SYNC_STATE_MACHINE_H 230