• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SYNC_STATE_MACHINE_H
17 #define SINGLE_VER_SYNC_STATE_MACHINE_H
18 
19 #include <condition_variable>
20 #include <memory>
21 
22 #include "ability_sync.h"
23 #include "message.h"
24 #include "meta_data.h"
25 #include "semaphore_utils.h"
26 #include "single_ver_data_sync.h"
27 #include "single_ver_sync_task_context.h"
28 #include "sync_state_machine.h"
29 #include "sync_target.h"
30 
31 #include "time_sync.h"
32 #include "time_helper.h"
33 
34 namespace DistributedDB {
// Handler type held in SingleVerSyncStateMachine::stateMapping_: a callable taking no arguments and returning uint8_t.
35 using StateMappingHandler = std::function<uint8_t(void)>;
// SingleVerSyncStateMachine: the SyncStateMachine subclass declared for single-version sync.
// This header only declares the interface; member behaviour is implemented elsewhere, so any note
// below that goes beyond the original comments is explicitly marked as an assumption to confirm.
36 class SingleVerSyncStateMachine : public SyncStateMachine {
37 public:
       // States of the machine. IDLE is explicitly 0; the other enumerators take consecutive values
       // in declaration order, so reordering them changes their numeric values.
38     enum State {
39         IDLE = 0,
40         TIME_SYNC,
41         ABILITY_SYNC,
42         WAIT_FOR_RECEIVE_DATA_FINISH, // all data has been sent; wait for data to be received if there is a pull request
43         SYNC_TASK_FINISHED, // current sync task finished, try to schedule the next sync task
44         SYNC_TIME_OUT,
45         INNER_ERR,
46         START_INITIACTIVE_DATA_SYNC, // used to do a sync started by the local device, uses the sliding window
47         START_PASSIVE_DATA_SYNC, // used to do a pull response, uses the sliding window
48         SYNC_CONTROL_CMD // used to send a control cmd.
49     };
50 
       // Events returned by the Do*() step functions below. START_SYNC_EVENT is explicitly 1; the
       // rest follow consecutively in declaration order.
       // NOTE(review): presumably these select the next State via the state switch tables -- confirm in the .cpp.
51     enum Event {
52         START_SYNC_EVENT = 1,
53         TIME_SYNC_FINISHED_EVENT,
54         ABILITY_SYNC_FINISHED_EVENT,
55         VERSION_NOT_SUPPOR_EVENT, // NOTE(review): identifier is spelled "SUPPOR" (sic); left as-is because it is part of the public enum
56         SEND_DATA_EVENT,
57         SEND_FINISHED_EVENT,
58         RECV_FINISHED_EVENT,
59         NEED_ABILITY_SYNC_EVENT,
60         RESPONSE_TASK_FINISHED_EVENT,
61         START_PULL_RESPONSE_EVENT,
62         WAIT_ACK_EVENT,
63         ALL_TASK_FINISHED_EVENT,
64         TIME_OUT_EVENT,
65         INNER_ERR_EVENT,
66         WAIT_TIME_OUT_EVENT,
67         RE_SEND_DATA_EVENT,
68         CONTROL_CMD_EVENT,
69         NEED_TIME_SYNC_EVENT,
70         NEED_RESYNC_EVENT,
71         ANY_EVENT
72     };
       // Default constructor and destructor; the destructor overrides the base class's virtual destructor.
73     SingleVerSyncStateMachine();
74     ~SingleVerSyncStateMachine() override;
75 
76     // Initialize the SingleVerSyncStateMachine with its task context, sync interface, metadata and
       // communicator. Returns an int status (NOTE(review): presumably a DistributedDB error code -- confirm).
77     int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metaData,
78         ICommunicator *communicator) override;
79 
80     // Deliver a Message to the state machine
81     int ReceiveMessageCallback(Message *inMsg) override;
82 
83     // Called by CommErrHandler, used to abort the sync when handling an error; sessionId defaults to 0
84     void CommErrAbort(uint32_t sessionId = 0) override;
85 
86     int HandleDataRequestRecv(const Message *inMsg);
87 
88     bool IsNeedErrCodeHandle(uint32_t sessionId) const;
89 
90     void PushPullDataRequestEvokeErrHandle();
91 
92     void DataRecvErrCodeHandle(uint32_t sessionId, int errCode);
93 
94     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
95 
       // NOTE(review): outValue is a non-const reference, presumably the output water mark for the
       // given device/user -- confirm against the implementation.
96     void GetLocalWaterMark(const DeviceID &deviceId, const DeviceID &userId, uint64_t &outValue);
97 
       // NOTE(review): as above, outValue presumably receives the result and the int return is a status code -- confirm.
98     int GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId, const DeviceID &userId,
99         bool isAutoLift, uint64_t &outValue);
100 
101     void InnerErrorAbort(uint32_t sessionId) override;
102 
        // Overrides of base-class hooks.
        // NOTE(review): presumably invoked on closing / schema change / time change respectively -- confirm with callers.
