1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H 17 #define SINGLE_VER_DATA_PACKET_NEW_H 18 19 #include "icommunicator.h" 20 #include "parcel.h" 21 #include "query_sync_object.h" 22 #include "single_ver_kvdb_sync_interface.h" 23 #include "sync_types.h" 24 #include "version.h" 25 26 namespace DistributedDB { 27 using SendDataItem = SingleVerKvEntry *; 28 29 class DataRequestPacket { 30 public: DataRequestPacket()31 DataRequestPacket() {}; 32 virtual ~DataRequestPacket(); 33 34 void SetData(std::vector<SendDataItem> &data); 35 36 const std::vector<SendDataItem> &GetData() const; 37 const std::vector<uint8_t> &GetCompressedData() const; 38 39 void SetCompressData(std::vector<uint8_t> &compressData); 40 41 const std::vector<uint8_t> &GetCompressData() const; 42 43 void SetEndWaterMark(WaterMark waterMark); 44 45 WaterMark GetEndWaterMark() const; 46 47 void SetLocalWaterMark(WaterMark waterMark); 48 49 WaterMark GetLocalWaterMark() const; 50 51 void SetPeerWaterMark(WaterMark waterMark); 52 53 WaterMark GetPeerWaterMark() const; 54 55 void SetSendCode(int32_t errCode); 56 57 int32_t GetSendCode() const; 58 59 void SetMode(int32_t mode); 60 61 int32_t GetMode() const; 62 63 void SetSessionId(uint32_t sessionId); 64 65 uint32_t GetSessionId() const; 66 67 void SetVersion(uint32_t version); 68 69 uint32_t GetVersion() const; 70 71 uint32_t CalculateLen(uint32_t messageId) const; 72 73 void SetReserved(std::vector<uint64_t> &reserved); 74 void SetReserved(std::vector<uint64_t> &&reserved); 75 76 std::vector<uint64_t> GetReserved() const; 77 78 uint64_t GetPacketId() const; 79 80 void SetFlag(uint32_t flag); 81 82 uint32_t GetFlag() const; 83 84 bool IsLastSequence() const; 85 86 void SetLastSequence(); 87 88 bool IsNeedUpdateWaterMark() const; 89 90 void SetUpdateWaterMark(); 91 92 void SetBasicInfo(int sendCode, uint32_t version, int32_t mode); 93 94 void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark); 95 96 void SetQuery(const QuerySyncObject &query); 97 QuerySyncObject GetQuery() const; 98 99 void SetQueryId(const std::string &queryId); 100 std::string GetQueryId() const; 101 102 void SetDeletedWaterMark(WaterMark watermark); 103 WaterMark GetDeletedWaterMark() const; 104 105 void SetCompressDataMark(); 106 bool IsCompressData() const; 107 108 void SetCompressAlgo(CompressAlgorithm algo); 109 CompressAlgorithm GetCompressAlgo() const; 110 111 void SetExtraConditions(const std::map<std::string, std::string> &extraConditions); 112 std::map<std::string, std::string> GetExtraConditions() const; 113 bool IsExtraConditionData() const; 114 115 protected: 116 std::vector<SendDataItem> data_; 117 WaterMark endWaterMark_ = 0; 118 WaterMark localWaterMark_ = 0; 119 WaterMark peerWaterMark_ = 0; 120 int32_t sendCode_ = 0; 121 int32_t mode_ = SyncModeType::INVALID_MODE; 122 uint32_t sessionId_ = 0; 123 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 124 std::vector<uint64_t> reserved_; 125 uint32_t flag_ = 0; // bit 0 used for isLastSequence 126 // add for query sync mode 127 QuerySyncObject query_; 128 std::string queryId_; 129 WaterMark deletedWatermark_ = 0; 130 std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_ 131 CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data 132 std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device 133 static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last 134 static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update 135 static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data 136 static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data 137 }; 138 139 class DataAckPacket { 140 public: DataAckPacket()141 DataAckPacket() {}; ~DataAckPacket()142 virtual ~DataAckPacket() {}; 143 144 void SetData(uint64_t data); 145 146 uint64_t GetData() const; 147 148 void SetRecvCode(int32_t errorCode); 149 150 int32_t GetRecvCode() const; 151 152 void SetVersion(uint32_t version); 153 154 uint32_t GetVersion() const; 155 156 void SetReserved(std::vector<uint64_t> &reserved); 157 158 std::vector<uint64_t> GetReserved() const; 159 160 uint64_t GetPacketId() const; 161 162 static bool IsPacketIdValid(uint64_t packetId); 163 164 uint32_t CalculateLen() const; 165 166 private: 167 /* 168 * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK; 169 * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0. 170 */ 171 uint64_t data_ = 0; 172 int32_t recvCode_ = 0; 173 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 174 std::vector<uint64_t> reserved_; 175 }; 176 177 class ControlRequestPacket { 178 public: ControlRequestPacket()179 ControlRequestPacket() {}; ~ControlRequestPacket()180 virtual ~ControlRequestPacket() {}; 181 void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag); 182 183 int32_t GetSendCode() const; 184 uint32_t GetVersion() const; 185 uint32_t GetcontrolCmdType() const; 186 uint32_t GetFlag() const; 187 virtual void SetQuery(const QuerySyncObject &query); 188 virtual uint32_t CalculateLen() const; 189 private: 190 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 191 int32_t sendCode_ = 0; 192 uint32_t controlCmdType_ = 0; 193 uint32_t flag_ = 0; 194 }; 195 196 class SubscribeRequest : public ControlRequestPacket { 197 public: SubscribeRequest()198 SubscribeRequest() {}; ~SubscribeRequest()199 ~SubscribeRequest() override {}; 200 QuerySyncObject GetQuery() const; 201 bool IsAutoSubscribe() const; 202 void SetQuery(const QuerySyncObject &query) override; 203 uint32_t CalculateLen() const override; 204 static const uint32_t IS_AUTO_SUBSCRIBE = 0x1; 205 private: 206 QuerySyncObject query_; 207 }; 208 209 class ControlAckPacket { 210 public: ControlAckPacket()211 ControlAckPacket() {}; ~ControlAckPacket()212 virtual ~ControlAckPacket() {}; 213 void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag); 214 int32_t GetRecvCode() const; 215 uint32_t GetVersion() const; 216 uint32_t GetcontrolCmdType() const; 217 uint32_t GetFlag() const; 218 uint32_t CalculateLen() const; 219 220 private: 221 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 222 int32_t recvCode_ = 0; 223 uint32_t controlCmdType_ = 0; 224 uint32_t flag_ = 0; 225 }; 226 } // namespace DistributedDB 227 228 #endif // SINGLE_VER_DATA_SYNC_NEW_H