1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_packet.h"
17
18 #include "generic_single_ver_kv_entry.h"
19 #include "icommunicator.h"
20 #include "parcel.h"
21 #include "query_sync_object.h"
22 #include "single_ver_data_sync_utils.h"
23 #include "single_ver_kvdb_sync_interface.h"
24 #include "sync_types.h"
25 #include "version.h"
26
27 namespace DistributedDB {
~DataRequestPacket()28 DataRequestPacket::~DataRequestPacket()
29 {
30 for (auto &entry : data_) {
31 delete entry;
32 entry = nullptr;
33 }
34 }
35
SetData(std::vector<SendDataItem> & data)36 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
37 {
38 data_ = std::move(data);
39 }
40
GetData() const41 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
42 {
43 return data_;
44 }
45
SetCompressData(std::vector<uint8_t> & compressData)46 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
47 {
48 compressData_ = std::move(compressData);
49 }
50
GetCompressData() const51 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
52 {
53 return compressData_;
54 }
55
SetEndWaterMark(WaterMark waterMark)56 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
57 {
58 endWaterMark_ = waterMark;
59 }
60
GetEndWaterMark() const61 WaterMark DataRequestPacket::GetEndWaterMark() const
62 {
63 return endWaterMark_;
64 }
65
SetLocalWaterMark(WaterMark waterMark)66 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
67 {
68 localWaterMark_ = waterMark;
69 }
70
GetLocalWaterMark() const71 WaterMark DataRequestPacket::GetLocalWaterMark() const
72 {
73 return localWaterMark_;
74 }
75
SetPeerWaterMark(WaterMark waterMark)76 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
77 {
78 peerWaterMark_ = waterMark;
79 }
80
GetPeerWaterMark() const81 WaterMark DataRequestPacket::GetPeerWaterMark() const
82 {
83 return peerWaterMark_;
84 }
85
SetSendCode(int32_t errCode)86 void DataRequestPacket::SetSendCode(int32_t errCode)
87 {
88 sendCode_ = errCode;
89 }
90
GetSendCode() const91 int32_t DataRequestPacket::GetSendCode() const
92 {
93 return sendCode_;
94 }
95
SetMode(int32_t mode)96 void DataRequestPacket::SetMode(int32_t mode)
97 {
98 mode_ = mode;
99 }
100
GetMode() const101 int32_t DataRequestPacket::GetMode() const
102 {
103 return mode_;
104 }
105
SetSessionId(uint32_t sessionId)106 void DataRequestPacket::SetSessionId(uint32_t sessionId)
107 {
108 sessionId_ = sessionId;
109 }
110
GetSessionId() const111 uint32_t DataRequestPacket::GetSessionId() const
112 {
113 return sessionId_;
114 }
115
SetVersion(uint32_t version)116 void DataRequestPacket::SetVersion(uint32_t version)
117 {
118 version_ = version;
119 }
120
GetVersion() const121 uint32_t DataRequestPacket::GetVersion() const
122 {
123 return version_;
124 }
125
SetReserved(std::vector<uint64_t> & reserved)126 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
127 {
128 reserved_ = std::move(reserved);
129 }
130
SetReserved(std::vector<uint64_t> && reserved)131 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
132 {
133 reserved_ = reserved;
134 }
135
GetReserved() const136 std::vector<uint64_t> DataRequestPacket::GetReserved() const
137 {
138 return reserved_;
139 }
140
GetPacketId() const141 uint64_t DataRequestPacket::GetPacketId() const
142 {
143 uint64_t packetId = 0;
144 std::vector<uint64_t> DataRequestReserve = GetReserved();
145 if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
146 return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
147 } else {
148 return packetId;
149 }
150 }
151
CalculateLen(uint32_t messageId) const152 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
153 {
154 uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
155 IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
156 if (totalLen == 0) {
157 return 0;
158 }
159 totalLen += Parcel::GetUInt64Len(); // endWaterMark
160 totalLen += Parcel::GetUInt64Len(); // localWaterMark
161 totalLen += Parcel::GetUInt64Len(); // peerWaterMark
162 totalLen += Parcel::GetIntLen(); // sendCode
163 totalLen += Parcel::GetIntLen(); // mode
164 totalLen += Parcel::GetUInt32Len(); // sessionId
165 totalLen += Parcel::GetUInt32Len(); // version
166 totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
167
168 if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
169 totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
170 }
171 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
172 if (totalLen > INT32_MAX) {
173 return 0;
174 }
175 if (messageId == QUERY_SYNC_MESSAGE) {
176 // deleted watermark
177 totalLen += Parcel::GetUInt64Len();
178 // query id
179 totalLen += Parcel::GetStringLen(queryId_);
180 // add for queryObject
181 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
182 }
183 if (IsCompressData()) {
184 totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
185 }
186
187 if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
188 totalLen += Parcel::GetUInt32Len(); // extraCondition size
189 for (const auto &entry : extraConditions_) {
190 totalLen += Parcel::GetStringLen(entry.first);
191 totalLen += Parcel::GetStringLen(entry.second);
192 }
193 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
194 }
195 if (version_ >= SOFTWARE_VERSION_RELEASE_9_0) {
196 totalLen += Parcel::GetUInt64Len(); // schemaVersion
197 totalLen += Parcel::GetInt64Len(); // systemTimeOffset
198 totalLen += Parcel::GetInt64Len(); // senderTimeOffset
199 totalLen += Parcel::GetIntLen(); // security label
200 totalLen += Parcel::GetIntLen(); // security flag
201 }
202 if (SingleVerDataSyncUtils::IsSupportRequestTotal(version_)) {
203 totalLen += Parcel::GetUInt32Len(); // totalDataCount
204 totalLen = Parcel::GetEightByteAlign(totalLen);
205 }
206
207 return totalLen > INT32_MAX ? 0 : totalLen;
208 }
209
SetFlag(uint32_t flag)210 void DataRequestPacket::SetFlag(uint32_t flag)
211 {
212 flag_ = flag;
213 }
214
GetFlag() const215 uint32_t DataRequestPacket::GetFlag() const
216 {
217 return flag_;
218 }
219
IsLastSequence() const220 bool DataRequestPacket::IsLastSequence() const
221 {
222 return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
223 }
224
SetLastSequence()225 void DataRequestPacket::SetLastSequence()
226 {
227 flag_ = flag_ | IS_LAST_SEQUENCE;
228 }
229
IsNeedUpdateWaterMark() const230 bool DataRequestPacket::IsNeedUpdateWaterMark() const
231 {
232 return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
233 }
234
SetUpdateWaterMark()235 void DataRequestPacket::SetUpdateWaterMark()
236 {
237 flag_ = flag_ | IS_UPDATE_WATER;
238 }
239
SetCompressDataMark()240 void DataRequestPacket::SetCompressDataMark()
241 {
242 flag_ = flag_ | IS_COMPRESS_DATA;
243 }
244
IsCompressData() const245 bool DataRequestPacket::IsCompressData() const
246 {
247 return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
248 }
249
SetCompressAlgo(CompressAlgorithm algo)250 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
251 {
252 algo_ = algo;
253 }
254
GetCompressAlgo() const255 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
256 {
257 return algo_;
258 }
259
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)260 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
261 {
262 SetSendCode(sendCode);
263 SetVersion(version);
264 SetMode(mode);
265 }
266
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)267 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
268 {
269 localWaterMark_ = localMark;
270 peerWaterMark_ = peerMark;
271 deletedWatermark_ = deletedWatermark;
272 }
273
SetQuery(const QuerySyncObject & query)274 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
275 {
276 query_ = query;
277 }
278
GetQuery() const279 QuerySyncObject DataRequestPacket::GetQuery() const
280 {
281 return query_;
282 }
283
SetQueryId(const std::string & queryId)284 void DataRequestPacket::SetQueryId(const std::string &queryId)
285 {
286 queryId_ = queryId;
287 }
288
GetQueryId() const289 std::string DataRequestPacket::GetQueryId() const
290 {
291 return queryId_;
292 }
293
SetDeletedWaterMark(WaterMark watermark)294 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
295 {
296 deletedWatermark_ = watermark;
297 }
298
GetDeletedWaterMark() const299 WaterMark DataRequestPacket::GetDeletedWaterMark() const
300 {
301 return deletedWatermark_;
302 }
303
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)304 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
305 {
306 extraConditions_ = extraConditions;
307 flag_ |= IS_CONDITION_DATA;
308 }
309
GetExtraConditions() const310 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
311 {
312 return extraConditions_;
313 }
314
IsExtraConditionData() const315 bool DataRequestPacket::IsExtraConditionData() const
316 {
317 return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
318 }
319
SetSchemaVersion(uint64_t schemaVersion)320 void DataRequestPacket::SetSchemaVersion(uint64_t schemaVersion)
321 {
322 schemaVersion_ = schemaVersion;
323 }
324
GetSchemaVersion() const325 uint64_t DataRequestPacket::GetSchemaVersion() const
326 {
327 return schemaVersion_;
328 }
329
SetSystemTimeOffset(int64_t systemTimeOffset)330 void DataRequestPacket::SetSystemTimeOffset(int64_t systemTimeOffset)
331 {
332 systemTimeOffset_ = systemTimeOffset;
333 }
334
GetSystemTimeOffset() const335 int64_t DataRequestPacket::GetSystemTimeOffset() const
336 {
337 return systemTimeOffset_;
338 }
339
SetSenderTimeOffset(int64_t senderTimeOffset)340 void DataRequestPacket::SetSenderTimeOffset(int64_t senderTimeOffset)
341 {
342 senderTimeOffset_ = senderTimeOffset;
343 }
344
GetSenderTimeOffset() const345 int64_t DataRequestPacket::GetSenderTimeOffset() const
346 {
347 return senderTimeOffset_;
348 }
349
SetSecurityOption(const SecurityOption & option)350 void DataRequestPacket::SetSecurityOption(const SecurityOption &option)
351 {
352 securityOption_ = option;
353 }
354
GetSecurityOption() const355 SecurityOption DataRequestPacket::GetSecurityOption() const
356 {
357 return securityOption_;
358 }
359
SetTotalDataCount(uint32_t total)360 void DataRequestPacket::SetTotalDataCount(uint32_t total)
361 {
362 totalDataCount_ = total;
363 }
364
GetTotalDataCount() const365 uint32_t DataRequestPacket::GetTotalDataCount() const
366 {
367 return totalDataCount_;
368 }
369
SetData(uint64_t data)370 void DataAckPacket::SetData(uint64_t data)
371 {
372 data_ = data;
373 }
374
GetData() const375 uint64_t DataAckPacket::GetData() const
376 {
377 return data_;
378 }
379
SetRecvCode(int32_t errorCode)380 void DataAckPacket::SetRecvCode(int32_t errorCode)
381 {
382 recvCode_ = errorCode;
383 }
384
GetRecvCode() const385 int32_t DataAckPacket::GetRecvCode() const
386 {
387 return recvCode_;
388 }
389
SetVersion(uint32_t version)390 void DataAckPacket::SetVersion(uint32_t version)
391 {
392 version_ = version;
393 }
394
GetVersion() const395 uint32_t DataAckPacket::GetVersion() const
396 {
397 return version_;
398 }
399
SetReserved(std::vector<uint64_t> & reserved)400 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
401 {
402 reserved_ = std::move(reserved);
403 }
404
GetReserved() const405 std::vector<uint64_t> DataAckPacket::GetReserved() const
406 {
407 return reserved_;
408 }
409
GetPacketId() const410 uint64_t DataAckPacket::GetPacketId() const
411 {
412 uint64_t packetId = 0;
413 std::vector<uint64_t> DataAckReserve = GetReserved();
414 if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
415 packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
416 }
417 // while remote db is close and open again, it may not carry packetId
418 // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
419 if (packetId > MAX_PACKETID) {
420 return 0;
421 }
422 return packetId;
423 }
424
IsPacketIdValid(uint64_t packetId)425 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
426 {
427 return (packetId > 0);
428 }
429
CalculateLen() const430 uint32_t DataAckPacket::CalculateLen() const
431 {
432 uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
433 len += Parcel::GetIntLen(); // recvCode
434 len += Parcel::GetUInt32Len(); // version
435 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
436
437 len = Parcel::GetEightByteAlign(len);
438 if (len > INT32_MAX) {
439 return 0;
440 }
441 return len;
442 }
443
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)444 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
445 {
446 sendCode_ = sendCode;
447 version_ = version;
448 controlCmdType_ = static_cast<uint32_t>(controlCmd);
449 flag_ = flag;
450 }
451
GetSendCode() const452 int32_t ControlRequestPacket::GetSendCode() const
453 {
454 return sendCode_;
455 }
456
GetVersion() const457 uint32_t ControlRequestPacket::GetVersion() const
458 {
459 return version_;
460 }
461
GetcontrolCmdType() const462 uint32_t ControlRequestPacket::GetcontrolCmdType() const
463 {
464 return controlCmdType_;
465 }
466
GetFlag() const467 uint32_t ControlRequestPacket::GetFlag() const
468 {
469 return flag_;
470 }
471
SetQuery(const QuerySyncObject & query)472 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
473 {
474 (void)query;
475 }
476
CalculateLen() const477 uint32_t ControlRequestPacket::CalculateLen() const
478 {
479 uint64_t len = Parcel::GetUInt32Len(); // version_
480 len += Parcel::GetIntLen(); // sendCode_
481 len += Parcel::GetUInt32Len(); // controlCmdType_
482 len += Parcel::GetUInt32Len(); // flag
483
484 len = Parcel::GetEightByteAlign(len);
485 if (len > INT32_MAX) {
486 return 0;
487 }
488 return len;
489 }
490
SetQuery(const QuerySyncObject & query)491 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
492 {
493 query_ = query;
494 }
495
GetQuery() const496 QuerySyncObject SubscribeRequest::GetQuery() const
497 {
498 return query_;
499 }
500
CalculateLen() const501 uint32_t SubscribeRequest::CalculateLen() const
502 {
503 uint64_t totalLen = ControlRequestPacket::CalculateLen();
504 if (totalLen == 0) {
505 LOGE("[SubscribeRequest] cal packet len failed");
506 return 0;
507 }
508 // add for queryObject
509 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
510 if (totalLen > INT32_MAX) {
511 return 0;
512 }
513 return totalLen;
514 }
515
IsAutoSubscribe() const516 bool SubscribeRequest::IsAutoSubscribe() const
517 {
518 return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
519 }
520
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)521 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
522 {
523 recvCode_ = recvCode;
524 version_ = version;
525 controlCmdType_ = static_cast<uint32_t>(controlCmd);
526 flag_ = flag;
527 }
528
GetRecvCode() const529 int32_t ControlAckPacket::GetRecvCode() const
530 {
531 return recvCode_;
532 }
533
GetVersion() const534 uint32_t ControlAckPacket::GetVersion() const
535 {
536 return version_;
537 }
538
GetcontrolCmdType() const539 uint32_t ControlAckPacket::GetcontrolCmdType() const
540 {
541 return controlCmdType_;
542 }
543
GetFlag() const544 uint32_t ControlAckPacket::GetFlag() const
545 {
546 return flag_;
547 }
548
CalculateLen() const549 uint32_t ControlAckPacket::CalculateLen() const
550 {
551 uint64_t len = Parcel::GetUInt32Len(); // version_
552 len += Parcel::GetIntLen(); // recvCode_
553 len += Parcel::GetUInt32Len(); // controlCmdType_
554 len += Parcel::GetUInt32Len(); // flag
555 len = Parcel::GetEightByteAlign(len);
556 if (len > INT32_MAX) {
557 return 0;
558 }
559 return len;
560 }
561 } // namespace DistributedDB
562