103     void NotifyClosing() override;
104 
105     void SchemaChange() override;
106 
107     void TimeChange() override;
108 protected:
109     // Step the SingleVerSyncStateMachine
110     void SyncStep() override;
111 
112     // The sync operation timed out, step to the timeout state
113     void StepToTimeout(TimerId timerId) override;
114 
115     void SyncStepInnerLocked() override;
116 
117     // Do a state machine step with no lock, for inner use
118     void SyncStepInner() override;
119 
120     int StartSyncInner() override;
121 
122     void AbortInner() override;
123 
124     void SetCurStateErrStatus() override;
125 
126     // Used to get the instance class' stateSwitchTables
127     const std::vector<StateSwitchTable> &GetStateSwitchTables() const override;
128 
129     // Do some init to run the next sync task
130     int PrepareNextSyncTask() override;
131 
132     // Called by StartSaveDataNotifyTimer, used to send a save data notify packet
133     void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override;
134 
135     int TimeMarkSyncRecv(const Message *inMsg);
136 
137     void DataAckRecvErrCodeHandle(int errCode, bool handleError);
138 
139     void ResponsePullError(int errCode, bool ignoreInnerErr);
140 
141 private:
142     // Used to init the sync state machine switch tables
143     static void InitStateSwitchTables();
144 
145     // To generate the state machine switch table for the given version
146     static void InitStateSwitchTable(uint32_t version, const std::vector<std::vector<uint8_t>> &switchTable);
147 
        // NOTE(review): presumably fills stateMapping_ with one handler per State -- confirm in the .cpp.
148     void InitStateMapping();
149 
150     // Do TimeSync, for first sync
151     Event DoTimeSync() const;
152 
153     // Do AbilitySync, for first sync
154     Event DoAbilitySync() const;
155 
156     // Wait for pull data receive to finish; if a pull request comes in, should go to the START_PASSIVE_DATA_SYNC state
157     Event DoWaitForDataRecv() const;
158 
159     // Sync task finished, should do some data clear and exec the next task.
160     Event DoSyncTaskFinished();
161 
162     // Do something when the sync times out.
163     Event DoTimeout();
164 
165     // Do something when the sync gets an error.
166     Event DoInnerErr();
167 
168     Event DoInitiactiveDataSyncWithSlidingWindow() const;
169 
170     Event DoPassiveDataSyncWithSlidingWindow();
171 
172     Event DoInitiactiveControlSync() const;
173 
174     Event GetEventAfterTimeSync(int mode) const;
175 
176     int HandleControlAckRecv(const Message *inMsg);
177 
178     int GetSyncOperationStatus(int errCode) const;
179 
180     int AbilitySyncRecv(const Message *inMsg);
181 
182     int DataPktRecv(Message *inMsg);
183 
184     void ScheduleMsgAndHandle(Message *inMsg);
185 
186     int ControlPktRecv(Message *inMsg);
187 
188     void NeedAbilitySyncHandle();
189 
190     int HandleDataAckRecv(const Message *inMsg);
191 
192     void HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg, bool ignoreInnerErr);
193 
194     void Clear();
195 
196     bool IsPacketValid(const Message *inMsg) const;
197 
198     void PreStartPullResponse();
199 
200     bool CheckIsStartPullResponse() const;
201 
202     int MessageCallbackPre(const Message *inMsg);
203 
204     void AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark);
205 
        // Maps an int error code to an Event (mapping itself is defined in the implementation).
206     Event TransformErrCodeToEvent(int errCode) const;
207 
208     bool IsNeedResetWatchdog(const Message *inMsg) const;
209 
210     Event TransforTimeOutErrCodeToEvent() const;
211 
212     bool AbilityMsgSessionIdCheck(const Message *inMsg);
213 
214     SyncType GetSyncType(uint32_t messageId) const;
215 
216     void JumpStatusAfterAbilitySync(int mode);
217 
218     void ControlAckRecvErrCodeHandle(int errCode);
219 
220     int AbilitySyncResponseRecv(const Message *inMsg);
221 
222     int AbilitySyncNotifyRecv(const Message *inMsg);
223 
        // Macro defined outside this header.
        // NOTE(review): by its name it presumably disables copy/assign/move for this class -- confirm at its definition.
224     DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncStateMachine);
225 
        // Static state shared by all instances (defined in the implementation file).
        // NOTE(review): stateSwitchTableLock_ presumably guards isStateSwitchTableInited_ and stateSwitchTables_ -- confirm.
226     static std::mutex stateSwitchTableLock_;
227     static bool isStateSwitchTableInited_;
228     static std::vector<StateSwitchTable> stateSwitchTables_;
        // Raw pointers with no in-class initializer here.
        // NOTE(review): ownership and lifetime are not visible in this header; presumably set in Initialize() -- confirm.
229     SingleVerSyncTaskContext *context_;
230     SyncGenericInterface *syncInterface_;
231     std::shared_ptr<TimeSync> timeSync_;
232     std::unique_ptr<AbilitySync> abilitySync_;
233     std::shared_ptr<SingleVerDataSync> dataSync_;
234     uint64_t currentRemoteVersionId_; // no in-class initializer; NOTE(review): confirm the constructor initializes it
        // uint8_t key -> handler returning uint8_t.
        // NOTE(review): presumably keyed by State with the handler's result being an Event -- confirm.
235     std::map<uint8_t, StateMappingHandler> stateMapping_;
236 };
237 } // namespace DistributedDB
238 
239 #endif // SINGLE_VER_SYNC_STATE_MACHINE_H
240