• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H
17 #define SINGLE_VER_DATA_PACKET_NEW_H
18 
19 #include "icommunicator.h"
20 #include "parcel.h"
21 #include "query_sync_object.h"
22 #include "single_ver_kvdb_sync_interface.h"
23 #include "sync_types.h"
24 #include "version.h"
25 
26 namespace DistributedDB {
27 using SendDataItem = SingleVerKvEntry *;
28 
29 class DataRequestPacket {
30 public:
DataRequestPacket()31     DataRequestPacket() {};
32     virtual ~DataRequestPacket();
33 
34     void SetData(std::vector<SendDataItem> &data);
35 
36     const std::vector<SendDataItem> &GetData() const;
37     const std::vector<uint8_t> &GetCompressedData() const;
38 
39     void SetCompressData(std::vector<uint8_t> &compressData);
40 
41     const std::vector<uint8_t> &GetCompressData() const;
42 
43     void SetEndWaterMark(WaterMark waterMark);
44 
45     WaterMark GetEndWaterMark() const;
46 
47     void SetLocalWaterMark(WaterMark waterMark);
48 
49     WaterMark GetLocalWaterMark() const;
50 
51     void SetPeerWaterMark(WaterMark waterMark);
52 
53     WaterMark GetPeerWaterMark() const;
54 
55     void SetSendCode(int32_t errCode);
56 
57     int32_t GetSendCode() const;
58 
59     void SetMode(int32_t mode);
60 
61     int32_t GetMode() const;
62 
63     void SetSessionId(uint32_t sessionId);
64 
65     uint32_t GetSessionId() const;
66 
67     void SetVersion(uint32_t version);
68 
69     uint32_t GetVersion() const;
70 
71     uint32_t CalculateLen(uint32_t messageId) const;
72 
73     void SetReserved(std::vector<uint64_t> &reserved);
74     void SetReserved(std::vector<uint64_t> &&reserved);
75 
76     std::vector<uint64_t> GetReserved() const;
77 
78     uint64_t GetPacketId() const;
79 
80     void SetFlag(uint32_t flag);
81 
82     uint32_t GetFlag() const;
83 
84     bool IsLastSequence() const;
85 
86     void SetLastSequence();
87 
88     bool IsNeedUpdateWaterMark() const;
89 
90     void SetUpdateWaterMark();
91 
92     void SetBasicInfo(int sendCode, uint32_t version, int32_t mode);
93 
94     void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark);
95 
96     void SetQuery(const QuerySyncObject &query);
97     QuerySyncObject GetQuery() const;
98 
99     void SetQueryId(const std::string &queryId);
100     std::string GetQueryId() const;
101 
102     void SetDeletedWaterMark(WaterMark watermark);
103     WaterMark GetDeletedWaterMark() const;
104 
105     void SetCompressDataMark();
106     bool IsCompressData() const;
107 
108     void SetCompressAlgo(CompressAlgorithm algo);
109     CompressAlgorithm GetCompressAlgo() const;
110 
111     void SetExtraConditions(const std::map<std::string, std::string> &extraConditions);
112     std::map<std::string, std::string> GetExtraConditions() const;
113     bool IsExtraConditionData() const;
114 
115 protected:
116     std::vector<SendDataItem> data_;
117     WaterMark endWaterMark_ = 0;
118     WaterMark localWaterMark_ = 0;
119     WaterMark peerWaterMark_ = 0;
120     int32_t sendCode_ = 0;
121     int32_t mode_ = SyncModeType::INVALID_MODE;
122     uint32_t sessionId_ = 0;
123     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
124     std::vector<uint64_t> reserved_;
125     uint32_t flag_ = 0; // bit 0 used for isLastSequence
126     // add for query sync mode
127     QuerySyncObject query_;
128     std::string queryId_;
129     WaterMark deletedWatermark_ = 0;
130     std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_
131     CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data
132     std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device
133     static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last
134     static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update
135     static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data
136     static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data
137 };
138 
139 class DataAckPacket {
140 public:
DataAckPacket()141     DataAckPacket() {};
~DataAckPacket()142     virtual ~DataAckPacket() {};
143 
144     void SetData(uint64_t data);
145 
146     uint64_t GetData() const;
147 
148     void SetRecvCode(int32_t errorCode);
149 
150     int32_t GetRecvCode() const;
151 
152     void SetVersion(uint32_t version);
153 
154     uint32_t GetVersion() const;
155 
156     void SetReserved(std::vector<uint64_t> &reserved);
157 
158     std::vector<uint64_t> GetReserved() const;
159 
160     uint64_t GetPacketId() const;
161 
162     static bool IsPacketIdValid(uint64_t packetId);
163 
164     uint32_t CalculateLen() const;
165 
166 private:
167     /*
168      * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK;
169      * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0.
170      */
171     uint64_t data_ = 0;
172     int32_t recvCode_ = 0;
173     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
174     std::vector<uint64_t> reserved_;
175 };
176 
177 class ControlRequestPacket {
178 public:
ControlRequestPacket()179     ControlRequestPacket() {};
~ControlRequestPacket()180     virtual ~ControlRequestPacket() {};
181     void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag);
182 
183     int32_t GetSendCode() const;
184     uint32_t GetVersion() const;
185     uint32_t GetcontrolCmdType() const;
186     uint32_t GetFlag() const;
187     virtual void SetQuery(const QuerySyncObject &query);
188     virtual uint32_t CalculateLen() const;
189 private:
190     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
191     int32_t sendCode_ = 0;
192     uint32_t controlCmdType_ = 0;
193     uint32_t flag_ = 0;
194 };
195 
196 class SubscribeRequest : public ControlRequestPacket {
197 public:
SubscribeRequest()198     SubscribeRequest() {};
~SubscribeRequest()199     ~SubscribeRequest() override {};
200     QuerySyncObject GetQuery() const;
201     bool IsAutoSubscribe() const;
202     void SetQuery(const QuerySyncObject &query) override;
203     uint32_t CalculateLen() const override;
204     static const uint32_t IS_AUTO_SUBSCRIBE = 0x1;
205 private:
206     QuerySyncObject query_;
207 };
208 
209 class ControlAckPacket {
210 public:
ControlAckPacket()211     ControlAckPacket() {};
~ControlAckPacket()212     virtual ~ControlAckPacket() {};
213     void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag);
214     int32_t GetRecvCode() const;
215     uint32_t GetVersion() const;
216     uint32_t GetcontrolCmdType() const;
217     uint32_t GetFlag() const;
218     uint32_t CalculateLen() const;
219 
220 private:
221     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
222     int32_t recvCode_ = 0;
223     uint32_t controlCmdType_ = 0;
224     uint32_t flag_ = 0;
225 };
226 }  // namespace DistributedDB
227 
228 #endif // SINGLE_VER_DATA_SYNC_NEW_H