• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_packet.h"
17 
18 #include "generic_single_ver_kv_entry.h"
19 #include "icommunicator.h"
20 #include "parcel.h"
21 #include "query_sync_object.h"
22 #include "single_ver_data_sync_utils.h"
23 #include "single_ver_kvdb_sync_interface.h"
24 #include "sync_types.h"
25 #include "version.h"
26 
27 namespace DistributedDB {
~DataRequestPacket()28 DataRequestPacket::~DataRequestPacket()
29 {
30     for (auto &entry : data_) {
31         delete entry;
32         entry = nullptr;
33     }
34 }
35 
SetData(std::vector<SendDataItem> & data)36 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
37 {
38     data_ = std::move(data);
39 }
40 
GetData() const41 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
42 {
43     return data_;
44 }
45 
SetCompressData(std::vector<uint8_t> & compressData)46 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
47 {
48     compressData_ = std::move(compressData);
49 }
50 
GetCompressData() const51 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
52 {
53     return compressData_;
54 }
55 
SetEndWaterMark(WaterMark waterMark)56 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
57 {
58     endWaterMark_ = waterMark;
59 }
60 
GetEndWaterMark() const61 WaterMark DataRequestPacket::GetEndWaterMark() const
62 {
63     return endWaterMark_;
64 }
65 
SetLocalWaterMark(WaterMark waterMark)66 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
67 {
68     localWaterMark_ = waterMark;
69 }
70 
GetLocalWaterMark() const71 WaterMark DataRequestPacket::GetLocalWaterMark() const
72 {
73     return localWaterMark_;
74 }
75 
SetPeerWaterMark(WaterMark waterMark)76 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
77 {
78     peerWaterMark_ = waterMark;
79 }
80 
GetPeerWaterMark() const81 WaterMark DataRequestPacket::GetPeerWaterMark() const
82 {
83     return peerWaterMark_;
84 }
85 
SetSendCode(int32_t errCode)86 void DataRequestPacket::SetSendCode(int32_t errCode)
87 {
88     sendCode_ = errCode;
89 }
90 
GetSendCode() const91 int32_t DataRequestPacket::GetSendCode() const
92 {
93     return sendCode_;
94 }
95 
SetMode(int32_t mode)96 void DataRequestPacket::SetMode(int32_t mode)
97 {
98     mode_ = mode;
99 }
100 
GetMode() const101 int32_t DataRequestPacket::GetMode() const
102 {
103     return mode_;
104 }
105 
SetSessionId(uint32_t sessionId)106 void DataRequestPacket::SetSessionId(uint32_t sessionId)
107 {
108     sessionId_ = sessionId;
109 }
110 
GetSessionId() const111 uint32_t DataRequestPacket::GetSessionId() const
112 {
113     return sessionId_;
114 }
115 
SetVersion(uint32_t version)116 void DataRequestPacket::SetVersion(uint32_t version)
117 {
118     version_ = version;
119 }
120 
GetVersion() const121 uint32_t DataRequestPacket::GetVersion() const
122 {
123     return version_;
124 }
125 
SetReserved(std::vector<uint64_t> & reserved)126 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
127 {
128     reserved_ = std::move(reserved);
129 }
130 
SetReserved(std::vector<uint64_t> && reserved)131 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
132 {
133     reserved_ = reserved;
134 }
135 
GetReserved() const136 std::vector<uint64_t> DataRequestPacket::GetReserved() const
137 {
138     return reserved_;
139 }
140 
GetPacketId() const141 uint64_t DataRequestPacket::GetPacketId() const
142 {
143     uint64_t packetId = 0;
144     std::vector<uint64_t> DataRequestReserve = GetReserved();
145     if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
146         return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
147     } else {
148         return packetId;
149     }
150 }
151 
CalculateLen(uint32_t messageId) const152 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
153 {
154     uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
155         IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
156     if (totalLen == 0) {
157         return 0;
158     }
159     totalLen += Parcel::GetUInt64Len(); // endWaterMark
160     totalLen += Parcel::GetUInt64Len(); // localWaterMark
161     totalLen += Parcel::GetUInt64Len(); // peerWaterMark
162     totalLen += Parcel::GetIntLen(); // sendCode
163     totalLen += Parcel::GetIntLen(); // mode
164     totalLen += Parcel::GetUInt32Len(); // sessionId
165     totalLen += Parcel::GetUInt32Len(); // version
166     totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
167 
168     if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
169         totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
170     }
171     totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
172     if (totalLen > INT32_MAX) {
173         return 0;
174     }
175     if (messageId == QUERY_SYNC_MESSAGE) {
176         // deleted watermark
177         totalLen += Parcel::GetUInt64Len();
178         // query id
179         totalLen += Parcel::GetStringLen(queryId_);
180         // add for queryObject
181         totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
182     }
183     if (IsCompressData()) {
184         totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
185     }
186 
187     if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
188         totalLen += Parcel::GetUInt32Len(); // extraCondition size
189         for (const auto &entry : extraConditions_) {
190             totalLen += Parcel::GetStringLen(entry.first);
191             totalLen += Parcel::GetStringLen(entry.second);
192         }
193         totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
194     }
195     if (version_ >= SOFTWARE_VERSION_RELEASE_9_0) {
196         totalLen += Parcel::GetUInt64Len(); // schemaVersion
197         totalLen += Parcel::GetInt64Len(); // systemTimeOffset
198         totalLen += Parcel::GetInt64Len(); // senderTimeOffset
199         totalLen += Parcel::GetIntLen();   // security label
200         totalLen += Parcel::GetIntLen();   // security flag
201     }
202     if (SingleVerDataSyncUtils::IsSupportRequestTotal(version_)) {
203         totalLen += Parcel::GetUInt32Len(); // totalDataCount
204         totalLen = Parcel::GetEightByteAlign(totalLen);
205     }
206 
207     return totalLen > INT32_MAX ? 0 : totalLen;
208 }
209 
SetFlag(uint32_t flag)210 void DataRequestPacket::SetFlag(uint32_t flag)
211 {
212     flag_ = flag;
213 }
214 
GetFlag() const215 uint32_t DataRequestPacket::GetFlag() const
216 {
217     return flag_;
218 }
219 
IsLastSequence() const220 bool DataRequestPacket::IsLastSequence() const
221 {
222     return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
223 }
224 
SetLastSequence()225 void DataRequestPacket::SetLastSequence()
226 {
227     flag_ = flag_ | IS_LAST_SEQUENCE;
228 }
229 
IsNeedUpdateWaterMark() const230 bool DataRequestPacket::IsNeedUpdateWaterMark() const
231 {
232     return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
233 }
234 
SetUpdateWaterMark()235 void DataRequestPacket::SetUpdateWaterMark()
236 {
237     flag_ = flag_ | IS_UPDATE_WATER;
238 }
239 
SetCompressDataMark()240 void DataRequestPacket::SetCompressDataMark()
241 {
242     flag_ = flag_ | IS_COMPRESS_DATA;
243 }
244 
IsCompressData() const245 bool DataRequestPacket::IsCompressData() const
246 {
247     return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
248 }
249 
SetCompressAlgo(CompressAlgorithm algo)250 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
251 {
252     algo_ = algo;
253 }
254 
GetCompressAlgo() const255 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
256 {
257     return algo_;
258 }
259 
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)260 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
261 {
262     SetSendCode(sendCode);
263     SetVersion(version);
264     SetMode(mode);
265 }
266 
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)267 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
268 {
269     localWaterMark_ = localMark;
270     peerWaterMark_ = peerMark;
271     deletedWatermark_ = deletedWatermark;
272 }
273 
SetQuery(const QuerySyncObject & query)274 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
275 {
276     query_ = query;
277 }
278 
GetQuery() const279 QuerySyncObject DataRequestPacket::GetQuery() const
280 {
281     return query_;
282 }
283 
SetQueryId(const std::string & queryId)284 void DataRequestPacket::SetQueryId(const std::string &queryId)
285 {
286     queryId_ = queryId;
287 }
288 
GetQueryId() const289 std::string DataRequestPacket::GetQueryId() const
290 {
291     return queryId_;
292 }
293 
SetDeletedWaterMark(WaterMark watermark)294 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
295 {
296     deletedWatermark_ = watermark;
297 }
298 
GetDeletedWaterMark() const299 WaterMark DataRequestPacket::GetDeletedWaterMark() const
300 {
301     return deletedWatermark_;
302 }
303 
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)304 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
305 {
306     extraConditions_ = extraConditions;
307     flag_ |= IS_CONDITION_DATA;
308 }
309 
GetExtraConditions() const310 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
311 {
312     return extraConditions_;
313 }
314 
IsExtraConditionData() const315 bool DataRequestPacket::IsExtraConditionData() const
316 {
317     return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
318 }
319 
SetSchemaVersion(uint64_t schemaVersion)320 void DataRequestPacket::SetSchemaVersion(uint64_t schemaVersion)
321 {
322     schemaVersion_ = schemaVersion;
323 }
324 
GetSchemaVersion() const325 uint64_t DataRequestPacket::GetSchemaVersion() const
326 {
327     return schemaVersion_;
328 }
329 
SetSystemTimeOffset(int64_t systemTimeOffset)330 void DataRequestPacket::SetSystemTimeOffset(int64_t systemTimeOffset)
331 {
332     systemTimeOffset_ = systemTimeOffset;
333 }
334 
GetSystemTimeOffset() const335 int64_t DataRequestPacket::GetSystemTimeOffset() const
336 {
337     return systemTimeOffset_;
338 }
339 
SetSenderTimeOffset(int64_t senderTimeOffset)340 void DataRequestPacket::SetSenderTimeOffset(int64_t senderTimeOffset)
341 {
342     senderTimeOffset_ = senderTimeOffset;
343 }
344 
GetSenderTimeOffset() const345 int64_t DataRequestPacket::GetSenderTimeOffset() const
346 {
347     return senderTimeOffset_;
348 }
349 
SetSecurityOption(const SecurityOption & option)350 void DataRequestPacket::SetSecurityOption(const SecurityOption &option)
351 {
352     securityOption_ = option;
353 }
354 
GetSecurityOption() const355 SecurityOption DataRequestPacket::GetSecurityOption() const
356 {
357     return securityOption_;
358 }
359 
SetTotalDataCount(uint32_t total)360 void DataRequestPacket::SetTotalDataCount(uint32_t total)
361 {
362     totalDataCount_ = total;
363 }
364 
GetTotalDataCount() const365 uint32_t DataRequestPacket::GetTotalDataCount() const
366 {
367     return totalDataCount_;
368 }
369 
SetData(uint64_t data)370 void DataAckPacket::SetData(uint64_t data)
371 {
372     data_ = data;
373 }
374 
GetData() const375 uint64_t DataAckPacket::GetData() const
376 {
377     return data_;
378 }
379 
SetRecvCode(int32_t errorCode)380 void DataAckPacket::SetRecvCode(int32_t errorCode)
381 {
382     recvCode_ = errorCode;
383 }
384 
GetRecvCode() const385 int32_t DataAckPacket::GetRecvCode() const
386 {
387     return recvCode_;
388 }
389 
SetVersion(uint32_t version)390 void DataAckPacket::SetVersion(uint32_t version)
391 {
392     version_ = version;
393 }
394 
GetVersion() const395 uint32_t DataAckPacket::GetVersion() const
396 {
397     return version_;
398 }
399 
SetReserved(std::vector<uint64_t> & reserved)400 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
401 {
402     reserved_ = std::move(reserved);
403 }
404 
GetReserved() const405 std::vector<uint64_t> DataAckPacket::GetReserved() const
406 {
407     return reserved_;
408 }
409 
GetPacketId() const410 uint64_t DataAckPacket::GetPacketId() const
411 {
412     uint64_t packetId = 0;
413     std::vector<uint64_t> DataAckReserve = GetReserved();
414     if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
415         packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
416     }
417     // while remote db is close and open again, it may not carry packetId
418     // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
419     if (packetId > MAX_PACKETID) {
420         return 0;
421     }
422     return packetId;
423 }
424 
IsPacketIdValid(uint64_t packetId)425 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
426 {
427     return (packetId > 0);
428 }
429 
CalculateLen() const430 uint32_t DataAckPacket::CalculateLen() const
431 {
432     uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
433     len += Parcel::GetIntLen(); // recvCode
434     len += Parcel::GetUInt32Len(); // version
435     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
436 
437     len = Parcel::GetEightByteAlign(len);
438     if (len > INT32_MAX) {
439         return 0;
440     }
441     return len;
442 }
443 
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)444 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
445 {
446     sendCode_ = sendCode;
447     version_ = version;
448     controlCmdType_ = static_cast<uint32_t>(controlCmd);
449     flag_ = flag;
450 }
451 
GetSendCode() const452 int32_t ControlRequestPacket::GetSendCode() const
453 {
454     return sendCode_;
455 }
456 
GetVersion() const457 uint32_t ControlRequestPacket::GetVersion() const
458 {
459     return version_;
460 }
461 
GetcontrolCmdType() const462 uint32_t ControlRequestPacket::GetcontrolCmdType() const
463 {
464     return controlCmdType_;
465 }
466 
GetFlag() const467 uint32_t ControlRequestPacket::GetFlag() const
468 {
469     return flag_;
470 }
471 
SetQuery(const QuerySyncObject & query)472 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
473 {
474     (void)query;
475 }
476 
CalculateLen() const477 uint32_t ControlRequestPacket::CalculateLen() const
478 {
479     uint64_t len = Parcel::GetUInt32Len(); // version_
480     len += Parcel::GetIntLen(); // sendCode_
481     len += Parcel::GetUInt32Len(); // controlCmdType_
482     len += Parcel::GetUInt32Len(); // flag
483 
484     len = Parcel::GetEightByteAlign(len);
485     if (len > INT32_MAX) {
486         return 0;
487     }
488     return len;
489 }
490 
SetQuery(const QuerySyncObject & query)491 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
492 {
493     query_ = query;
494 }
495 
GetQuery() const496 QuerySyncObject SubscribeRequest::GetQuery() const
497 {
498     return query_;
499 }
500 
CalculateLen() const501 uint32_t SubscribeRequest::CalculateLen() const
502 {
503     uint64_t totalLen = ControlRequestPacket::CalculateLen();
504     if (totalLen == 0) {
505         LOGE("[SubscribeRequest] cal packet len failed");
506         return 0;
507     }
508     // add for queryObject
509     totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
510     if (totalLen > INT32_MAX) {
511         return 0;
512     }
513     return totalLen;
514 }
515 
IsAutoSubscribe() const516 bool SubscribeRequest::IsAutoSubscribe() const
517 {
518     return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
519 }
520 
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)521 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
522 {
523     recvCode_ = recvCode;
524     version_ = version;
525     controlCmdType_ = static_cast<uint32_t>(controlCmd);
526     flag_ = flag;
527 }
528 
GetRecvCode() const529 int32_t ControlAckPacket::GetRecvCode() const
530 {
531     return recvCode_;
532 }
533 
GetVersion() const534 uint32_t ControlAckPacket::GetVersion() const
535 {
536     return version_;
537 }
538 
GetcontrolCmdType() const539 uint32_t ControlAckPacket::GetcontrolCmdType() const
540 {
541     return controlCmdType_;
542 }
543 
GetFlag() const544 uint32_t ControlAckPacket::GetFlag() const
545 {
546     return flag_;
547 }
548 
CalculateLen() const549 uint32_t ControlAckPacket::CalculateLen() const
550 {
551     uint64_t len = Parcel::GetUInt32Len(); // version_
552     len += Parcel::GetIntLen(); // recvCode_
553     len += Parcel::GetUInt32Len(); // controlCmdType_
554     len += Parcel::GetUInt32Len(); // flag
555     len = Parcel::GetEightByteAlign(len);
556     if (len > INT32_MAX) {
557         return 0;
558     }
559     return len;
560 }
561 } // namespace DistributedDB
562