1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_SYNC_STATE_MACHINE_H 17 #define SINGLE_VER_SYNC_STATE_MACHINE_H 18 19 #include <condition_variable> 20 #include <memory> 21 22 #include "ability_sync.h" 23 #include "message.h" 24 #include "meta_data.h" 25 #include "semaphore_utils.h" 26 #include "single_ver_data_sync.h" 27 #include "single_ver_sync_task_context.h" 28 #include "sync_state_machine.h" 29 #include "sync_target.h" 30 31 #include "time_sync.h" 32 #include "time_helper.h" 33 34 namespace DistributedDB { 35 using StateMappingHandler = std::function<uint8_t(void)>; 36 class SingleVerSyncStateMachine : public SyncStateMachine { 37 public: 38 enum State { 39 IDLE = 0, 40 TIME_SYNC, 41 ABILITY_SYNC, 42 WAIT_FOR_RECEIVE_DATA_FINISH, // all data send finished, wait for data revice if has pull request 43 SYNC_TASK_FINISHED, // current sync task finihsed, try to schedule next sync task 44 SYNC_TIME_OUT, 45 INNER_ERR, 46 START_INITIACTIVE_DATA_SYNC, // used to do sync started by local device, use sliding window 47 START_PASSIVE_DATA_SYNC, // used to do pull response, use sliding window 48 SYNC_CONTROL_CMD // used to send control cmd. 49 }; 50 51 enum Event { 52 START_SYNC_EVENT = 1, 53 TIME_SYNC_FINISHED_EVENT, 54 ABILITY_SYNC_FINISHED_EVENT, 55 VERSION_NOT_SUPPOR_EVENT, 56 SEND_DATA_EVENT, 57 SEND_FINISHED_EVENT, 58 RECV_FINISHED_EVENT, 59 NEED_ABILITY_SYNC_EVENT, 60 RESPONSE_TASK_FINISHED_EVENT, 61 START_PULL_RESPONSE_EVENT, 62 WAIT_ACK_EVENT, 63 ALL_TASK_FINISHED_EVENT, 64 TIME_OUT_EVENT, 65 INNER_ERR_EVENT, 66 WAIT_TIME_OUT_EVENT, 67 RE_SEND_DATA_EVENT, 68 CONTROL_CMD_EVENT, 69 NEED_TIME_SYNC_EVENT, 70 NEED_RESYNC_EVENT, 71 ANY_EVENT 72 }; 73 SingleVerSyncStateMachine(); 74 ~SingleVerSyncStateMachine() override; 75 76 // Init the SingleVerSyncStateMachine 77 int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metaData, 78 ICommunicator *communicator) override; 79 80 // send Message to the StateMachine 81 int ReceiveMessageCallback(Message *inMsg) override; 82 83 // Called by CommErrHandler, used to abort sync when handle err 84 void CommErrAbort(uint32_t sessionId = 0) override; 85 86 int HandleDataRequestRecv(const Message *inMsg); 87 88 bool IsNeedErrCodeHandle(uint32_t sessionId) const; 89 90 void PushPullDataRequestEvokeErrHandle(); 91 92 void DataRecvErrCodeHandle(uint32_t sessionId, int errCode); 93 94 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 95 96 void GetLocalWaterMark(const DeviceID &deviceId, const DeviceID &userId, uint64_t &outValue); 97 98 int GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId, const DeviceID &userId, 99 bool isAutoLift, uint64_t &outValue); 100 101 void InnerErrorAbort(uint32_t sessionId) override; 102 103 void NotifyClosing() override; 104 105 void SchemaChange() override; 106 107 void TimeChange() override; 108 protected: 109 // Step the SingleVerSyncStateMachine 110 void SyncStep() override; 111 112 // SyncOperation is timeout, step to timeout state 113 void StepToTimeout(TimerId timerId) override; 114 115 void SyncStepInnerLocked() override; 116 117 // Do state machine step with no lock, for inner use 118 void SyncStepInner() override; 119 120 int StartSyncInner() override; 121 122 void AbortInner() override; 123 124 void SetCurStateErrStatus() override; 125 126 // Used to get instance class' stateSwitchTables 127 const std::vector<StateSwitchTable> &GetStateSwitchTables() const override; 128 129 // Do some init for run a next sync task 130 int PrepareNextSyncTask() override; 131 132 // Called by StartSaveDataNotifyTimer, used to send a save data notify packet 133 void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override; 134 135 int TimeMarkSyncRecv(const Message *inMsg); 136 137 void DataAckRecvErrCodeHandle(int errCode, bool handleError); 138 139 void ResponsePullError(int errCode, bool ignoreInnerErr); 140 141 private: 142 // Used to init sync state machine switchbables 143 static void InitStateSwitchTables(); 144 145 // To generate the statemachine switchtable with the given version 146 static void InitStateSwitchTable(uint32_t version, const std::vector<std::vector<uint8_t>> &switchTable); 147 148 void InitStateMapping(); 149 150 // Do TimeSync, for first sync 151 Event DoTimeSync() const; 152 153 // Do AbilitySync, for first sync 154 Event DoAbilitySync() const; 155 156 // Waiting for pull data revice finish, if coming a pull request, should goto START_PASSIVE_DATA_SYNC state 157 Event DoWaitForDataRecv() const; 158 159 // Sync task finihsed, should do some data clear and exec next task. 160 Event DoSyncTaskFinished(); 161 162 // Do something when sync timeout. 163 Event DoTimeout(); 164 165 // Do something when sync get some err. 166 Event DoInnerErr(); 167 168 Event DoInitiactiveDataSyncWithSlidingWindow() const; 169 170 Event DoPassiveDataSyncWithSlidingWindow(); 171 172 Event DoInitiactiveControlSync() const; 173 174 Event GetEventAfterTimeSync(int mode) const; 175 176 int HandleControlAckRecv(const Message *inMsg); 177 178 int GetSyncOperationStatus(int errCode) const; 179 180 int AbilitySyncRecv(const Message *inMsg); 181 182 int DataPktRecv(Message *inMsg); 183 184 void ScheduleMsgAndHandle(Message *inMsg); 185 186 int ControlPktRecv(Message *inMsg); 187 188 void NeedAbilitySyncHandle(); 189 190 int HandleDataAckRecv(const Message *inMsg); 191 192 void HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg, bool ignoreInnerErr); 193 194 void Clear(); 195 196 bool IsPacketValid(const Message *inMsg) const; 197 198 void PreStartPullResponse(); 199 200 bool CheckIsStartPullResponse() const; 201 202 int MessageCallbackPre(const Message *inMsg); 203 204 void AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark); 205 206 Event TransformErrCodeToEvent(int errCode) const; 207 208 bool IsNeedResetWatchdog(const Message *inMsg) const; 209 210 Event TransforTimeOutErrCodeToEvent() const; 211 212 bool AbilityMsgSessionIdCheck(const Message *inMsg); 213 214 SyncType GetSyncType(uint32_t messageId) const; 215 216 void JumpStatusAfterAbilitySync(int mode); 217 218 void ControlAckRecvErrCodeHandle(int errCode); 219 220 int AbilitySyncResponseRecv(const Message *inMsg); 221 222 int AbilitySyncNotifyRecv(const Message *inMsg); 223 224 DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncStateMachine); 225 226 static std::mutex stateSwitchTableLock_; 227 static bool isStateSwitchTableInited_; 228 static std::vector<StateSwitchTable> stateSwitchTables_; 229 SingleVerSyncTaskContext *context_; 230 SyncGenericInterface *syncInterface_; 231 std::shared_ptr<TimeSync> timeSync_; 232 std::unique_ptr<AbilitySync> abilitySync_; 233 std::shared_ptr<SingleVerDataSync> dataSync_; 234 uint64_t currentRemoteVersionId_; 235 std::map<uint8_t, StateMappingHandler> stateMapping_; 236 }; 237 } // namespace DistributedDB 238 239 #endif // SINGLE_VER_SYNC_STATE_MACHINE_H 240