• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_packet.h"
17 #include "icommunicator.h"
18 #include "single_ver_kvdb_sync_interface.h"
19 #include "query_sync_object.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "sync_types.h"
22 #include "version.h"
23 #include "parcel.h"
24 
25 namespace DistributedDB {
~DataRequestPacket()26 DataRequestPacket::~DataRequestPacket()
27 {
28     for (auto &entry : data_) {
29         delete entry;
30         entry = nullptr;
31     }
32 }
33 
SetData(std::vector<SendDataItem> & data)34 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
35 {
36     data_ = std::move(data);
37 }
38 
GetData() const39 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
40 {
41     return data_;
42 }
43 
SetCompressData(std::vector<uint8_t> & compressData)44 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
45 {
46     compressData_ = std::move(compressData);
47 }
48 
GetCompressData() const49 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
50 {
51     return compressData_;
52 }
53 
SetEndWaterMark(WaterMark waterMark)54 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
55 {
56     endWaterMark_ = waterMark;
57 }
58 
GetEndWaterMark() const59 WaterMark DataRequestPacket::GetEndWaterMark() const
60 {
61     return endWaterMark_;
62 }
63 
SetLocalWaterMark(WaterMark waterMark)64 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
65 {
66     localWaterMark_ = waterMark;
67 }
68 
GetLocalWaterMark() const69 WaterMark DataRequestPacket::GetLocalWaterMark() const
70 {
71     return localWaterMark_;
72 }
73 
SetPeerWaterMark(WaterMark waterMark)74 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
75 {
76     peerWaterMark_ = waterMark;
77 }
78 
GetPeerWaterMark() const79 WaterMark DataRequestPacket::GetPeerWaterMark() const
80 {
81     return peerWaterMark_;
82 }
83 
SetSendCode(int32_t errCode)84 void DataRequestPacket::SetSendCode(int32_t errCode)
85 {
86     sendCode_ = errCode;
87 }
88 
GetSendCode() const89 int32_t DataRequestPacket::GetSendCode() const
90 {
91     return sendCode_;
92 }
93 
SetMode(int32_t mode)94 void DataRequestPacket::SetMode(int32_t mode)
95 {
96     mode_ = mode;
97 }
98 
GetMode() const99 int32_t DataRequestPacket::GetMode() const
100 {
101     return mode_;
102 }
103 
SetSessionId(uint32_t sessionId)104 void DataRequestPacket::SetSessionId(uint32_t sessionId)
105 {
106     sessionId_ = sessionId;
107 }
108 
GetSessionId() const109 uint32_t DataRequestPacket::GetSessionId() const
110 {
111     return sessionId_;
112 }
113 
SetVersion(uint32_t version)114 void DataRequestPacket::SetVersion(uint32_t version)
115 {
116     version_ = version;
117 }
118 
GetVersion() const119 uint32_t DataRequestPacket::GetVersion() const
120 {
121     return version_;
122 }
123 
SetReserved(std::vector<uint64_t> & reserved)124 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126     reserved_ = std::move(reserved);
127 }
128 
SetReserved(std::vector<uint64_t> && reserved)129 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
130 {
131     reserved_ = reserved;
132 }
133 
GetReserved() const134 std::vector<uint64_t> DataRequestPacket::GetReserved() const
135 {
136     return reserved_;
137 }
138 
GetPacketId() const139 uint64_t DataRequestPacket::GetPacketId() const
140 {
141     uint64_t packetId = 0;
142     std::vector<uint64_t> DataRequestReserve = GetReserved();
143     if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
144         return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
145     } else {
146         return packetId;
147     }
148 }
149 
CalculateLen(uint32_t messageId) const150 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
151 {
152     uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
153         IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
154     if (totalLen == 0) {
155         return 0;
156     }
157     totalLen += Parcel::GetUInt64Len(); // endWaterMark
158     totalLen += Parcel::GetUInt64Len(); // localWaterMark
159     totalLen += Parcel::GetUInt64Len(); // peerWaterMark
160     totalLen += Parcel::GetIntLen(); // sendCode
161     totalLen += Parcel::GetIntLen(); // mode
162     totalLen += Parcel::GetUInt32Len(); // sessionId
163     totalLen += Parcel::GetUInt32Len(); // version
164     totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
165 
166     if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
167         totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
168     }
169     totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
170     if (totalLen > INT32_MAX) {
171         return 0;
172     }
173     if (messageId == QUERY_SYNC_MESSAGE) {
174         // deleted watermark
175         totalLen += Parcel::GetUInt64Len();
176         // query id
177         totalLen += Parcel::GetStringLen(queryId_);
178         // add for queryObject
179         totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
180     }
181     if (IsCompressData()) {
182         totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
183     }
184 
185     if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
186         totalLen += Parcel::GetUInt32Len(); // extraCondition size
187         for (const auto &entry : extraConditions_) {
188             totalLen += Parcel::GetStringLen(entry.first);
189             totalLen += Parcel::GetStringLen(entry.second);
190         }
191         totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
192     }
193     if (totalLen > INT32_MAX) {
194         return 0;
195     }
196     return totalLen;
197 }
198 
SetFlag(uint32_t flag)199 void DataRequestPacket::SetFlag(uint32_t flag)
200 {
201     flag_ = flag;
202 }
203 
GetFlag() const204 uint32_t DataRequestPacket::GetFlag() const
205 {
206     return flag_;
207 }
208 
IsLastSequence() const209 bool DataRequestPacket::IsLastSequence() const
210 {
211     return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
212 }
213 
SetLastSequence()214 void DataRequestPacket::SetLastSequence()
215 {
216     flag_ = flag_ | IS_LAST_SEQUENCE;
217 }
218 
IsNeedUpdateWaterMark() const219 bool DataRequestPacket::IsNeedUpdateWaterMark() const
220 {
221     return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
222 }
223 
SetUpdateWaterMark()224 void DataRequestPacket::SetUpdateWaterMark()
225 {
226     flag_ = flag_ | IS_UPDATE_WATER;
227 }
228 
SetCompressDataMark()229 void DataRequestPacket::SetCompressDataMark()
230 {
231     flag_ = flag_ | IS_COMPRESS_DATA;
232 }
233 
IsCompressData() const234 bool DataRequestPacket::IsCompressData() const
235 {
236     return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
237 }
238 
SetCompressAlgo(CompressAlgorithm algo)239 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
240 {
241     algo_ = algo;
242 }
243 
GetCompressAlgo() const244 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
245 {
246     return algo_;
247 }
248 
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)249 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
250 {
251     SetSendCode(sendCode);
252     SetVersion(version);
253     SetMode(mode);
254 }
255 
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)256 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
257 {
258     localWaterMark_ = localMark;
259     peerWaterMark_ = peerMark;
260     deletedWatermark_ = deletedWatermark;
261 }
262 
SetQuery(const QuerySyncObject & query)263 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
264 {
265     query_ = query;
266 }
267 
GetQuery() const268 QuerySyncObject DataRequestPacket::GetQuery() const
269 {
270     return query_;
271 }
272 
SetQueryId(const std::string & queryId)273 void DataRequestPacket::SetQueryId(const std::string &queryId)
274 {
275     queryId_ = queryId;
276 }
277 
GetQueryId() const278 std::string DataRequestPacket::GetQueryId() const
279 {
280     return queryId_;
281 }
282 
SetDeletedWaterMark(WaterMark watermark)283 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
284 {
285     deletedWatermark_ = watermark;
286 }
287 
GetDeletedWaterMark() const288 WaterMark DataRequestPacket::GetDeletedWaterMark() const
289 {
290     return deletedWatermark_;
291 }
292 
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)293 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
294 {
295     extraConditions_ = extraConditions;
296     flag_ |= IS_CONDITION_DATA;
297 }
298 
GetExtraConditions() const299 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
300 {
301     return extraConditions_;
302 }
303 
IsExtraConditionData() const304 bool DataRequestPacket::IsExtraConditionData() const
305 {
306     return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
307 }
308 
SetData(uint64_t data)309 void DataAckPacket::SetData(uint64_t data)
310 {
311     data_ = data;
312 }
313 
GetData() const314 uint64_t DataAckPacket::GetData() const
315 {
316     return data_;
317 }
318 
SetRecvCode(int32_t errorCode)319 void DataAckPacket::SetRecvCode(int32_t errorCode)
320 {
321     recvCode_ = errorCode;
322 }
323 
GetRecvCode() const324 int32_t DataAckPacket::GetRecvCode() const
325 {
326     return recvCode_;
327 }
328 
SetVersion(uint32_t version)329 void DataAckPacket::SetVersion(uint32_t version)
330 {
331     version_ = version;
332 }
333 
GetVersion() const334 uint32_t DataAckPacket::GetVersion() const
335 {
336     return version_;
337 }
338 
SetReserved(std::vector<uint64_t> & reserved)339 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
340 {
341     reserved_ = std::move(reserved);
342 }
343 
GetReserved() const344 std::vector<uint64_t> DataAckPacket::GetReserved() const
345 {
346     return reserved_;
347 }
348 
GetPacketId() const349 uint64_t DataAckPacket::GetPacketId() const
350 {
351     uint64_t packetId = 0;
352     std::vector<uint64_t> DataAckReserve = GetReserved();
353     if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
354         packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
355     }
356     // while remote db is close and open again, it may not carry packetId
357     // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
358     if (packetId > MAX_PACKETID) {
359         return 0;
360     }
361     return packetId;
362 }
363 
IsPacketIdValid(uint64_t packetId)364 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
365 {
366     return (packetId > 0);
367 }
368 
CalculateLen() const369 uint32_t DataAckPacket::CalculateLen() const
370 {
371     uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
372     len += Parcel::GetIntLen(); // recvCode
373     len += Parcel::GetUInt32Len(); // version
374     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
375 
376     len = Parcel::GetEightByteAlign(len);
377     if (len > INT32_MAX) {
378         return 0;
379     }
380     return len;
381 }
382 
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)383 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
384 {
385     sendCode_ = sendCode;
386     version_ = version;
387     controlCmdType_ = static_cast<uint32_t>(controlCmd);
388     flag_ = flag;
389 }
390 
GetSendCode() const391 int32_t ControlRequestPacket::GetSendCode() const
392 {
393     return sendCode_;
394 }
395 
GetVersion() const396 uint32_t ControlRequestPacket::GetVersion() const
397 {
398     return version_;
399 }
400 
GetcontrolCmdType() const401 uint32_t ControlRequestPacket::GetcontrolCmdType() const
402 {
403     return controlCmdType_;
404 }
405 
GetFlag() const406 uint32_t ControlRequestPacket::GetFlag() const
407 {
408     return flag_;
409 }
410 
SetQuery(const QuerySyncObject & query)411 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
412 {
413     (void)query;
414 }
415 
CalculateLen() const416 uint32_t ControlRequestPacket::CalculateLen() const
417 {
418     uint64_t len = Parcel::GetUInt32Len(); // version_
419     len += Parcel::GetIntLen(); // sendCode_
420     len += Parcel::GetUInt32Len(); // controlCmdType_
421     len += Parcel::GetUInt32Len(); // flag
422 
423     len = Parcel::GetEightByteAlign(len);
424     if (len > INT32_MAX) {
425         return 0;
426     }
427     return len;
428 }
429 
SetQuery(const QuerySyncObject & query)430 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
431 {
432     query_ = query;
433 }
434 
GetQuery() const435 QuerySyncObject SubscribeRequest::GetQuery() const
436 {
437     return query_;
438 }
439 
CalculateLen() const440 uint32_t SubscribeRequest::CalculateLen() const
441 {
442     uint64_t totalLen = ControlRequestPacket::CalculateLen();
443     if (totalLen == 0) {
444         LOGE("[SubscribeRequest] cal packet len failed");
445         return 0;
446     }
447     // add for queryObject
448     totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
449     if (totalLen > INT32_MAX) {
450         return 0;
451     }
452     return totalLen;
453 }
454 
IsAutoSubscribe() const455 bool SubscribeRequest::IsAutoSubscribe() const
456 {
457     return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
458 }
459 
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)460 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
461 {
462     recvCode_ = recvCode;
463     version_ = version;
464     controlCmdType_ = static_cast<uint32_t>(controlCmd);
465     flag_ = flag;
466 }
467 
GetRecvCode() const468 int32_t ControlAckPacket::GetRecvCode() const
469 {
470     return recvCode_;
471 }
472 
GetVersion() const473 uint32_t ControlAckPacket::GetVersion() const
474 {
475     return version_;
476 }
477 
GetcontrolCmdType() const478 uint32_t ControlAckPacket::GetcontrolCmdType() const
479 {
480     return controlCmdType_;
481 }
482 
GetFlag() const483 uint32_t ControlAckPacket::GetFlag() const
484 {
485     return flag_;
486 }
487 
CalculateLen() const488 uint32_t ControlAckPacket::CalculateLen() const
489 {
490     uint64_t len = Parcel::GetUInt32Len(); // version_
491     len += Parcel::GetIntLen(); // recvCode_
492     len += Parcel::GetUInt32Len(); // controlCmdType_
493     len += Parcel::GetUInt32Len(); // flag
494     len = Parcel::GetEightByteAlign(len);
495     if (len > INT32_MAX) {
496         return 0;
497     }
498     return len;
499 }
500 } // namespace DistributedDB
501