1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H 17 #define SINGLE_VER_DATA_PACKET_NEW_H 18 19 #include <cstdint> 20 #include <string> 21 #include "icommunicator.h" 22 #include "parcel.h" 23 #include "query_sync_object.h" 24 #include "single_ver_kvdb_sync_interface.h" 25 #include "sync_types.h" 26 #include "version.h" 27 28 namespace DistributedDB { 29 using SendDataItem = SingleVerKvEntry *; 30 31 class DataRequestPacket { 32 public: DataRequestPacket()33 DataRequestPacket() {}; 34 virtual ~DataRequestPacket(); 35 36 void SetData(std::vector<SendDataItem> &data); 37 38 const std::vector<SendDataItem> &GetData() const; 39 const std::vector<uint8_t> &GetCompressedData() const; 40 41 void SetCompressData(std::vector<uint8_t> &compressData); 42 43 const std::vector<uint8_t> &GetCompressData() const; 44 45 void SetEndWaterMark(WaterMark waterMark); 46 47 WaterMark GetEndWaterMark() const; 48 49 void SetLocalWaterMark(WaterMark waterMark); 50 51 WaterMark GetLocalWaterMark() const; 52 53 void SetPeerWaterMark(WaterMark waterMark); 54 55 WaterMark GetPeerWaterMark() const; 56 57 void SetSendCode(int32_t errCode); 58 59 int32_t GetSendCode() const; 60 61 void SetMode(int32_t mode); 62 63 int32_t GetMode() const; 64 65 void SetSessionId(uint32_t sessionId); 66 67 uint32_t GetSessionId() const; 68 69 void SetVersion(uint32_t version); 70 71 uint32_t GetVersion() const; 72 73 uint32_t CalculateLen(uint32_t messageId) const; 74 75 void SetReserved(std::vector<uint64_t> &reserved); 76 void SetReserved(std::vector<uint64_t> &&reserved); 77 78 std::vector<uint64_t> GetReserved() const; 79 80 uint64_t GetPacketId() const; 81 82 void SetFlag(uint32_t flag); 83 84 uint32_t GetFlag() const; 85 86 bool IsLastSequence() const; 87 88 void SetLastSequence(); 89 90 bool IsNeedUpdateWaterMark() const; 91 92 void SetUpdateWaterMark(); 93 94 void SetBasicInfo(int sendCode, uint32_t version, int32_t mode); 95 96 void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark); 97 98 void SetQuery(const QuerySyncObject &query); 99 QuerySyncObject GetQuery() const; 100 101 void SetQueryId(const std::string &queryId); 102 std::string GetQueryId() const; 103 104 void SetDeletedWaterMark(WaterMark watermark); 105 WaterMark GetDeletedWaterMark() const; 106 107 void SetCompressDataMark(); 108 bool IsCompressData() const; 109 110 void SetCompressAlgo(CompressAlgorithm algo); 111 CompressAlgorithm GetCompressAlgo() const; 112 113 void SetExtraConditions(const std::map<std::string, std::string> &extraConditions); 114 std::map<std::string, std::string> GetExtraConditions() const; 115 bool IsExtraConditionData() const; 116 117 void SetSchemaVersion(uint64_t schemaVersion); 118 uint64_t GetSchemaVersion() const; 119 120 void SetSystemTimeOffset(int64_t systemTimeOffset); 121 int64_t GetSystemTimeOffset() const; 122 123 void SetSenderTimeOffset(int64_t senderTimeOffset); 124 int64_t GetSenderTimeOffset() const; 125 126 void SetSecurityOption(const SecurityOption &option); 127 SecurityOption GetSecurityOption() const; 128 129 void SetTotalDataCount(uint32_t total); 130 uint32_t GetTotalDataCount() const; 131 protected: 132 std::vector<SendDataItem> data_; 133 WaterMark endWaterMark_ = 0; 134 WaterMark localWaterMark_ = 0; 135 WaterMark peerWaterMark_ = 0; 136 int32_t sendCode_ = 0; 137 int32_t mode_ = SyncModeType::INVALID_MODE; 138 uint32_t sessionId_ = 0; 139 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 140 std::vector<uint64_t> reserved_; 141 uint32_t flag_ = 0; // bit 0 used for isLastSequence 142 // add for query sync mode 143 QuerySyncObject query_; 144 std::string queryId_; 145 WaterMark deletedWatermark_ = 0; 146 std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_ 147 CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data 148 std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device 149 uint64_t schemaVersion_ = 0; // sender schema version, add in 109 150 int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109 151 int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109 152 SecurityOption securityOption_; 153 uint32_t totalDataCount_ = 0; 154 static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last 155 static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update 156 static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data 157 static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data 158 }; 159 160 class DataAckPacket { 161 public: DataAckPacket()162 DataAckPacket() {}; ~DataAckPacket()163 virtual ~DataAckPacket() {}; 164 165 void SetData(const uint64_t data); 166 167 uint64_t GetData() const; 168 169 void SetRecvCode(int32_t errorCode); 170 171 int32_t GetRecvCode() const; 172 173 void SetVersion(uint32_t version); 174 175 uint32_t GetVersion() const; 176 177 void SetReserved(std::vector<uint64_t> &reserved); 178 179 std::vector<uint64_t> GetReserved() const; 180 181 uint64_t GetPacketId() const; 182 183 static bool IsPacketIdValid(uint64_t packetId); 184 185 uint32_t CalculateLen() const; 186 187 private: 188 /* 189 * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK; 190 * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0. 191 */ 192 uint64_t data_ = 0; 193 int32_t recvCode_ = 0; 194 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 195 std::vector<uint64_t> reserved_; 196 }; 197 198 class ControlRequestPacket { 199 public: ControlRequestPacket()200 ControlRequestPacket() {}; ~ControlRequestPacket()201 virtual ~ControlRequestPacket() {}; 202 void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag); 203 204 int32_t GetSendCode() const; 205 uint32_t GetVersion() const; 206 uint32_t GetcontrolCmdType() const; 207 uint32_t GetFlag() const; 208 virtual void SetQuery(const QuerySyncObject &query); 209 virtual uint32_t CalculateLen() const; 210 private: 211 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 212 int32_t sendCode_ = 0; 213 uint32_t controlCmdType_ = 0; 214 uint32_t flag_ = 0; 215 }; 216 217 class SubscribeRequest : public ControlRequestPacket { 218 public: SubscribeRequest()219 SubscribeRequest() {}; ~SubscribeRequest()220 ~SubscribeRequest() override {}; 221 QuerySyncObject GetQuery() const; 222 bool IsAutoSubscribe() const; 223 void SetQuery(const QuerySyncObject &query) override; 224 uint32_t CalculateLen() const override; 225 static const uint32_t IS_AUTO_SUBSCRIBE = 0x1; 226 private: 227 QuerySyncObject query_; 228 }; 229 230 class ControlAckPacket { 231 public: ControlAckPacket()232 ControlAckPacket() {}; ~ControlAckPacket()233 virtual ~ControlAckPacket() {}; 234 void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag); 235 int32_t GetRecvCode() const; 236 uint32_t GetVersion() const; 237 uint32_t GetcontrolCmdType() const; 238 uint32_t GetFlag() const; 239 uint32_t CalculateLen() const; 240 241 private: 242 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 243 int32_t recvCode_ = 0; 244 uint32_t controlCmdType_ = 0; 245 uint32_t flag_ = 0; 246 }; 247 } // namespace DistributedDB 248 249 #endif // SINGLE_VER_DATA_SYNC_NEW_H