• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H
17 #define SINGLE_VER_DATA_PACKET_NEW_H
18 
19 #include "icommunicator.h"
20 #include "parcel.h"
21 #include "query_sync_object.h"
22 #include "single_ver_kvdb_sync_interface.h"
23 #include "sync_types.h"
24 #include "version.h"
25 
26 namespace DistributedDB {
27 using SendDataItem = SingleVerKvEntry *;
28 
29 class DataRequestPacket {
30 public:
DataRequestPacket()31     DataRequestPacket() {};
32     virtual ~DataRequestPacket();
33 
34     void SetData(std::vector<SendDataItem> &data);
35 
36     const std::vector<SendDataItem> &GetData() const;
37     const std::vector<uint8_t> &GetCompressedData() const;
38 
39     void SetCompressData(std::vector<uint8_t> &compressData);
40 
41     const std::vector<uint8_t> &GetCompressData() const;
42 
43     void SetEndWaterMark(WaterMark waterMark);
44 
45     WaterMark GetEndWaterMark() const;
46 
47     void SetLocalWaterMark(WaterMark waterMark);
48 
49     WaterMark GetLocalWaterMark() const;
50 
51     void SetPeerWaterMark(WaterMark waterMark);
52 
53     WaterMark GetPeerWaterMark() const;
54 
55     void SetSendCode(int32_t errCode);
56 
57     int32_t GetSendCode() const;
58 
59     void SetMode(int32_t mode);
60 
61     int32_t GetMode() const;
62 
63     void SetSessionId(uint32_t sessionId);
64 
65     uint32_t GetSessionId() const;
66 
67     void SetVersion(uint32_t version);
68 
69     uint32_t GetVersion() const;
70 
71     uint32_t CalculateLen(uint32_t messageId) const;
72 
73     void SetReserved(std::vector<uint64_t> &reserved);
74     void SetReserved(std::vector<uint64_t> &&reserved);
75 
76     std::vector<uint64_t> GetReserved() const;
77 
78     uint64_t GetPacketId() const;
79 
80     void SetFlag(uint32_t flag);
81 
82     uint32_t GetFlag() const;
83 
84     bool IsLastSequence() const;
85 
86     void SetLastSequence();
87 
88     bool IsNeedUpdateWaterMark() const;
89 
90     void SetUpdateWaterMark();
91 
92     void SetBasicInfo(int sendCode, uint32_t version, int32_t mode);
93 
94     void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark);
95 
96     void SetQuery(const QuerySyncObject &query);
97     QuerySyncObject GetQuery() const;
98 
99     void SetQueryId(const std::string &queryId);
100     std::string GetQueryId() const;
101 
102     void SetDeletedWaterMark(WaterMark watermark);
103     WaterMark GetDeletedWaterMark() const;
104 
105     void SetCompressDataMark();
106     bool IsCompressData() const;
107 
108     void SetCompressAlgo(CompressAlgorithm algo);
109     CompressAlgorithm GetCompressAlgo() const;
110 
111     void SetExtraConditions(const std::map<std::string, std::string> &extraConditions);
112     std::map<std::string, std::string> GetExtraConditions() const;
113     bool IsExtraConditionData() const;
114 
115     void SetSchemaVersion(uint64_t schemaVersion);
116     uint64_t GetSchemaVersion() const;
117 
118     void SetSystemTimeOffset(int64_t systemTimeOffset);
119     int64_t GetSystemTimeOffset() const;
120 
121     void SetSenderTimeOffset(int64_t senderTimeOffset);
122     int64_t GetSenderTimeOffset() const;
123 
124     void SetSecurityOption(const SecurityOption &option);
125     SecurityOption GetSecurityOption() const;
126 
127     void SetTotalDataCount(uint32_t total);
128     uint32_t GetTotalDataCount() const;
129 protected:
130     std::vector<SendDataItem> data_;
131     WaterMark endWaterMark_ = 0;
132     WaterMark localWaterMark_ = 0;
133     WaterMark peerWaterMark_ = 0;
134     int32_t sendCode_ = 0;
135     int32_t mode_ = SyncModeType::INVALID_MODE;
136     uint32_t sessionId_ = 0;
137     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
138     std::vector<uint64_t> reserved_;
139     uint32_t flag_ = 0; // bit 0 used for isLastSequence
140     // add for query sync mode
141     QuerySyncObject query_;
142     std::string queryId_;
143     WaterMark deletedWatermark_ = 0;
144     std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_
145     CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data
146     std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device
147     uint64_t schemaVersion_ = 0; // sender schema version, add in 109
148     int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109
149     int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109
150     SecurityOption securityOption_;
151     uint32_t totalDataCount_ = 0;
152     static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last
153     static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update
154     static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data
155     static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data
156 };
157 
158 class DataAckPacket {
159 public:
DataAckPacket()160     DataAckPacket() {};
~DataAckPacket()161     virtual ~DataAckPacket() {};
162 
163     void SetData(uint64_t data);
164 
165     uint64_t GetData() const;
166 
167     void SetRecvCode(int32_t errorCode);
168 
169     int32_t GetRecvCode() const;
170 
171     void SetVersion(uint32_t version);
172 
173     uint32_t GetVersion() const;
174 
175     void SetReserved(std::vector<uint64_t> &reserved);
176 
177     std::vector<uint64_t> GetReserved() const;
178 
179     uint64_t GetPacketId() const;
180 
181     static bool IsPacketIdValid(uint64_t packetId);
182 
183     uint32_t CalculateLen() const;
184 
185 private:
186     /*
187      * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK;
188      * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0.
189      */
190     uint64_t data_ = 0;
191     int32_t recvCode_ = 0;
192     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
193     std::vector<uint64_t> reserved_;
194 };
195 
196 class ControlRequestPacket {
197 public:
ControlRequestPacket()198     ControlRequestPacket() {};
~ControlRequestPacket()199     virtual ~ControlRequestPacket() {};
200     void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag);
201 
202     int32_t GetSendCode() const;
203     uint32_t GetVersion() const;
204     uint32_t GetcontrolCmdType() const;
205     uint32_t GetFlag() const;
206     virtual void SetQuery(const QuerySyncObject &query);
207     virtual uint32_t CalculateLen() const;
208 private:
209     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
210     int32_t sendCode_ = 0;
211     uint32_t controlCmdType_ = 0;
212     uint32_t flag_ = 0;
213 };
214 
215 class SubscribeRequest : public ControlRequestPacket {
216 public:
SubscribeRequest()217     SubscribeRequest() {};
~SubscribeRequest()218     ~SubscribeRequest() override {};
219     QuerySyncObject GetQuery() const;
220     bool IsAutoSubscribe() const;
221     void SetQuery(const QuerySyncObject &query) override;
222     uint32_t CalculateLen() const override;
223     static const uint32_t IS_AUTO_SUBSCRIBE = 0x1;
224 private:
225     QuerySyncObject query_;
226 };
227 
228 class ControlAckPacket {
229 public:
ControlAckPacket()230     ControlAckPacket() {};
~ControlAckPacket()231     virtual ~ControlAckPacket() {};
232     void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag);
233     int32_t GetRecvCode() const;
234     uint32_t GetVersion() const;
235     uint32_t GetcontrolCmdType() const;
236     uint32_t GetFlag() const;
237     uint32_t CalculateLen() const;
238 
239 private:
240     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
241     int32_t recvCode_ = 0;
242     uint32_t controlCmdType_ = 0;
243     uint32_t flag_ = 0;
244 };
245 }  // namespace DistributedDB
246 
247 #endif // SINGLE_VER_DATA_SYNC_NEW_H