1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H 17 #define SINGLE_VER_DATA_PACKET_NEW_H 18 19 #include "icommunicator.h" 20 #include "parcel.h" 21 #include "query_sync_object.h" 22 #include "single_ver_kvdb_sync_interface.h" 23 #include "sync_types.h" 24 #include "version.h" 25 26 namespace DistributedDB { 27 using SendDataItem = SingleVerKvEntry *; 28 29 class DataRequestPacket { 30 public: DataRequestPacket()31 DataRequestPacket() {}; 32 virtual ~DataRequestPacket(); 33 34 void SetData(std::vector<SendDataItem> &data); 35 36 const std::vector<SendDataItem> &GetData() const; 37 const std::vector<uint8_t> &GetCompressedData() const; 38 39 void SetCompressData(std::vector<uint8_t> &compressData); 40 41 const std::vector<uint8_t> &GetCompressData() const; 42 43 void SetEndWaterMark(WaterMark waterMark); 44 45 WaterMark GetEndWaterMark() const; 46 47 void SetLocalWaterMark(WaterMark waterMark); 48 49 WaterMark GetLocalWaterMark() const; 50 51 void SetPeerWaterMark(WaterMark waterMark); 52 53 WaterMark GetPeerWaterMark() const; 54 55 void SetSendCode(int32_t errCode); 56 57 int32_t GetSendCode() const; 58 59 void SetMode(int32_t mode); 60 61 int32_t GetMode() const; 62 63 void SetSessionId(uint32_t sessionId); 64 65 uint32_t GetSessionId() const; 66 67 void SetVersion(uint32_t version); 68 69 uint32_t GetVersion() const; 70 71 uint32_t CalculateLen(uint32_t messageId) const; 72 73 void SetReserved(std::vector<uint64_t> &reserved); 74 void SetReserved(std::vector<uint64_t> &&reserved); 75 76 std::vector<uint64_t> GetReserved() const; 77 78 uint64_t GetPacketId() const; 79 80 void SetFlag(uint32_t flag); 81 82 uint32_t GetFlag() const; 83 84 bool IsLastSequence() const; 85 86 void SetLastSequence(); 87 88 bool IsNeedUpdateWaterMark() const; 89 90 void SetUpdateWaterMark(); 91 92 void SetBasicInfo(int sendCode, uint32_t version, int32_t mode); 93 94 void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark); 95 96 void SetQuery(const QuerySyncObject &query); 97 QuerySyncObject GetQuery() const; 98 99 void SetQueryId(const std::string &queryId); 100 std::string GetQueryId() const; 101 102 void SetDeletedWaterMark(WaterMark watermark); 103 WaterMark GetDeletedWaterMark() const; 104 105 void SetCompressDataMark(); 106 bool IsCompressData() const; 107 108 void SetCompressAlgo(CompressAlgorithm algo); 109 CompressAlgorithm GetCompressAlgo() const; 110 111 void SetExtraConditions(const std::map<std::string, std::string> &extraConditions); 112 std::map<std::string, std::string> GetExtraConditions() const; 113 bool IsExtraConditionData() const; 114 115 void SetSchemaVersion(uint64_t schemaVersion); 116 uint64_t GetSchemaVersion() const; 117 118 void SetSystemTimeOffset(int64_t systemTimeOffset); 119 int64_t GetSystemTimeOffset() const; 120 121 void SetSenderTimeOffset(int64_t senderTimeOffset); 122 int64_t GetSenderTimeOffset() const; 123 124 void SetSecurityOption(const SecurityOption &option); 125 SecurityOption GetSecurityOption() const; 126 127 void SetTotalDataCount(uint32_t total); 128 uint32_t GetTotalDataCount() const; 129 protected: 130 std::vector<SendDataItem> data_; 131 WaterMark endWaterMark_ = 0; 132 WaterMark localWaterMark_ = 0; 133 WaterMark peerWaterMark_ = 0; 134 int32_t sendCode_ = 0; 135 int32_t mode_ = SyncModeType::INVALID_MODE; 136 uint32_t sessionId_ = 0; 137 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 138 std::vector<uint64_t> reserved_; 139 uint32_t flag_ = 0; // bit 0 used for isLastSequence 140 // add for query sync mode 141 QuerySyncObject query_; 142 std::string queryId_; 143 WaterMark deletedWatermark_ = 0; 144 std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_ 145 CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data 146 std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device 147 uint64_t schemaVersion_ = 0; // sender schema version, add in 109 148 int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109 149 int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109 150 SecurityOption securityOption_; 151 uint32_t totalDataCount_ = 0; 152 static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last 153 static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update 154 static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data 155 static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data 156 }; 157 158 class DataAckPacket { 159 public: DataAckPacket()160 DataAckPacket() {}; ~DataAckPacket()161 virtual ~DataAckPacket() {}; 162 163 void SetData(uint64_t data); 164 165 uint64_t GetData() const; 166 167 void SetRecvCode(int32_t errorCode); 168 169 int32_t GetRecvCode() const; 170 171 void SetVersion(uint32_t version); 172 173 uint32_t GetVersion() const; 174 175 void SetReserved(std::vector<uint64_t> &reserved); 176 177 std::vector<uint64_t> GetReserved() const; 178 179 uint64_t GetPacketId() const; 180 181 static bool IsPacketIdValid(uint64_t packetId); 182 183 uint32_t CalculateLen() const; 184 185 private: 186 /* 187 * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK; 188 * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0. 189 */ 190 uint64_t data_ = 0; 191 int32_t recvCode_ = 0; 192 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 193 std::vector<uint64_t> reserved_; 194 }; 195 196 class ControlRequestPacket { 197 public: ControlRequestPacket()198 ControlRequestPacket() {}; ~ControlRequestPacket()199 virtual ~ControlRequestPacket() {}; 200 void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag); 201 202 int32_t GetSendCode() const; 203 uint32_t GetVersion() const; 204 uint32_t GetcontrolCmdType() const; 205 uint32_t GetFlag() const; 206 virtual void SetQuery(const QuerySyncObject &query); 207 virtual uint32_t CalculateLen() const; 208 private: 209 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 210 int32_t sendCode_ = 0; 211 uint32_t controlCmdType_ = 0; 212 uint32_t flag_ = 0; 213 }; 214 215 class SubscribeRequest : public ControlRequestPacket { 216 public: SubscribeRequest()217 SubscribeRequest() {}; ~SubscribeRequest()218 ~SubscribeRequest() override {}; 219 QuerySyncObject GetQuery() const; 220 bool IsAutoSubscribe() const; 221 void SetQuery(const QuerySyncObject &query) override; 222 uint32_t CalculateLen() const override; 223 static const uint32_t IS_AUTO_SUBSCRIBE = 0x1; 224 private: 225 QuerySyncObject query_; 226 }; 227 228 class ControlAckPacket { 229 public: ControlAckPacket()230 ControlAckPacket() {}; ~ControlAckPacket()231 virtual ~ControlAckPacket() {}; 232 void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag); 233 int32_t GetRecvCode() const; 234 uint32_t GetVersion() const; 235 uint32_t GetcontrolCmdType() const; 236 uint32_t GetFlag() const; 237 uint32_t CalculateLen() const; 238 239 private: 240 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 241 int32_t recvCode_ = 0; 242 uint32_t controlCmdType_ = 0; 243 uint32_t flag_ = 0; 244 }; 245 } // namespace DistributedDB 246 247 #endif // SINGLE_VER_DATA_SYNC_NEW_H