1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_SYNC_NEW_H 17 #define SINGLE_VER_DATA_SYNC_NEW_H 18 19 #include "icommunicator.h" 20 #include "isync_interface.h" 21 #include "meta_data.h" 22 #include "parcel.h" 23 #include "single_ver_data_message_schedule.h" 24 #include "single_ver_data_packet.h" 25 #include "single_ver_kvdb_sync_interface.h" 26 #include "single_ver_sync_task_context.h" 27 #include "sync_generic_interface.h" 28 #include "sync_types.h" 29 #include "version.h" 30 31 namespace DistributedDB { 32 using SendDataItem = SingleVerKvEntry *; 33 struct ReSendInfo { 34 Timestamp start = 0; 35 Timestamp end = 0; 36 Timestamp deleteBeginTime = 0; 37 Timestamp deleteEndTime = 0; 38 // packetId is used for matched ackpacket packetId which saved in ackPacket.reserve 39 // if equaled, means need to handle the ack, or drop. it is always increased 40 uint64_t packetId = 0; 41 }; 42 43 struct DataSyncReSendInfo { 44 uint32_t sessionId = 0; 45 uint32_t sequenceId = 0; 46 Timestamp start = 0; // means normal or sync data localwatermark 47 Timestamp end = 0; 48 Timestamp deleteDataStart = 0; // means delete data localwatermark 49 Timestamp deleteDataEnd = 0; 50 uint64_t packetId = 0; 51 }; 52 53 struct SyncEntry { 54 std::vector<SendDataItem> entries; 55 std::vector<uint8_t> compressedEntries; 56 }; 57 58 class SingleVerDataSync { 59 public: 60 SingleVerDataSync(); 61 virtual ~SingleVerDataSync(); 62 63 DISABLE_COPY_ASSIGN_MOVE(SingleVerDataSync); 64 65 int Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle, 66 std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId); 67 68 int SyncStart(int mode, SingleVerSyncTaskContext *context); 69 70 int TryContinueSync(SingleVerSyncTaskContext *context, const Message *message); 71 72 void ClearSyncStatus(); 73 74 int PushStart(SingleVerSyncTaskContext *context); 75 76 int PushPullStart(SingleVerSyncTaskContext *context); 77 78 int PullRequestStart(SingleVerSyncTaskContext *context); 79 80 int PullResponseStart(SingleVerSyncTaskContext *context); 81 82 int DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message, WaterMark &pullEndWatermark); 83 84 bool AckPacketIdCheck(const Message *message); 85 86 int AckRecv(SingleVerSyncTaskContext *context, const Message *message); 87 88 void SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion, uint32_t sessionId, 89 uint32_t sequenceId, uint32_t inMsgId); 90 91 virtual int SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, 92 WaterMark maxSendDataTime); 93 94 int CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context); 95 96 std::string GetLabel() const; 97 98 std::string GetDeviceId() const; 99 100 bool WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message); 101 102 int ControlCmdStart(SingleVerSyncTaskContext *context); 103 104 int ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 105 106 int ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message); 107 108 void PutDataMsg(Message *message); 109 110 Message *MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle, bool &isNeedContinue); 111 112 bool IsNeedReloadQueue(); 113 114 void SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message); 115 116 void ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message); 117 118 void ClearDataMsg(); 119 120 protected: 121 static const int SEND_FINISHED = 0xff; 122 static const int LOCAL_WATER_MARK_NOT_INIT = 0xaa; 123 static const int PEER_WATER_MARK_NOT_INIT = 0x55; 124 static const int WATER_MARK_INVALID = 0xbb; 125 static const int MTU_SIZE = 28311552; // 27MB 126 127 void ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context); 128 129 int InnerSyncStart(SingleVerSyncTaskContext *context); 130 131 void InnerClearSyncStatus(); 132 133 int ReSendData(SingleVerSyncTaskContext *context); 134 135 int32_t ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo); 136 137 void SetSessionEndTimestamp(Timestamp end); 138 139 Timestamp GetSessionEndTimestamp() const; 140 141 void FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context, 142 SyncEntry &syncData, int sendCode, int mode); 143 144 int RequestStart(SingleVerSyncTaskContext *context, int mode); 145 146 SyncTimeRange GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context, 147 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate); 148 149 int GetData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 150 151 int GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context, SyncEntry &syncOutData); 152 153 int Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler, 154 uint32_t packetLen); 155 156 int GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 157 158 int GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 159 160 int SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData, SyncType curType, 161 const QuerySyncObject &query); 162 163 int SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context, 164 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate = false) const; 165 166 void GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify, const SingleVerSyncTaskContext *context, 167 WaterMark &watermark) const; 168 169 void GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify, const DeviceID &deviceId, 170 WaterMark &watermark) const; 171 172 void GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark); 173 174 void GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context, WaterMark &waterMark) const; 175 176 int RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message, WaterMark maxSendDataTime); 177 178 int DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark, 179 const std::vector<uint64_t> &reserved); 180 181 int SendDataPacket(SyncType syncType, const DataRequestPacket *packet, SingleVerSyncTaskContext *context); 182 183 void UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId, const SyncTimeRange &dataTime, 184 const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark); 185 186 void UpdatePeerWaterMark(SyncType syncType, const std::string &queryId, const SingleVerSyncTaskContext *context, 187 WaterMark peerWatermark, WaterMark peerDeletedWatermark); 188 189 std::string GetLocalDeviceName(); 190 191 int DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg = false); 192 193 int DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); 194 195 void GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet, 196 WaterMark &pullEndWatermark) const; 197 198 int DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark, 199 const std::vector<uint64_t> &reserved); 200 201 int RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message, 202 const DataRequestPacket *packet); 203 204 void SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen); 205 206 int SendReSendPacket(const DataRequestPacket *packet, SingleVerSyncTaskContext *context, 207 uint32_t sessionId, uint32_t sequenceId); 208 209 int SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData, SingleVerSyncTaskContext *context); 210 211 int CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message); 212 213 void RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId); 214 215 void SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context, const DataRequestPacket *packet, 216 int32_t recvCode, WaterMark maxSendDataTime); 217 218 int GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context, 219 DataSyncReSendInfo reSendInfo); 220 221 virtual int RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context); 222 223 virtual void UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context); 224 225 void FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet, 226 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode); 227 228 void UpdateMtuSize(); 229 230 DataSizeSpecInfo GetDataSizeSpecInfo(size_t packetSize); 231 232 int InterceptData(SyncEntry &syncEntry); 233 234 int ControlCmdStartCheck(SingleVerSyncTaskContext *context); 235 236 int SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context); 237 238 int ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); 239 int SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet, 240 const Message *message); 241 int SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 242 int UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 243 int SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, 244 uint32_t controlCmdType, const CommErrHandler &handler = nullptr); 245 246 void RemoveSubscribeIfNeed(const std::string &queryId, const std::shared_ptr<SubscribeManager> &subscribeManager); 247 248 uint32_t mtuSize_; 249 SyncGenericInterface* storage_; 250 ICommunicator* communicateHandle_; 251 std::shared_ptr<Metadata> metadata_; 252 std::string label_; 253 std::string deviceId_; 254 255 SingleVerDataMessageSchedule msgSchedule_; 256 257 static const int HIGH_VERSION_WINDOW_SIZE = 3; 258 static const int LOW_VERSION_WINDOW_SIZE = 1; 259 // below param is about sliding sync info, is different from every sync task 260 std::mutex lock_; 261 int mode_ = 0; // sync mode, may diff from context mode if trigger pull_response while push finish 262 uint32_t sessionId_ = 0; 263 // sequenceId as key 264 std::map<uint32_t, ReSendInfo> reSendMap_; 265 // remaining sending window 266 int32_t windowSize_ = 0; 267 // max sequenceId has been sent 268 uint32_t maxSequenceIdHasSent_ = 0; 269 bool isAllDataHasSent_ = false; 270 // in a sync session, the last data timestamp 271 Timestamp sessionEndTimestamp_ = 0; 272 273 std::mutex removeDeviceDataLock_; 274 std::mutex unsubscribeLock_; 275 }; 276 } // namespace DistributedDB 277 278 #endif // SINGLE_VER_DATA_SYNC_NEW_H 279