1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H 17 #define SINGLE_VER_DATA_PACKET_NEW_H 18 19 #include <cstdint> 20 #include <string> 21 #include "icommunicator.h" 22 #include "parcel.h" 23 #include "query_sync_object.h" 24 #include "single_ver_kvdb_sync_interface.h" 25 #include "sync_types.h" 26 #include "version.h" 27 28 namespace DistributedDB { 29 using SendDataItem = SingleVerKvEntry *; 30 31 class DataRequestPacket { 32 public: DataRequestPacket()33 DataRequestPacket() {}; 34 ~DataRequestPacket(); 35 36 void SetData(std::vector<SendDataItem> &data); 37 38 const std::vector<SendDataItem> &GetData() const; 39 40 void SetCompressData(std::vector<uint8_t> &compressData); 41 42 const std::vector<uint8_t> &GetCompressData() const; 43 44 void SetEndWaterMark(WaterMark waterMark); 45 46 WaterMark GetEndWaterMark() const; 47 48 void SetLocalWaterMark(WaterMark waterMark); 49 50 WaterMark GetLocalWaterMark() const; 51 52 void SetPeerWaterMark(WaterMark waterMark); 53 54 WaterMark GetPeerWaterMark() const; 55 56 void SetSendCode(int32_t errCode); 57 58 int32_t GetSendCode() const; 59 60 void SetMode(int32_t mode); 61 62 int32_t GetMode() const; 63 64 void SetSessionId(uint32_t sessionId); 65 66 uint32_t GetSessionId() const; 67 68 void SetVersion(uint32_t version); 69 70 uint32_t GetVersion() const; 71 72 uint32_t CalculateLen(uint32_t messageId) const; 73 74 void SetReserved(std::vector<uint64_t> &reserved); 75 void SetReserved(std::vector<uint64_t> &&reserved); 76 77 std::vector<uint64_t> GetReserved() const; 78 79 uint64_t GetPacketId() const; 80 81 void SetFlag(uint32_t flag); 82 83 uint32_t GetFlag() const; 84 85 bool IsLastSequence() const; 86 87 void SetLastSequence(); 88 89 bool IsNeedUpdateWaterMark() const; 90 91 void SetUpdateWaterMark(); 92 93 void SetBasicInfo(int sendCode, uint32_t version, int32_t mode); 94 95 void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark); 96 97 void SetQuery(const QuerySyncObject &query); 98 QuerySyncObject GetQuery() const; 99 100 void SetQueryId(const std::string &queryId); 101 std::string GetQueryId() const; 102 103 void SetDeletedWaterMark(WaterMark watermark); 104 WaterMark GetDeletedWaterMark() const; 105 106 void SetCompressDataMark(); 107 bool IsCompressData() const; 108 109 void SetCompressAlgo(CompressAlgorithm algo); 110 CompressAlgorithm GetCompressAlgo() const; 111 112 void SetExtraConditions(const std::map<std::string, std::string> &extraConditions); 113 std::map<std::string, std::string> GetExtraConditions() const; 114 bool IsExtraConditionData() const; 115 116 void SetSchemaVersion(uint64_t schemaVersion); 117 uint64_t GetSchemaVersion() const; 118 119 void SetSystemTimeOffset(int64_t systemTimeOffset); 120 int64_t GetSystemTimeOffset() const; 121 122 void SetSenderTimeOffset(int64_t senderTimeOffset); 123 int64_t GetSenderTimeOffset() const; 124 125 void SetSecurityOption(const SecurityOption &option); 126 SecurityOption GetSecurityOption() const; 127 128 void SetTotalDataCount(uint32_t total); 129 uint32_t GetTotalDataCount() const; 130 protected: 131 std::vector<SendDataItem> data_; 132 WaterMark endWaterMark_ = 0; 133 WaterMark localWaterMark_ = 0; 134 WaterMark peerWaterMark_ = 0; 135 int32_t sendCode_ = 0; 136 int32_t mode_ = SyncModeType::INVALID_MODE; 137 uint32_t sessionId_ = 0; 138 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 139 std::vector<uint64_t> reserved_; 140 uint32_t flag_ = 0; // bit 0 used for isLastSequence 141 // add for query sync mode 142 QuerySyncObject query_; 143 std::string queryId_; 144 WaterMark deletedWatermark_ = 0; 145 std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_ 146 CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data 147 std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device 148 uint64_t schemaVersion_ = 0; // sender schema version, add in 109 149 int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109 150 int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109 151 SecurityOption securityOption_; 152 uint32_t totalDataCount_ = 0; 153 static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last 154 static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update 155 static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data 156 static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data 157 }; 158 159 class DataAckPacket { 160 public: DataAckPacket()161 DataAckPacket() {}; ~DataAckPacket()162 ~DataAckPacket() {}; 163 164 void SetData(const uint64_t data); 165 166 uint64_t GetData() const; 167 168 void SetRecvCode(int32_t errorCode); 169 170 int32_t GetRecvCode() const; 171 172 void SetVersion(uint32_t version); 173 174 uint32_t GetVersion() const; 175 176 void SetReserved(std::vector<uint64_t> &reserved); 177 178 std::vector<uint64_t> GetReserved() const; 179 180 uint64_t GetPacketId() const; 181 182 static bool IsPacketIdValid(uint64_t packetId); 183 184 uint32_t CalculateLen() const; 185 186 private: 187 /* 188 * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK; 189 * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0. 190 */ 191 uint64_t data_ = 0; 192 int32_t recvCode_ = 0; 193 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 194 std::vector<uint64_t> reserved_; 195 }; 196 197 class ControlRequestPacket { 198 public: ControlRequestPacket()199 ControlRequestPacket() {}; ~ControlRequestPacket()200 virtual ~ControlRequestPacket() {}; 201 void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag); 202 203 int32_t GetSendCode() const; 204 uint32_t GetVersion() const; 205 uint32_t GetcontrolCmdType() const; 206 uint32_t GetFlag() const; 207 virtual void SetQuery(const QuerySyncObject &query); 208 virtual uint32_t CalculateLen() const; 209 private: 210 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 211 int32_t sendCode_ = 0; 212 uint32_t controlCmdType_ = 0; 213 uint32_t flag_ = 0; 214 }; 215 216 class SubscribeRequest : public ControlRequestPacket { 217 public: SubscribeRequest()218 SubscribeRequest() {}; ~SubscribeRequest()219 ~SubscribeRequest() override {}; 220 QuerySyncObject GetQuery() const; 221 bool IsAutoSubscribe() const; 222 void SetQuery(const QuerySyncObject &query) override; 223 uint32_t CalculateLen() const override; 224 static const uint32_t IS_AUTO_SUBSCRIBE = 0x1; 225 private: 226 QuerySyncObject query_; 227 }; 228 229 class ControlAckPacket { 230 public: ControlAckPacket()231 ControlAckPacket() {}; ~ControlAckPacket()232 ~ControlAckPacket() {}; 233 void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag); 234 int32_t GetRecvCode() const; 235 uint32_t GetVersion() const; 236 uint32_t GetcontrolCmdType() const; 237 uint32_t GetFlag() const; 238 uint32_t CalculateLen() const; 239 240 private: 241 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 242 int32_t recvCode_ = 0; 243 uint32_t controlCmdType_ = 0; 244 uint32_t flag_ = 0; 245 }; 246 } // namespace DistributedDB 247 248 #endif // SINGLE_VER_DATA_SYNC_NEW_H