• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SYNC_STATE_MACHINE_H
17 #define SINGLE_VER_SYNC_STATE_MACHINE_H
18 
19 #include <condition_variable>
20 #include <memory>
21 
22 #include "ability_sync.h"
23 #include "message.h"
24 #include "meta_data.h"
25 #include "semaphore_utils.h"
26 #include "single_ver_data_sync.h"
27 #include "single_ver_sync_task_context.h"
28 #include "sync_state_machine.h"
29 #include "sync_target.h"
30 
31 #include "time_sync.h"
32 #include "time_helper.h"
33 
34 namespace DistributedDB {
35 using StateMappingHandler = std::function<uint8_t(void)>;
36 class SingleVerSyncStateMachine : public SyncStateMachine {
37 public:
38     enum State {
39         IDLE = 0,
40         TIME_SYNC,
41         ABILITY_SYNC,
42         WAIT_FOR_RECEIVE_DATA_FINISH, // all data send finished, wait for data revice if has pull request
43         SYNC_TASK_FINISHED, // current sync task finihsed, try to schedule next sync task
44         SYNC_TIME_OUT,
45         INNER_ERR,
46         START_INITIACTIVE_DATA_SYNC, // used to do sync started by local device, use sliding window
47         START_PASSIVE_DATA_SYNC, // used to do pull response, use sliding window
48         SYNC_CONTROL_CMD // used to send control cmd.
49     };
50 
51     enum Event {
52         START_SYNC_EVENT = 1,
53         TIME_SYNC_FINISHED_EVENT,
54         ABILITY_SYNC_FINISHED_EVENT,
55         VERSION_NOT_SUPPOR_EVENT,
56         SEND_DATA_EVENT,
57         SEND_FINISHED_EVENT,
58         RECV_FINISHED_EVENT,
59         NEED_ABILITY_SYNC_EVENT,
60         RESPONSE_TASK_FINISHED_EVENT,
61         START_PULL_RESPONSE_EVENT,
62         WAIT_ACK_EVENT,
63         ALL_TASK_FINISHED_EVENT,
64         TIME_OUT_EVENT,
65         INNER_ERR_EVENT,
66         WAIT_TIME_OUT_EVENT,
67         RE_SEND_DATA_EVENT,
68         CONTROL_CMD_EVENT,
69         ANY_EVENT
70     };
71     SingleVerSyncStateMachine();
72     ~SingleVerSyncStateMachine() override;
73 
74     // Init the SingleVerSyncStateMachine
75     int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata,
76         ICommunicator *communicator) override;
77 
78     // send Message to the StateMachine
79     int ReceiveMessageCallback(Message *inMsg) override;
80 
81     // Called by CommErrHandler, used to abort sync when handle err
82     void CommErrAbort(uint32_t sessionId = 0) override;
83 
84     int HandleDataRequestRecv(const Message *inMsg);
85 
86     bool IsNeedErrCodeHandle(uint32_t sessionId) const;
87 
88     void PushPullDataRequestEvokeErrHandle();
89 
90     void DataRecvErrCodeHandle(uint32_t sessionId, int errCode);
91 
92     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
93 
94     void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue);
95 
96     int GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId, bool isAutoLift,
97         uint64_t &outValue);
98 
99     void InnerErrorAbort(uint32_t sessionId) override;
100 
101     void NotifyClosing() override;
102 protected:
103     // Step the SingleVerSyncStateMachine
104     void SyncStep() override;
105 
106     // SyncOperation is timeout, step to timeout state
107     void StepToTimeout(TimerId timerId) override;
108 
109     void SyncStepInnerLocked() override;
110 
111     // Do state machine step with no lock, for inner use
112     void SyncStepInner() override;
113 
114     int StartSyncInner() override;
115 
116     void AbortInner() override;
117 
118     void SetCurStateErrStatus() override;
119 
120     // Used to get instance class' stateSwitchTables
121     const std::vector<StateSwitchTable> &GetStateSwitchTables() const override;
122 
123     // Do some init for run a next sync task
124     int PrepareNextSyncTask() override;
125 
126     // Called by StartSaveDataNotifyTimer, used to send a save data notify packet
127     void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override;
128 
129     int TimeMarkSyncRecv(const Message *inMsg);
130 
131     void DataAckRecvErrCodeHandle(int errCode, bool handleError);
132 
133     void ResponsePullError(int errCode, bool ignoreInnerErr);
134 
135 private:
136     // Used to init sync state machine switchbables
137     static void InitStateSwitchTables();
138 
139     // To generate the statemachine switchtable with the given version
140     static void InitStateSwitchTable(uint32_t version, const std::vector<std::vector<uint8_t>> &switchTable);
141 
142     void InitStateMapping();
143 
144     // Do TimeSync, for first sync
145     Event DoTimeSync();
146 
147     // Do AbilitySync, for first sync
148     Event DoAbilitySync();
149 
150     // Waiting for pull data revice finish, if coming a pull request, should goto START_PASSIVE_DATA_SYNC state
151     Event DoWaitForDataRecv() const;
152 
153     // Sync task finihsed, should do some data clear and exec next task.
154     Event DoSyncTaskFinished();
155 
156     // Do something when sync timeout.
157     Event DoTimeout();
158 
159     // Do something when sync get some err.
160     Event DoInnerErr();
161 
162     Event DoInitiactiveDataSyncWithSlidingWindow();
163 
164     Event DoPassiveDataSyncWithSlidingWindow();
165 
166     Event DoInitiactiveControlSync();
167 
168     Event GetEventAfterTimeSync(int mode) const;
169 
170     int HandleControlAckRecv(const Message *inMsg);
171 
172     int GetSyncOperationStatus(int errCode) const;
173 
174     int AbilitySyncRecv(const Message *inMsg);
175 
176     int DataPktRecv(Message *inMsg);
177 
178     void ScheduleMsgAndHandle(Message *inMsg);
179 
180     int ControlPktRecv(Message *inMsg);
181 
182     void NeedAbilitySyncHandle();
183 
184     int HandleDataAckRecv(const Message *inMsg);
185 
186     void HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg, bool ignoreInnerErr);
187 
188     void Clear();
189 
190     bool IsPacketValid(const Message *inMsg) const;
191 
192     void PreStartPullResponse();
193 
194     bool CheckIsStartPullResponse() const;
195 
196     int MessageCallbackPre(const Message *inMsg);
197 
198     void AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark);
199 
200     Event TransformErrCodeToEvent(int errCode);
201 
202     bool IsNeedResetWatchdog(const Message *inMsg) const;
203 
204     Event TransforTimeOutErrCodeToEvent() const;
205 
206     bool AbilityMsgSessionIdCheck(const Message *inMsg);
207 
208     SyncType GetSyncType(uint32_t messageId) const;
209 
210     void JumpStatusAfterAbilitySync(int mode);
211 
212     void ControlAckRecvErrCodeHandle(int errCode);
213 
214     DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncStateMachine);
215 
216     static std::mutex stateSwitchTableLock_;
217     static bool isStateSwitchTableInited_;
218     static std::vector<StateSwitchTable> stateSwitchTables_;
219     SingleVerSyncTaskContext *context_;
220     SingleVerKvDBSyncInterface *syncInterface_;
221     std::unique_ptr<TimeSync> timeSync_;
222     std::unique_ptr<AbilitySync> abilitySync_;
223     std::shared_ptr<SingleVerDataSync> dataSync_;
224     uint64_t currentRemoteVersionId_;
225     std::map<uint8_t, StateMappingHandler> stateMapping_;
226 };
227 } // namespace DistributedDB
228 
229 #endif // SINGLE_VER_SYNC_STATE_MACHINE_H
230