/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H
17 #define SINGLE_VER_DATA_PACKET_NEW_H
18 
#include <cstdint>
#include <map>
#include <string>
#include <vector>
#include "icommunicator.h"
#include "parcel.h"
#include "query_sync_object.h"
#include "single_ver_kvdb_sync_interface.h"
#include "sync_types.h"
#include "version.h"
27 
28 namespace DistributedDB {
29 using SendDataItem = SingleVerKvEntry *;
30 
31 class DataRequestPacket {
32 public:
DataRequestPacket()33     DataRequestPacket() {};
34     ~DataRequestPacket();
35 
36     void SetData(std::vector<SendDataItem> &data);
37 
38     const std::vector<SendDataItem> &GetData() const;
39 
40     void SetCompressData(std::vector<uint8_t> &compressData);
41 
42     const std::vector<uint8_t> &GetCompressData() const;
43 
44     void SetEndWaterMark(WaterMark waterMark);
45 
46     WaterMark GetEndWaterMark() const;
47 
48     void SetLocalWaterMark(WaterMark waterMark);
49 
50     WaterMark GetLocalWaterMark() const;
51 
52     void SetPeerWaterMark(WaterMark waterMark);
53 
54     WaterMark GetPeerWaterMark() const;
55 
56     void SetSendCode(int32_t errCode);
57 
58     int32_t GetSendCode() const;
59 
60     void SetMode(int32_t mode);
61 
62     int32_t GetMode() const;
63 
64     void SetSessionId(uint32_t sessionId);
65 
66     uint32_t GetSessionId() const;
67 
68     void SetVersion(uint32_t version);
69 
70     uint32_t GetVersion() const;
71 
72     uint32_t CalculateLen(uint32_t messageId) const;
73 
74     void SetReserved(std::vector<uint64_t> &reserved);
75     void SetReserved(std::vector<uint64_t> &&reserved);
76 
77     std::vector<uint64_t> GetReserved() const;
78 
79     uint64_t GetPacketId() const;
80 
81     void SetFlag(uint32_t flag);
82 
83     uint32_t GetFlag() const;
84 
85     bool IsLastSequence() const;
86 
87     void SetLastSequence();
88 
89     bool IsNeedUpdateWaterMark() const;
90 
91     void SetUpdateWaterMark();
92 
93     void SetBasicInfo(int sendCode, uint32_t version, int32_t mode);
94 
95     void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark);
96 
97     void SetQuery(const QuerySyncObject &query);
98     QuerySyncObject GetQuery() const;
99 
100     void SetQueryId(const std::string &queryId);
101     std::string GetQueryId() const;
102 
103     void SetDeletedWaterMark(WaterMark watermark);
104     WaterMark GetDeletedWaterMark() const;
105 
106     void SetCompressDataMark();
107     bool IsCompressData() const;
108 
109     void SetCompressAlgo(CompressAlgorithm algo);
110     CompressAlgorithm GetCompressAlgo() const;
111 
112     void SetExtraConditions(const std::map<std::string, std::string> &extraConditions);
113     std::map<std::string, std::string> GetExtraConditions() const;
114     bool IsExtraConditionData() const;
115 
116     void SetSchemaVersion(uint64_t schemaVersion);
117     uint64_t GetSchemaVersion() const;
118 
119     void SetSystemTimeOffset(int64_t systemTimeOffset);
120     int64_t GetSystemTimeOffset() const;
121 
122     void SetSenderTimeOffset(int64_t senderTimeOffset);
123     int64_t GetSenderTimeOffset() const;
124 
125     void SetSecurityOption(const SecurityOption &option);
126     SecurityOption GetSecurityOption() const;
127 
128     void SetTotalDataCount(uint32_t total);
129     uint32_t GetTotalDataCount() const;
130 protected:
131     std::vector<SendDataItem> data_;
132     WaterMark endWaterMark_ = 0;
133     WaterMark localWaterMark_ = 0;
134     WaterMark peerWaterMark_ = 0;
135     int32_t sendCode_ = 0;
136     int32_t mode_ = SyncModeType::INVALID_MODE;
137     uint32_t sessionId_ = 0;
138     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
139     std::vector<uint64_t> reserved_;
140     uint32_t flag_ = 0; // bit 0 used for isLastSequence
141     // add for query sync mode
142     QuerySyncObject query_;
143     std::string queryId_;
144     WaterMark deletedWatermark_ = 0;
145     std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_
146     CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data
147     std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device
148     uint64_t schemaVersion_ = 0; // sender schema version, add in 109
149     int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109
150     int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109
151     SecurityOption securityOption_;
152     uint32_t totalDataCount_ = 0;
153     static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last
154     static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update
155     static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data
156     static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data
157 };
158 
159 class DataAckPacket {
160 public:
DataAckPacket()161     DataAckPacket() {};
~DataAckPacket()162     ~DataAckPacket() {};
163 
164     void SetData(const uint64_t data);
165 
166     uint64_t GetData() const;
167 
168     void SetRecvCode(int32_t errorCode);
169 
170     int32_t GetRecvCode() const;
171 
172     void SetVersion(uint32_t version);
173 
174     uint32_t GetVersion() const;
175 
176     void SetReserved(std::vector<uint64_t> &reserved);
177 
178     std::vector<uint64_t> GetReserved() const;
179 
180     uint64_t GetPacketId() const;
181 
182     static bool IsPacketIdValid(uint64_t packetId);
183 
184     uint32_t CalculateLen() const;
185 
186 private:
187     /*
188      * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK;
189      * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0.
190      */
191     uint64_t data_ = 0;
192     int32_t recvCode_ = 0;
193     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
194     std::vector<uint64_t> reserved_;
195 };
196 
197 class ControlRequestPacket {
198 public:
ControlRequestPacket()199     ControlRequestPacket() {};
~ControlRequestPacket()200     virtual ~ControlRequestPacket() {};
201     void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag);
202 
203     int32_t GetSendCode() const;
204     uint32_t GetVersion() const;
205     uint32_t GetcontrolCmdType() const;
206     uint32_t GetFlag() const;
207     virtual void SetQuery(const QuerySyncObject &query);
208     virtual uint32_t CalculateLen() const;
209 private:
210     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
211     int32_t sendCode_ = 0;
212     uint32_t controlCmdType_ = 0;
213     uint32_t flag_ = 0;
214 };
215 
216 class SubscribeRequest : public ControlRequestPacket {
217 public:
SubscribeRequest()218     SubscribeRequest() {};
~SubscribeRequest()219     ~SubscribeRequest() override {};
220     QuerySyncObject GetQuery() const;
221     bool IsAutoSubscribe() const;
222     void SetQuery(const QuerySyncObject &query) override;
223     uint32_t CalculateLen() const override;
224     static const uint32_t IS_AUTO_SUBSCRIBE = 0x1;
225 private:
226     QuerySyncObject query_;
227 };
228 
229 class ControlAckPacket {
230 public:
ControlAckPacket()231     ControlAckPacket() {};
~ControlAckPacket()232     ~ControlAckPacket() {};
233     void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag);
234     int32_t GetRecvCode() const;
235     uint32_t GetVersion() const;
236     uint32_t GetcontrolCmdType() const;
237     uint32_t GetFlag() const;
238     uint32_t CalculateLen() const;
239 
240 private:
241     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
242     int32_t recvCode_ = 0;
243     uint32_t controlCmdType_ = 0;
244     uint32_t flag_ = 0;
245 };
246 }  // namespace DistributedDB
247 
#endif // SINGLE_VER_DATA_PACKET_NEW_H