• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H
17 #define SINGLE_VER_DATA_PACKET_NEW_H
18 
19 #include <cstdint>
20 #include <string>
21 #include "icommunicator.h"
22 #include "parcel.h"
23 #include "query_sync_object.h"
24 #include "single_ver_kvdb_sync_interface.h"
25 #include "sync_types.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 using SendDataItem = SingleVerKvEntry *;
30 
31 class DataRequestPacket {
32 public:
DataRequestPacket()33     DataRequestPacket() {};
34     virtual ~DataRequestPacket();
35 
36     void SetData(std::vector<SendDataItem> &data);
37 
38     const std::vector<SendDataItem> &GetData() const;
39     const std::vector<uint8_t> &GetCompressedData() const;
40 
41     void SetCompressData(std::vector<uint8_t> &compressData);
42 
43     const std::vector<uint8_t> &GetCompressData() const;
44 
45     void SetEndWaterMark(WaterMark waterMark);
46 
47     WaterMark GetEndWaterMark() const;
48 
49     void SetLocalWaterMark(WaterMark waterMark);
50 
51     WaterMark GetLocalWaterMark() const;
52 
53     void SetPeerWaterMark(WaterMark waterMark);
54 
55     WaterMark GetPeerWaterMark() const;
56 
57     void SetSendCode(int32_t errCode);
58 
59     int32_t GetSendCode() const;
60 
61     void SetMode(int32_t mode);
62 
63     int32_t GetMode() const;
64 
65     void SetSessionId(uint32_t sessionId);
66 
67     uint32_t GetSessionId() const;
68 
69     void SetVersion(uint32_t version);
70 
71     uint32_t GetVersion() const;
72 
73     uint32_t CalculateLen(uint32_t messageId) const;
74 
75     void SetReserved(std::vector<uint64_t> &reserved);
76     void SetReserved(std::vector<uint64_t> &&reserved);
77 
78     std::vector<uint64_t> GetReserved() const;
79 
80     uint64_t GetPacketId() const;
81 
82     void SetFlag(uint32_t flag);
83 
84     uint32_t GetFlag() const;
85 
86     bool IsLastSequence() const;
87 
88     void SetLastSequence();
89 
90     bool IsNeedUpdateWaterMark() const;
91 
92     void SetUpdateWaterMark();
93 
94     void SetBasicInfo(int sendCode, uint32_t version, int32_t mode);
95 
96     void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark);
97 
98     void SetQuery(const QuerySyncObject &query);
99     QuerySyncObject GetQuery() const;
100 
101     void SetQueryId(const std::string &queryId);
102     std::string GetQueryId() const;
103 
104     void SetDeletedWaterMark(WaterMark watermark);
105     WaterMark GetDeletedWaterMark() const;
106 
107     void SetCompressDataMark();
108     bool IsCompressData() const;
109 
110     void SetCompressAlgo(CompressAlgorithm algo);
111     CompressAlgorithm GetCompressAlgo() const;
112 
113     void SetExtraConditions(const std::map<std::string, std::string> &extraConditions);
114     std::map<std::string, std::string> GetExtraConditions() const;
115     bool IsExtraConditionData() const;
116 
117     void SetSchemaVersion(uint64_t schemaVersion);
118     uint64_t GetSchemaVersion() const;
119 
120     void SetSystemTimeOffset(int64_t systemTimeOffset);
121     int64_t GetSystemTimeOffset() const;
122 
123     void SetSenderTimeOffset(int64_t senderTimeOffset);
124     int64_t GetSenderTimeOffset() const;
125 
126     void SetSecurityOption(const SecurityOption &option);
127     SecurityOption GetSecurityOption() const;
128 
129     void SetTotalDataCount(uint32_t total);
130     uint32_t GetTotalDataCount() const;
131 protected:
132     std::vector<SendDataItem> data_;
133     WaterMark endWaterMark_ = 0;
134     WaterMark localWaterMark_ = 0;
135     WaterMark peerWaterMark_ = 0;
136     int32_t sendCode_ = 0;
137     int32_t mode_ = SyncModeType::INVALID_MODE;
138     uint32_t sessionId_ = 0;
139     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
140     std::vector<uint64_t> reserved_;
141     uint32_t flag_ = 0; // bit 0 used for isLastSequence
142     // add for query sync mode
143     QuerySyncObject query_;
144     std::string queryId_;
145     WaterMark deletedWatermark_ = 0;
146     std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_
147     CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data
148     std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device
149     uint64_t schemaVersion_ = 0; // sender schema version, add in 109
150     int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109
151     int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109
152     SecurityOption securityOption_;
153     uint32_t totalDataCount_ = 0;
154     static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last
155     static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update
156     static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data
157     static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data
158 };
159 
160 class DataAckPacket {
161 public:
DataAckPacket()162     DataAckPacket() {};
~DataAckPacket()163     virtual ~DataAckPacket() {};
164 
165     void SetData(const uint64_t data);
166 
167     uint64_t GetData() const;
168 
169     void SetRecvCode(int32_t errorCode);
170 
171     int32_t GetRecvCode() const;
172 
173     void SetVersion(uint32_t version);
174 
175     uint32_t GetVersion() const;
176 
177     void SetReserved(std::vector<uint64_t> &reserved);
178 
179     std::vector<uint64_t> GetReserved() const;
180 
181     uint64_t GetPacketId() const;
182 
183     static bool IsPacketIdValid(uint64_t packetId);
184 
185     uint32_t CalculateLen() const;
186 
187 private:
188     /*
189      * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK;
190      * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0.
191      */
192     uint64_t data_ = 0;
193     int32_t recvCode_ = 0;
194     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
195     std::vector<uint64_t> reserved_;
196 };
197 
198 class ControlRequestPacket {
199 public:
ControlRequestPacket()200     ControlRequestPacket() {};
~ControlRequestPacket()201     virtual ~ControlRequestPacket() {};
202     void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag);
203 
204     int32_t GetSendCode() const;
205     uint32_t GetVersion() const;
206     uint32_t GetcontrolCmdType() const;
207     uint32_t GetFlag() const;
208     virtual void SetQuery(const QuerySyncObject &query);
209     virtual uint32_t CalculateLen() const;
210 private:
211     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
212     int32_t sendCode_ = 0;
213     uint32_t controlCmdType_ = 0;
214     uint32_t flag_ = 0;
215 };
216 
217 class SubscribeRequest : public ControlRequestPacket {
218 public:
SubscribeRequest()219     SubscribeRequest() {};
~SubscribeRequest()220     ~SubscribeRequest() override {};
221     QuerySyncObject GetQuery() const;
222     bool IsAutoSubscribe() const;
223     void SetQuery(const QuerySyncObject &query) override;
224     uint32_t CalculateLen() const override;
225     static const uint32_t IS_AUTO_SUBSCRIBE = 0x1;
226 private:
227     QuerySyncObject query_;
228 };
229 
230 class ControlAckPacket {
231 public:
ControlAckPacket()232     ControlAckPacket() {};
~ControlAckPacket()233     virtual ~ControlAckPacket() {};
234     void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag);
235     int32_t GetRecvCode() const;
236     uint32_t GetVersion() const;
237     uint32_t GetcontrolCmdType() const;
238     uint32_t GetFlag() const;
239     uint32_t CalculateLen() const;
240 
241 private:
242     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
243     int32_t recvCode_ = 0;
244     uint32_t controlCmdType_ = 0;
245     uint32_t flag_ = 0;
246 };
247 }  // namespace DistributedDB
248 
249 #endif // SINGLE_VER_DATA_SYNC_NEW_H