• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_packet.h"
17 
18 #include "generic_single_ver_kv_entry.h"
19 #include "icommunicator.h"
20 #include "parcel.h"
21 #include "query_sync_object.h"
22 #include "single_ver_data_sync_utils.h"
23 #include "single_ver_kvdb_sync_interface.h"
24 #include "sync_types.h"
25 #include "version.h"
26 
27 namespace DistributedDB {
~DataRequestPacket()28 DataRequestPacket::~DataRequestPacket()
29 {
30     for (auto &entry : data_) {
31         delete entry;
32     }
33 }
34 
SetData(std::vector<SendDataItem> & data)35 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
36 {
37     data_ = std::move(data);
38 }
39 
GetData() const40 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
41 {
42     return data_;
43 }
44 
SetCompressData(std::vector<uint8_t> & compressData)45 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
46 {
47     compressData_ = std::move(compressData);
48 }
49 
GetCompressData() const50 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
51 {
52     return compressData_;
53 }
54 
SetEndWaterMark(WaterMark waterMark)55 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
56 {
57     endWaterMark_ = waterMark;
58 }
59 
GetEndWaterMark() const60 WaterMark DataRequestPacket::GetEndWaterMark() const
61 {
62     return endWaterMark_;
63 }
64 
SetLocalWaterMark(WaterMark waterMark)65 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
66 {
67     localWaterMark_ = waterMark;
68 }
69 
GetLocalWaterMark() const70 WaterMark DataRequestPacket::GetLocalWaterMark() const
71 {
72     return localWaterMark_;
73 }
74 
SetPeerWaterMark(WaterMark waterMark)75 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
76 {
77     peerWaterMark_ = waterMark;
78 }
79 
GetPeerWaterMark() const80 WaterMark DataRequestPacket::GetPeerWaterMark() const
81 {
82     return peerWaterMark_;
83 }
84 
SetSendCode(int32_t errCode)85 void DataRequestPacket::SetSendCode(int32_t errCode)
86 {
87     sendCode_ = errCode;
88 }
89 
GetSendCode() const90 int32_t DataRequestPacket::GetSendCode() const
91 {
92     return sendCode_;
93 }
94 
SetMode(int32_t mode)95 void DataRequestPacket::SetMode(int32_t mode)
96 {
97     mode_ = mode;
98 }
99 
GetMode() const100 int32_t DataRequestPacket::GetMode() const
101 {
102     return mode_;
103 }
104 
SetSessionId(uint32_t sessionId)105 void DataRequestPacket::SetSessionId(uint32_t sessionId)
106 {
107     sessionId_ = sessionId;
108 }
109 
GetSessionId() const110 uint32_t DataRequestPacket::GetSessionId() const
111 {
112     return sessionId_;
113 }
114 
SetVersion(uint32_t version)115 void DataRequestPacket::SetVersion(uint32_t version)
116 {
117     version_ = version;
118 }
119 
GetVersion() const120 uint32_t DataRequestPacket::GetVersion() const
121 {
122     return version_;
123 }
124 
SetReserved(std::vector<uint64_t> & reserved)125 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
126 {
127     reserved_ = std::move(reserved);
128 }
129 
SetReserved(std::vector<uint64_t> && reserved)130 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
131 {
132     reserved_ = reserved;
133 }
134 
GetReserved() const135 std::vector<uint64_t> DataRequestPacket::GetReserved() const
136 {
137     return reserved_;
138 }
139 
GetPacketId() const140 uint64_t DataRequestPacket::GetPacketId() const
141 {
142     uint64_t packetId = 0;
143     std::vector<uint64_t> DataRequestReserve = GetReserved();
144     if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
145         return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
146     } else {
147         return packetId;
148     }
149 }
150 
CalculateLen(uint32_t messageId) const151 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
152 {
153     uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
154         IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
155     if (totalLen == 0) {
156         return 0;
157     }
158     totalLen += Parcel::GetUInt64Len(); // endWaterMark
159     totalLen += Parcel::GetUInt64Len(); // localWaterMark
160     totalLen += Parcel::GetUInt64Len(); // peerWaterMark
161     totalLen += Parcel::GetIntLen(); // sendCode
162     totalLen += Parcel::GetIntLen(); // mode
163     totalLen += Parcel::GetUInt32Len(); // sessionId
164     totalLen += Parcel::GetUInt32Len(); // version
165     totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
166 
167     if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
168         totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
169     }
170     totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
171     if (totalLen > INT32_MAX) {
172         return 0;
173     }
174     if (messageId == QUERY_SYNC_MESSAGE) {
175         // deleted watermark
176         totalLen += Parcel::GetUInt64Len();
177         // query id
178         totalLen += Parcel::GetStringLen(queryId_);
179         // add for queryObject
180         totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
181     }
182     if (IsCompressData()) {
183         totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
184     }
185 
186     if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
187         totalLen += Parcel::GetUInt32Len(); // extraCondition size
188         for (const auto &entry : extraConditions_) {
189             totalLen += Parcel::GetStringLen(entry.first);
190             totalLen += Parcel::GetStringLen(entry.second);
191         }
192         totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
193     }
194     if (version_ >= SOFTWARE_VERSION_RELEASE_9_0) {
195         totalLen += Parcel::GetUInt64Len(); // schemaVersion
196         totalLen += Parcel::GetInt64Len(); // systemTimeOffset
197         totalLen += Parcel::GetInt64Len(); // senderTimeOffset
198         totalLen += Parcel::GetIntLen();   // security label
199         totalLen += Parcel::GetIntLen();   // security flag
200     }
201     if (SingleVerDataSyncUtils::IsSupportRequestTotal(version_)) {
202         totalLen += Parcel::GetUInt32Len(); // totalDataCount
203         totalLen = Parcel::GetEightByteAlign(totalLen);
204     }
205 
206     return totalLen > INT32_MAX ? 0 : totalLen;
207 }
208 
SetFlag(uint32_t flag)209 void DataRequestPacket::SetFlag(uint32_t flag)
210 {
211     flag_ = flag;
212 }
213 
GetFlag() const214 uint32_t DataRequestPacket::GetFlag() const
215 {
216     return flag_;
217 }
218 
IsLastSequence() const219 bool DataRequestPacket::IsLastSequence() const
220 {
221     return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
222 }
223 
SetLastSequence()224 void DataRequestPacket::SetLastSequence()
225 {
226     flag_ = flag_ | IS_LAST_SEQUENCE;
227 }
228 
IsNeedUpdateWaterMark() const229 bool DataRequestPacket::IsNeedUpdateWaterMark() const
230 {
231     return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
232 }
233 
SetUpdateWaterMark()234 void DataRequestPacket::SetUpdateWaterMark()
235 {
236     flag_ = flag_ | IS_UPDATE_WATER;
237 }
238 
SetCompressDataMark()239 void DataRequestPacket::SetCompressDataMark()
240 {
241     flag_ = flag_ | IS_COMPRESS_DATA;
242 }
243 
IsCompressData() const244 bool DataRequestPacket::IsCompressData() const
245 {
246     return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
247 }
248 
SetCompressAlgo(CompressAlgorithm algo)249 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
250 {
251     algo_ = algo;
252 }
253 
GetCompressAlgo() const254 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
255 {
256     return algo_;
257 }
258 
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)259 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
260 {
261     SetSendCode(sendCode);
262     SetVersion(version);
263     SetMode(mode);
264 }
265 
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)266 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
267 {
268     localWaterMark_ = localMark;
269     peerWaterMark_ = peerMark;
270     deletedWatermark_ = deletedWatermark;
271 }
272 
SetQuery(const QuerySyncObject & query)273 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
274 {
275     query_ = query;
276 }
277 
GetQuery() const278 QuerySyncObject DataRequestPacket::GetQuery() const
279 {
280     return query_;
281 }
282 
SetQueryId(const std::string & queryId)283 void DataRequestPacket::SetQueryId(const std::string &queryId)
284 {
285     queryId_ = queryId;
286 }
287 
GetQueryId() const288 std::string DataRequestPacket::GetQueryId() const
289 {
290     return queryId_;
291 }
292 
SetDeletedWaterMark(WaterMark watermark)293 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
294 {
295     deletedWatermark_ = watermark;
296 }
297 
GetDeletedWaterMark() const298 WaterMark DataRequestPacket::GetDeletedWaterMark() const
299 {
300     return deletedWatermark_;
301 }
302 
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)303 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
304 {
305     extraConditions_ = extraConditions;
306     flag_ |= IS_CONDITION_DATA;
307 }
308 
GetExtraConditions() const309 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
310 {
311     return extraConditions_;
312 }
313 
IsExtraConditionData() const314 bool DataRequestPacket::IsExtraConditionData() const
315 {
316     return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
317 }
318 
SetSchemaVersion(uint64_t schemaVersion)319 void DataRequestPacket::SetSchemaVersion(uint64_t schemaVersion)
320 {
321     schemaVersion_ = schemaVersion;
322 }
323 
GetSchemaVersion() const324 uint64_t DataRequestPacket::GetSchemaVersion() const
325 {
326     return schemaVersion_;
327 }
328 
SetSystemTimeOffset(int64_t systemTimeOffset)329 void DataRequestPacket::SetSystemTimeOffset(int64_t systemTimeOffset)
330 {
331     systemTimeOffset_ = systemTimeOffset;
332 }
333 
GetSystemTimeOffset() const334 int64_t DataRequestPacket::GetSystemTimeOffset() const
335 {
336     return systemTimeOffset_;
337 }
338 
SetSenderTimeOffset(int64_t senderTimeOffset)339 void DataRequestPacket::SetSenderTimeOffset(int64_t senderTimeOffset)
340 {
341     senderTimeOffset_ = senderTimeOffset;
342 }
343 
GetSenderTimeOffset() const344 int64_t DataRequestPacket::GetSenderTimeOffset() const
345 {
346     return senderTimeOffset_;
347 }
348 
SetSecurityOption(const SecurityOption & option)349 void DataRequestPacket::SetSecurityOption(const SecurityOption &option)
350 {
351     securityOption_ = option;
352 }
353 
GetSecurityOption() const354 SecurityOption DataRequestPacket::GetSecurityOption() const
355 {
356     return securityOption_;
357 }
358 
SetTotalDataCount(uint32_t total)359 void DataRequestPacket::SetTotalDataCount(uint32_t total)
360 {
361     totalDataCount_ = total;
362 }
363 
GetTotalDataCount() const364 uint32_t DataRequestPacket::GetTotalDataCount() const
365 {
366     return totalDataCount_;
367 }
368 
SetData(const uint64_t data)369 void DataAckPacket::SetData(const uint64_t data)
370 {
371     data_ = data;
372 }
373 
GetData() const374 uint64_t DataAckPacket::GetData() const
375 {
376     return data_;
377 }
378 
SetRecvCode(int32_t errorCode)379 void DataAckPacket::SetRecvCode(int32_t errorCode)
380 {
381     recvCode_ = errorCode;
382 }
383 
GetRecvCode() const384 int32_t DataAckPacket::GetRecvCode() const
385 {
386     return recvCode_;
387 }
388 
SetVersion(uint32_t version)389 void DataAckPacket::SetVersion(uint32_t version)
390 {
391     version_ = version;
392 }
393 
GetVersion() const394 uint32_t DataAckPacket::GetVersion() const
395 {
396     return version_;
397 }
398 
SetReserved(std::vector<uint64_t> & reserved)399 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
400 {
401     reserved_ = std::move(reserved);
402 }
403 
GetReserved() const404 std::vector<uint64_t> DataAckPacket::GetReserved() const
405 {
406     return reserved_;
407 }
408 
GetPacketId() const409 uint64_t DataAckPacket::GetPacketId() const
410 {
411     uint64_t packetId = 0;
412     std::vector<uint64_t> DataAckReserve = GetReserved();
413     if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
414         packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
415     }
416     // while remote db is close and open again, it may not carry packetId
417     // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
418     if (packetId > MAX_PACKETID) {
419         return 0;
420     }
421     return packetId;
422 }
423 
IsPacketIdValid(uint64_t packetId)424 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
425 {
426     return (packetId > 0);
427 }
428 
CalculateLen() const429 uint32_t DataAckPacket::CalculateLen() const
430 {
431     uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
432     len += Parcel::GetIntLen(); // recvCode
433     len += Parcel::GetUInt32Len(); // version
434     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
435 
436     len = Parcel::GetEightByteAlign(len);
437     if (len > INT32_MAX) {
438         return 0;
439     }
440     return len;
441 }
442 
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)443 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
444 {
445     sendCode_ = sendCode;
446     version_ = version;
447     controlCmdType_ = static_cast<uint32_t>(controlCmd);
448     flag_ = flag;
449 }
450 
GetSendCode() const451 int32_t ControlRequestPacket::GetSendCode() const
452 {
453     return sendCode_;
454 }
455 
GetVersion() const456 uint32_t ControlRequestPacket::GetVersion() const
457 {
458     return version_;
459 }
460 
GetcontrolCmdType() const461 uint32_t ControlRequestPacket::GetcontrolCmdType() const
462 {
463     return controlCmdType_;
464 }
465 
GetFlag() const466 uint32_t ControlRequestPacket::GetFlag() const
467 {
468     return flag_;
469 }
470 
SetQuery(const QuerySyncObject & query)471 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
472 {
473     (void)query;
474 }
475 
CalculateLen() const476 uint32_t ControlRequestPacket::CalculateLen() const
477 {
478     uint64_t len = Parcel::GetUInt32Len(); // version_
479     len += Parcel::GetIntLen(); // sendCode_
480     len += Parcel::GetUInt32Len(); // controlCmdType_
481     len += Parcel::GetUInt32Len(); // flag
482 
483     len = Parcel::GetEightByteAlign(len);
484     if (len > INT32_MAX) {
485         return 0;
486     }
487     return len;
488 }
489 
SetQuery(const QuerySyncObject & query)490 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
491 {
492     query_ = query;
493 }
494 
GetQuery() const495 QuerySyncObject SubscribeRequest::GetQuery() const
496 {
497     return query_;
498 }
499 
CalculateLen() const500 uint32_t SubscribeRequest::CalculateLen() const
501 {
502     uint64_t totalLen = ControlRequestPacket::CalculateLen();
503     if (totalLen == 0) {
504         LOGE("[SubscribeRequest] cal packet len failed");
505         return 0;
506     }
507     // add for queryObject
508     totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
509     if (totalLen > INT32_MAX) {
510         return 0;
511     }
512     return totalLen;
513 }
514 
IsAutoSubscribe() const515 bool SubscribeRequest::IsAutoSubscribe() const
516 {
517     return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
518 }
519 
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)520 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
521 {
522     recvCode_ = recvCode;
523     version_ = version;
524     controlCmdType_ = static_cast<uint32_t>(controlCmd);
525     flag_ = flag;
526 }
527 
GetRecvCode() const528 int32_t ControlAckPacket::GetRecvCode() const
529 {
530     return recvCode_;
531 }
532 
GetVersion() const533 uint32_t ControlAckPacket::GetVersion() const
534 {
535     return version_;
536 }
537 
GetcontrolCmdType() const538 uint32_t ControlAckPacket::GetcontrolCmdType() const
539 {
540     return controlCmdType_;
541 }
542 
GetFlag() const543 uint32_t ControlAckPacket::GetFlag() const
544 {
545     return flag_;
546 }
547 
CalculateLen() const548 uint32_t ControlAckPacket::CalculateLen() const
549 {
550     uint64_t len = Parcel::GetUInt32Len(); // version_
551     len += Parcel::GetIntLen(); // recvCode_
552     len += Parcel::GetUInt32Len(); // controlCmdType_
553     len += Parcel::GetUInt32Len(); // flag
554     len = Parcel::GetEightByteAlign(len);
555     if (len > INT32_MAX) {
556         return 0;
557     }
558     return len;
559 }
560 } // namespace DistributedDB
561