1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_packet.h"
17 #include "icommunicator.h"
18 #include "single_ver_kvdb_sync_interface.h"
19 #include "query_sync_object.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "sync_types.h"
22 #include "version.h"
23 #include "parcel.h"
24
25 namespace DistributedDB {
~DataRequestPacket()26 DataRequestPacket::~DataRequestPacket()
27 {
28 for (auto &entry : data_) {
29 delete entry;
30 entry = nullptr;
31 }
32 }
33
SetData(std::vector<SendDataItem> & data)34 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
35 {
36 data_ = std::move(data);
37 }
38
GetData() const39 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
40 {
41 return data_;
42 }
43
SetCompressData(std::vector<uint8_t> & compressData)44 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
45 {
46 compressData_ = std::move(compressData);
47 }
48
GetCompressData() const49 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
50 {
51 return compressData_;
52 }
53
SetEndWaterMark(WaterMark waterMark)54 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
55 {
56 endWaterMark_ = waterMark;
57 }
58
GetEndWaterMark() const59 WaterMark DataRequestPacket::GetEndWaterMark() const
60 {
61 return endWaterMark_;
62 }
63
SetLocalWaterMark(WaterMark waterMark)64 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
65 {
66 localWaterMark_ = waterMark;
67 }
68
GetLocalWaterMark() const69 WaterMark DataRequestPacket::GetLocalWaterMark() const
70 {
71 return localWaterMark_;
72 }
73
SetPeerWaterMark(WaterMark waterMark)74 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
75 {
76 peerWaterMark_ = waterMark;
77 }
78
GetPeerWaterMark() const79 WaterMark DataRequestPacket::GetPeerWaterMark() const
80 {
81 return peerWaterMark_;
82 }
83
SetSendCode(int32_t errCode)84 void DataRequestPacket::SetSendCode(int32_t errCode)
85 {
86 sendCode_ = errCode;
87 }
88
GetSendCode() const89 int32_t DataRequestPacket::GetSendCode() const
90 {
91 return sendCode_;
92 }
93
SetMode(int32_t mode)94 void DataRequestPacket::SetMode(int32_t mode)
95 {
96 mode_ = mode;
97 }
98
GetMode() const99 int32_t DataRequestPacket::GetMode() const
100 {
101 return mode_;
102 }
103
SetSessionId(uint32_t sessionId)104 void DataRequestPacket::SetSessionId(uint32_t sessionId)
105 {
106 sessionId_ = sessionId;
107 }
108
GetSessionId() const109 uint32_t DataRequestPacket::GetSessionId() const
110 {
111 return sessionId_;
112 }
113
SetVersion(uint32_t version)114 void DataRequestPacket::SetVersion(uint32_t version)
115 {
116 version_ = version;
117 }
118
GetVersion() const119 uint32_t DataRequestPacket::GetVersion() const
120 {
121 return version_;
122 }
123
SetReserved(std::vector<uint64_t> & reserved)124 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126 reserved_ = std::move(reserved);
127 }
128
SetReserved(std::vector<uint64_t> && reserved)129 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
130 {
131 reserved_ = reserved;
132 }
133
GetReserved() const134 std::vector<uint64_t> DataRequestPacket::GetReserved() const
135 {
136 return reserved_;
137 }
138
GetPacketId() const139 uint64_t DataRequestPacket::GetPacketId() const
140 {
141 uint64_t packetId = 0;
142 std::vector<uint64_t> DataRequestReserve = GetReserved();
143 if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
144 return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
145 } else {
146 return packetId;
147 }
148 }
149
CalculateLen(uint32_t messageId) const150 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
151 {
152 uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
153 IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
154 if (totalLen == 0) {
155 return 0;
156 }
157 totalLen += Parcel::GetUInt64Len(); // endWaterMark
158 totalLen += Parcel::GetUInt64Len(); // localWaterMark
159 totalLen += Parcel::GetUInt64Len(); // peerWaterMark
160 totalLen += Parcel::GetIntLen(); // sendCode
161 totalLen += Parcel::GetIntLen(); // mode
162 totalLen += Parcel::GetUInt32Len(); // sessionId
163 totalLen += Parcel::GetUInt32Len(); // version
164 totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
165
166 if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
167 totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
168 }
169 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
170 if (totalLen > INT32_MAX) {
171 return 0;
172 }
173 if (messageId == QUERY_SYNC_MESSAGE) {
174 // deleted watermark
175 totalLen += Parcel::GetUInt64Len();
176 // query id
177 totalLen += Parcel::GetStringLen(queryId_);
178 // add for queryObject
179 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
180 }
181 if (IsCompressData()) {
182 totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
183 }
184
185 if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
186 totalLen += Parcel::GetUInt32Len(); // extraCondition size
187 for (const auto &entry : extraConditions_) {
188 totalLen += Parcel::GetStringLen(entry.first);
189 totalLen += Parcel::GetStringLen(entry.second);
190 }
191 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
192 }
193 if (totalLen > INT32_MAX) {
194 return 0;
195 }
196 return totalLen;
197 }
198
SetFlag(uint32_t flag)199 void DataRequestPacket::SetFlag(uint32_t flag)
200 {
201 flag_ = flag;
202 }
203
GetFlag() const204 uint32_t DataRequestPacket::GetFlag() const
205 {
206 return flag_;
207 }
208
IsLastSequence() const209 bool DataRequestPacket::IsLastSequence() const
210 {
211 return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
212 }
213
SetLastSequence()214 void DataRequestPacket::SetLastSequence()
215 {
216 flag_ = flag_ | IS_LAST_SEQUENCE;
217 }
218
IsNeedUpdateWaterMark() const219 bool DataRequestPacket::IsNeedUpdateWaterMark() const
220 {
221 return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
222 }
223
SetUpdateWaterMark()224 void DataRequestPacket::SetUpdateWaterMark()
225 {
226 flag_ = flag_ | IS_UPDATE_WATER;
227 }
228
SetCompressDataMark()229 void DataRequestPacket::SetCompressDataMark()
230 {
231 flag_ = flag_ | IS_COMPRESS_DATA;
232 }
233
IsCompressData() const234 bool DataRequestPacket::IsCompressData() const
235 {
236 return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
237 }
238
SetCompressAlgo(CompressAlgorithm algo)239 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
240 {
241 algo_ = algo;
242 }
243
GetCompressAlgo() const244 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
245 {
246 return algo_;
247 }
248
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)249 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
250 {
251 SetSendCode(sendCode);
252 SetVersion(version);
253 SetMode(mode);
254 }
255
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)256 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
257 {
258 localWaterMark_ = localMark;
259 peerWaterMark_ = peerMark;
260 deletedWatermark_ = deletedWatermark;
261 }
262
SetQuery(const QuerySyncObject & query)263 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
264 {
265 query_ = query;
266 }
267
GetQuery() const268 QuerySyncObject DataRequestPacket::GetQuery() const
269 {
270 return query_;
271 }
272
SetQueryId(const std::string & queryId)273 void DataRequestPacket::SetQueryId(const std::string &queryId)
274 {
275 queryId_ = queryId;
276 }
277
GetQueryId() const278 std::string DataRequestPacket::GetQueryId() const
279 {
280 return queryId_;
281 }
282
SetDeletedWaterMark(WaterMark watermark)283 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
284 {
285 deletedWatermark_ = watermark;
286 }
287
GetDeletedWaterMark() const288 WaterMark DataRequestPacket::GetDeletedWaterMark() const
289 {
290 return deletedWatermark_;
291 }
292
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)293 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
294 {
295 extraConditions_ = extraConditions;
296 flag_ |= IS_CONDITION_DATA;
297 }
298
GetExtraConditions() const299 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
300 {
301 return extraConditions_;
302 }
303
IsExtraConditionData() const304 bool DataRequestPacket::IsExtraConditionData() const
305 {
306 return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
307 }
308
SetData(uint64_t data)309 void DataAckPacket::SetData(uint64_t data)
310 {
311 data_ = data;
312 }
313
GetData() const314 uint64_t DataAckPacket::GetData() const
315 {
316 return data_;
317 }
318
SetRecvCode(int32_t errorCode)319 void DataAckPacket::SetRecvCode(int32_t errorCode)
320 {
321 recvCode_ = errorCode;
322 }
323
GetRecvCode() const324 int32_t DataAckPacket::GetRecvCode() const
325 {
326 return recvCode_;
327 }
328
SetVersion(uint32_t version)329 void DataAckPacket::SetVersion(uint32_t version)
330 {
331 version_ = version;
332 }
333
GetVersion() const334 uint32_t DataAckPacket::GetVersion() const
335 {
336 return version_;
337 }
338
SetReserved(std::vector<uint64_t> & reserved)339 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
340 {
341 reserved_ = std::move(reserved);
342 }
343
GetReserved() const344 std::vector<uint64_t> DataAckPacket::GetReserved() const
345 {
346 return reserved_;
347 }
348
GetPacketId() const349 uint64_t DataAckPacket::GetPacketId() const
350 {
351 uint64_t packetId = 0;
352 std::vector<uint64_t> DataAckReserve = GetReserved();
353 if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
354 packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
355 }
356 // while remote db is close and open again, it may not carry packetId
357 // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
358 if (packetId > MAX_PACKETID) {
359 return 0;
360 }
361 return packetId;
362 }
363
IsPacketIdValid(uint64_t packetId)364 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
365 {
366 return (packetId > 0);
367 }
368
CalculateLen() const369 uint32_t DataAckPacket::CalculateLen() const
370 {
371 uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
372 len += Parcel::GetIntLen(); // recvCode
373 len += Parcel::GetUInt32Len(); // version
374 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
375
376 len = Parcel::GetEightByteAlign(len);
377 if (len > INT32_MAX) {
378 return 0;
379 }
380 return len;
381 }
382
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)383 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
384 {
385 sendCode_ = sendCode;
386 version_ = version;
387 controlCmdType_ = static_cast<uint32_t>(controlCmd);
388 flag_ = flag;
389 }
390
GetSendCode() const391 int32_t ControlRequestPacket::GetSendCode() const
392 {
393 return sendCode_;
394 }
395
GetVersion() const396 uint32_t ControlRequestPacket::GetVersion() const
397 {
398 return version_;
399 }
400
GetcontrolCmdType() const401 uint32_t ControlRequestPacket::GetcontrolCmdType() const
402 {
403 return controlCmdType_;
404 }
405
GetFlag() const406 uint32_t ControlRequestPacket::GetFlag() const
407 {
408 return flag_;
409 }
410
SetQuery(const QuerySyncObject & query)411 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
412 {
413 (void)query;
414 }
415
CalculateLen() const416 uint32_t ControlRequestPacket::CalculateLen() const
417 {
418 uint64_t len = Parcel::GetUInt32Len(); // version_
419 len += Parcel::GetIntLen(); // sendCode_
420 len += Parcel::GetUInt32Len(); // controlCmdType_
421 len += Parcel::GetUInt32Len(); // flag
422
423 len = Parcel::GetEightByteAlign(len);
424 if (len > INT32_MAX) {
425 return 0;
426 }
427 return len;
428 }
429
SetQuery(const QuerySyncObject & query)430 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
431 {
432 query_ = query;
433 }
434
GetQuery() const435 QuerySyncObject SubscribeRequest::GetQuery() const
436 {
437 return query_;
438 }
439
CalculateLen() const440 uint32_t SubscribeRequest::CalculateLen() const
441 {
442 uint64_t totalLen = ControlRequestPacket::CalculateLen();
443 if (totalLen == 0) {
444 LOGE("[SubscribeRequest] cal packet len failed");
445 return 0;
446 }
447 // add for queryObject
448 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
449 if (totalLen > INT32_MAX) {
450 return 0;
451 }
452 return totalLen;
453 }
454
IsAutoSubscribe() const455 bool SubscribeRequest::IsAutoSubscribe() const
456 {
457 return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
458 }
459
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)460 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
461 {
462 recvCode_ = recvCode;
463 version_ = version;
464 controlCmdType_ = static_cast<uint32_t>(controlCmd);
465 flag_ = flag;
466 }
467
GetRecvCode() const468 int32_t ControlAckPacket::GetRecvCode() const
469 {
470 return recvCode_;
471 }
472
GetVersion() const473 uint32_t ControlAckPacket::GetVersion() const
474 {
475 return version_;
476 }
477
GetcontrolCmdType() const478 uint32_t ControlAckPacket::GetcontrolCmdType() const
479 {
480 return controlCmdType_;
481 }
482
GetFlag() const483 uint32_t ControlAckPacket::GetFlag() const
484 {
485 return flag_;
486 }
487
CalculateLen() const488 uint32_t ControlAckPacket::CalculateLen() const
489 {
490 uint64_t len = Parcel::GetUInt32Len(); // version_
491 len += Parcel::GetIntLen(); // recvCode_
492 len += Parcel::GetUInt32Len(); // controlCmdType_
493 len += Parcel::GetUInt32Len(); // flag
494 len = Parcel::GetEightByteAlign(len);
495 if (len > INT32_MAX) {
496 return 0;
497 }
498 return len;
499 }
500 } // namespace DistributedDB
501