1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_SYNC_NEW_H 17 #define SINGLE_VER_DATA_SYNC_NEW_H 18 19 #include "icommunicator.h" 20 #include "isync_interface.h" 21 #include "meta_data.h" 22 #include "parcel.h" 23 #include "single_ver_data_message_schedule.h" 24 #include "single_ver_data_packet.h" 25 #include "single_ver_kvdb_sync_interface.h" 26 #include "single_ver_sync_task_context.h" 27 #include "sync_generic_interface.h" 28 #include "sync_types.h" 29 #include "version.h" 30 31 namespace DistributedDB { 32 using SendDataItem = SingleVerKvEntry *; 33 struct ReSendInfo { 34 Timestamp start = 0; 35 Timestamp end = 0; 36 Timestamp deleteBeginTime = 0; 37 Timestamp deleteEndTime = 0; 38 // packetId is used for matched ackpacket packetId which saved in ackPacket.reserve 39 // if equaled, means need to handle the ack, or drop. it is always increased 40 uint64_t packetId = 0; 41 }; 42 43 struct DataSyncReSendInfo { 44 uint32_t sessionId = 0; 45 uint32_t sequenceId = 0; 46 Timestamp start = 0; // means normal or sync data localwatermark 47 Timestamp end = 0; 48 Timestamp deleteDataStart = 0; // means delete data localwatermark 49 Timestamp deleteDataEnd = 0; 50 uint64_t packetId = 0; 51 }; 52 53 struct SyncEntry { 54 std::vector<SendDataItem> entries; 55 std::vector<uint8_t> compressedEntries; 56 }; 57 58 class SingleVerDataSync { 59 public: 60 SingleVerDataSync(); 61 virtual ~SingleVerDataSync(); 62 63 DISABLE_COPY_ASSIGN_MOVE(SingleVerDataSync); 64 65 int Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle, 66 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId); 67 68 int SyncStart(int mode, SingleVerSyncTaskContext *context); 69 70 int TryContinueSync(SingleVerSyncTaskContext *context, const Message *message); 71 72 void ClearSyncStatus(); 73 74 int PushStart(SingleVerSyncTaskContext *context); 75 76 int PushPullStart(SingleVerSyncTaskContext *context); 77 78 int PullRequestStart(SingleVerSyncTaskContext *context); 79 80 int PullResponseStart(SingleVerSyncTaskContext *context); 81 82 int DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message, WaterMark &pullEndWatermark); 83 84 bool AckPacketIdCheck(const Message *message); 85 86 int AckRecv(SingleVerSyncTaskContext *context, const Message *message); 87 88 void SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion, uint32_t sessionId, 89 uint32_t sequenceId, uint32_t inMsgId); 90 91 virtual int SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, 92 WaterMark maxSendDataTime); 93 94 int CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context); 95 96 std::string GetLabel() const; 97 98 std::string GetDeviceId() const; 99 100 bool WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message); 101 102 int ControlCmdStart(SingleVerSyncTaskContext *context); 103 104 int ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 105 106 int ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message); 107 108 void PutDataMsg(Message *message); 109 110 Message *MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle, bool &isNeedContinue); 111 112 bool IsNeedReloadQueue(); 113 114 void SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message); 115 116 void ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message); 117 118 void ClearDataMsg(); 119 120 void GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify, const SingleVerSyncTaskContext *context, 121 WaterMark &waterMark) const; 122 123 void GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context, WaterMark &waterMark) const; 124 125 protected: 126 static const int SEND_FINISHED = 0xff; 127 static const int LOCAL_WATER_MARK_NOT_INIT = 0xaa; 128 static const int PEER_WATER_MARK_NOT_INIT = 0x55; 129 static const int WATER_MARK_INVALID = 0xbb; 130 static const int MTU_SIZE = 28311552; // 27MB 131 132 void ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context); 133 134 int InnerSyncStart(SingleVerSyncTaskContext *context); 135 136 void InnerClearSyncStatus(); 137 138 int ReSendData(SingleVerSyncTaskContext *context); 139 140 int32_t ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo); 141 142 void SetSessionEndTimestamp(Timestamp end); 143 144 Timestamp GetSessionEndTimestamp() const; 145 146 void FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context, 147 SyncEntry &syncData, int sendCode, int mode); 148 149 int RequestStart(SingleVerSyncTaskContext *context, int mode); 150 151 SyncTimeRange GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context, 152 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate); 153 154 int GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, 155 size_t packetSize); 156 157 int GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData); 158 159 int GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData); 160 161 int Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler, 162 uint32_t packetLen); 163 164 int GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 165 int GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, 166 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo); 167 168 int GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 169 170 int SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData, SyncType curType, 171 const QuerySyncObject &query); 172 173 int SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context, 174 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate = false) const; 175 176 void GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify, const DeviceID &deviceId, 177 const DeviceID &userId, WaterMark &waterMark) const; 178 179 void GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, const DeviceID &userId, WaterMark &waterMark); 180 181 int RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message, WaterMark maxSendDataTime); 182 183 int DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark, 184 const std::vector<uint64_t> &reserved); 185 186 int SendDataPacket(SyncType syncType, DataRequestPacket *packet, SingleVerSyncTaskContext *context); 187 188 void UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId, const SyncTimeRange &dataTime, 189 const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark); 190 191 void UpdatePeerWaterMark(SyncType syncType, const std::string &queryId, const SingleVerSyncTaskContext *context, 192 WaterMark peerWatermark, WaterMark peerDeletedWatermark); 193 194 std::string GetLocalDeviceName(); 195 196 int DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg = false); 197 198 int DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); 199 200 void GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet, 201 WaterMark &pullEndWatermark) const; 202 203 int DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark, 204 const std::vector<uint64_t> &reserved); 205 206 int RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message, 207 const DataRequestPacket *packet); 208 209 void SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen); 210 211 int SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context, 212 uint32_t sessionId, uint32_t sequenceId); 213 214 int SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData, SingleVerSyncTaskContext *context); 215 216 int CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message); 217 218 void RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId); 219 220 void SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context, const DataRequestPacket *packet, 221 int32_t recvCode, WaterMark maxSendDataTime); 222 223 int GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context, 224 DataSyncReSendInfo reSendInfo); 225 226 virtual int RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context); 227 228 virtual void UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context); 229 230 void FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet, 231 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode); 232 233 void FillRequestReSendPacketV2(const SingleVerSyncTaskContext *context, DataRequestPacket *packet); 234 235 void UpdateMtuSize(); 236 237 DataSizeSpecInfo GetDataSizeSpecInfo(size_t packetSize); 238 239 int InterceptData(SyncEntry &syncEntry); 240 241 int ControlCmdStartCheck(SingleVerSyncTaskContext *context); 242 243 int SendControlPacket(SubscribeRequest *packet, SingleVerSyncTaskContext *context); 244 245 int ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); 246 int SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet, 247 const Message *message); 248 int SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 249 int UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 250 int SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, 251 uint32_t controlCmdType, const CommErrHandler &handler = nullptr); 252 static int QuerySyncCheck(SingleVerSyncTaskContext *context); 253 254 void RemoveSubscribeIfNeed(const std::string &queryId, const std::shared_ptr<SubscribeManager> &subscribeManager); 255 256 int DataRequestRecvInner(SingleVerSyncTaskContext *context, const Message *message, WaterMark &pullEndWatermark); 257 258 void UpdatePeerWaterMarkInner(const DataRequestPacket &packet, const SyncTimeRange &dataTime, 259 const UpdateWaterMark &isUpdateWaterMark, const SingleVerSyncTaskContext *context); 260 261 uint32_t mtuSize_; 262 SyncGenericInterface* storage_; 263 ICommunicator* communicateHandle_; 264 std::shared_ptr<Metadata> metadata_; 265 std::string label_; 266 std::string deviceId_; 267 268 SingleVerDataMessageSchedule msgSchedule_; 269 270 static const int HIGH_VERSION_WINDOW_SIZE = 3; 271 static const int LOW_VERSION_WINDOW_SIZE = 1; 272 // below param is about sliding sync info, is different from every sync task 273 std::mutex lock_; 274 int mode_ = 0; // sync mode, may diff from context mode if trigger pull_response while push finish 275 uint32_t sessionId_ = 0; 276 // sequenceId as key 277 std::map<uint32_t, ReSendInfo> reSendMap_; 278 // remaining sending window 279 int32_t windowSize_ = 0; 280 // max sequenceId has been sent 281 uint32_t maxSequenceIdHasSent_ = 0; 282 bool isAllDataHasSent_ = false; 283 // in a sync session, the last data timestamp 284 Timestamp sessionEndTimestamp_ = 0; 285 286 std::mutex removeDeviceDataLock_; 287 std::mutex unsubscribeLock_; 288 }; 289 } // namespace DistributedDB 290 291 #endif // SINGLE_VER_DATA_SYNC_NEW_H 292