1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_packet.h"
17 #include "icommunicator.h"
18 #include "single_ver_kvdb_sync_interface.h"
19 #include "query_sync_object.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "sync_types.h"
22 #include "version.h"
23 #include "parcel.h"
24
25
26 namespace DistributedDB {
~DataRequestPacket()27 DataRequestPacket::~DataRequestPacket()
28 {
29 for (auto &entry : data_) {
30 delete entry;
31 entry = nullptr;
32 }
33 }
34
SetData(std::vector<SendDataItem> & data)35 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
36 {
37 data_ = std::move(data);
38 }
39
GetData() const40 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
41 {
42 return data_;
43 }
44
SetCompressData(std::vector<uint8_t> & compressData)45 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
46 {
47 compressData_ = std::move(compressData);
48 }
49
GetCompressData() const50 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
51 {
52 return compressData_;
53 }
54
SetEndWaterMark(WaterMark waterMark)55 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
56 {
57 endWaterMark_ = waterMark;
58 }
59
GetEndWaterMark() const60 WaterMark DataRequestPacket::GetEndWaterMark() const
61 {
62 return endWaterMark_;
63 }
64
SetLocalWaterMark(WaterMark waterMark)65 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
66 {
67 localWaterMark_ = waterMark;
68 }
69
GetLocalWaterMark() const70 WaterMark DataRequestPacket::GetLocalWaterMark() const
71 {
72 return localWaterMark_;
73 }
74
SetPeerWaterMark(WaterMark waterMark)75 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
76 {
77 peerWaterMark_ = waterMark;
78 }
79
GetPeerWaterMark() const80 WaterMark DataRequestPacket::GetPeerWaterMark() const
81 {
82 return peerWaterMark_;
83 }
84
SetSendCode(int32_t errCode)85 void DataRequestPacket::SetSendCode(int32_t errCode)
86 {
87 sendCode_ = errCode;
88 }
89
GetSendCode() const90 int32_t DataRequestPacket::GetSendCode() const
91 {
92 return sendCode_;
93 }
94
SetMode(int32_t mode)95 void DataRequestPacket::SetMode(int32_t mode)
96 {
97 mode_ = mode;
98 }
99
GetMode() const100 int32_t DataRequestPacket::GetMode() const
101 {
102 return mode_;
103 }
104
SetSessionId(uint32_t sessionId)105 void DataRequestPacket::SetSessionId(uint32_t sessionId)
106 {
107 sessionId_ = sessionId;
108 }
109
GetSessionId() const110 uint32_t DataRequestPacket::GetSessionId() const
111 {
112 return sessionId_;
113 }
114
SetVersion(uint32_t version)115 void DataRequestPacket::SetVersion(uint32_t version)
116 {
117 version_ = version;
118 }
119
GetVersion() const120 uint32_t DataRequestPacket::GetVersion() const
121 {
122 return version_;
123 }
124
SetReserved(std::vector<uint64_t> & reserved)125 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
126 {
127 reserved_ = std::move(reserved);
128 }
129
SetReserved(std::vector<uint64_t> && reserved)130 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
131 {
132 reserved_ = reserved;
133 }
134
GetReserved() const135 std::vector<uint64_t> DataRequestPacket::GetReserved() const
136 {
137 return reserved_;
138 }
139
GetPacketId() const140 uint64_t DataRequestPacket::GetPacketId() const
141 {
142 uint64_t packetId = 0;
143 std::vector<uint64_t> DataRequestReserve = GetReserved();
144 if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
145 return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
146 } else {
147 return packetId;
148 }
149 }
150
CalculateLen(uint32_t messageId) const151 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
152 {
153 uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
154 IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
155 if (totalLen == 0) {
156 return 0;
157 }
158 totalLen += Parcel::GetUInt64Len(); // endWaterMark
159 totalLen += Parcel::GetUInt64Len(); // localWaterMark
160 totalLen += Parcel::GetUInt64Len(); // peerWaterMark
161 totalLen += Parcel::GetIntLen(); // sendCode
162 totalLen += Parcel::GetIntLen(); // mode
163 totalLen += Parcel::GetUInt32Len(); // sessionId
164 totalLen += Parcel::GetUInt32Len(); // version
165 totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
166
167 if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
168 totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
169 }
170 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
171 if (totalLen > INT32_MAX) {
172 return 0;
173 }
174 if (messageId == QUERY_SYNC_MESSAGE) {
175 // deleted watermark
176 totalLen += Parcel::GetUInt64Len();
177 // query id
178 totalLen += Parcel::GetStringLen(queryId_);
179 // add for queryObject
180 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
181 }
182 if (IsCompressData()) {
183 totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
184 }
185
186 if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
187 totalLen += Parcel::GetUInt32Len(); // extraCondition size
188 for (const auto &entry : extraConditions_) {
189 totalLen += Parcel::GetStringLen(entry.first);
190 totalLen += Parcel::GetStringLen(entry.second);
191 }
192 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
193 }
194 if (totalLen > INT32_MAX) {
195 return 0;
196 }
197 return totalLen;
198 }
199
SetFlag(uint32_t flag)200 void DataRequestPacket::SetFlag(uint32_t flag)
201 {
202 flag_ = flag;
203 }
204
GetFlag() const205 uint32_t DataRequestPacket::GetFlag() const
206 {
207 return flag_;
208 }
209
IsLastSequence() const210 bool DataRequestPacket::IsLastSequence() const
211 {
212 return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
213 }
214
SetLastSequence()215 void DataRequestPacket::SetLastSequence()
216 {
217 flag_ = flag_ | IS_LAST_SEQUENCE;
218 }
219
IsNeedUpdateWaterMark() const220 bool DataRequestPacket::IsNeedUpdateWaterMark() const
221 {
222 return !((flag_ & IS_UPDATE_WATER) == IS_UPDATE_WATER);
223 }
224
SetUpdateWaterMark()225 void DataRequestPacket::SetUpdateWaterMark()
226 {
227 flag_ = flag_ | IS_UPDATE_WATER;
228 }
229
SetCompressDataMark()230 void DataRequestPacket::SetCompressDataMark()
231 {
232 flag_ = flag_ | IS_COMPRESS_DATA;
233 }
234
IsCompressData() const235 bool DataRequestPacket::IsCompressData() const
236 {
237 return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
238 }
239
SetCompressAlgo(CompressAlgorithm algo)240 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
241 {
242 algo_ = algo;
243 }
244
GetCompressAlgo() const245 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
246 {
247 return algo_;
248 }
249
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)250 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
251 {
252 SetSendCode(sendCode);
253 SetVersion(version);
254 SetMode(mode);
255 }
256
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)257 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
258 {
259 localWaterMark_ = localMark;
260 peerWaterMark_ = peerMark;
261 deletedWatermark_ = deletedWatermark;
262 }
263
SetQuery(const QuerySyncObject & query)264 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
265 {
266 query_ = query;
267 }
268
GetQuery() const269 QuerySyncObject DataRequestPacket::GetQuery() const
270 {
271 return query_;
272 }
273
SetQueryId(const std::string & queryId)274 void DataRequestPacket::SetQueryId(const std::string &queryId)
275 {
276 queryId_ = queryId;
277 }
278
GetQueryId() const279 std::string DataRequestPacket::GetQueryId() const
280 {
281 return queryId_;
282 }
283
SetDeletedWaterMark(WaterMark watermark)284 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
285 {
286 deletedWatermark_ = watermark;
287 }
288
GetDeletedWaterMark() const289 WaterMark DataRequestPacket::GetDeletedWaterMark() const
290 {
291 return deletedWatermark_;
292 }
293
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)294 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
295 {
296 extraConditions_ = extraConditions;
297 flag_ |= IS_CONDITION_DATA;
298 }
299
GetExtraConditions() const300 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
301 {
302 return extraConditions_;
303 }
304
IsExtraConditionData() const305 bool DataRequestPacket::IsExtraConditionData() const
306 {
307 return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
308 }
309
SetData(uint64_t data)310 void DataAckPacket::SetData(uint64_t data)
311 {
312 data_ = data;
313 }
314
GetData() const315 uint64_t DataAckPacket::GetData() const
316 {
317 return data_;
318 }
319
SetRecvCode(int32_t errorCode)320 void DataAckPacket::SetRecvCode(int32_t errorCode)
321 {
322 recvCode_ = errorCode;
323 }
324
GetRecvCode() const325 int32_t DataAckPacket::GetRecvCode() const
326 {
327 return recvCode_;
328 }
329
SetVersion(uint32_t version)330 void DataAckPacket::SetVersion(uint32_t version)
331 {
332 version_ = version;
333 }
334
GetVersion() const335 uint32_t DataAckPacket::GetVersion() const
336 {
337 return version_;
338 }
339
SetReserved(std::vector<uint64_t> & reserved)340 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
341 {
342 reserved_ = std::move(reserved);
343 }
344
GetReserved() const345 std::vector<uint64_t> DataAckPacket::GetReserved() const
346 {
347 return reserved_;
348 }
349
GetPacketId() const350 uint64_t DataAckPacket::GetPacketId() const
351 {
352 uint64_t packetId = 0;
353 std::vector<uint64_t> DataAckReserve = GetReserved();
354 if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
355 packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
356 }
357 // while remote db is close and open again, it may not carry packetId
358 // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
359 if (packetId > MAX_PACKETID) {
360 return 0;
361 }
362 return packetId;
363 }
364
IsPacketIdValid(uint64_t packetId)365 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
366 {
367 return (packetId > 0);
368 }
369
CalculateLen() const370 uint32_t DataAckPacket::CalculateLen() const
371 {
372 uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
373 len += Parcel::GetIntLen(); // recvCode
374 len += Parcel::GetUInt32Len(); // version
375 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
376
377 len = Parcel::GetEightByteAlign(len);
378 if (len > INT32_MAX) {
379 return 0;
380 }
381 return len;
382 }
383
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)384 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
385 {
386 sendCode_ = sendCode;
387 version_ = version;
388 controlCmdType_ = static_cast<uint32_t>(controlCmd);
389 flag_ = flag;
390 }
391
GetSendCode() const392 int32_t ControlRequestPacket::GetSendCode() const
393 {
394 return sendCode_;
395 }
396
GetVersion() const397 uint32_t ControlRequestPacket::GetVersion() const
398 {
399 return version_;
400 }
401
GetcontrolCmdType() const402 uint32_t ControlRequestPacket::GetcontrolCmdType() const
403 {
404 return controlCmdType_;
405 }
406
GetFlag() const407 uint32_t ControlRequestPacket::GetFlag() const
408 {
409 return flag_;
410 }
411
SetQuery(const QuerySyncObject & query)412 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
413 {
414 (void)query;
415 }
416
CalculateLen() const417 uint32_t ControlRequestPacket::CalculateLen() const
418 {
419 uint64_t len = Parcel::GetUInt32Len(); // version_
420 len += Parcel::GetIntLen(); // sendCode_
421 len += Parcel::GetUInt32Len(); // controlCmdType_
422 len += Parcel::GetUInt32Len(); // flag
423
424 len = Parcel::GetEightByteAlign(len);
425 if (len > INT32_MAX) {
426 return 0;
427 }
428 return len;
429 }
430
SetQuery(const QuerySyncObject & query)431 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
432 {
433 query_ = query;
434 }
435
GetQuery() const436 QuerySyncObject SubscribeRequest::GetQuery() const
437 {
438 return query_;
439 }
440
CalculateLen() const441 uint32_t SubscribeRequest::CalculateLen() const
442 {
443 uint64_t totalLen = ControlRequestPacket::CalculateLen();
444 if (totalLen == 0) {
445 LOGE("[SubscribeRequest] cal packet len failed");
446 return 0;
447 }
448 // add for queryObject
449 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
450 if (totalLen > INT32_MAX) {
451 return 0;
452 }
453 return totalLen;
454 }
455
IsAutoSubscribe() const456 bool SubscribeRequest::IsAutoSubscribe() const
457 {
458 return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
459 }
460
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)461 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
462 {
463 recvCode_ = recvCode;
464 version_ = version;
465 controlCmdType_ = static_cast<uint32_t>(controlCmd);
466 flag_ = flag;
467 }
468
GetRecvCode() const469 int32_t ControlAckPacket::GetRecvCode() const
470 {
471 return recvCode_;
472 }
473
GetVersion() const474 uint32_t ControlAckPacket::GetVersion() const
475 {
476 return version_;
477 }
478
GetcontrolCmdType() const479 uint32_t ControlAckPacket::GetcontrolCmdType() const
480 {
481 return controlCmdType_;
482 }
483
GetFlag() const484 uint32_t ControlAckPacket::GetFlag() const
485 {
486 return flag_;
487 }
488
CalculateLen() const489 uint32_t ControlAckPacket::CalculateLen() const
490 {
491 uint64_t len = Parcel::GetUInt32Len(); // version_
492 len += Parcel::GetIntLen(); // recvCode_
493 len += Parcel::GetUInt32Len(); // controlCmdType_
494 len += Parcel::GetUInt32Len(); // flag
495 len = Parcel::GetEightByteAlign(len);
496 if (len > INT32_MAX) {
497 return 0;
498 }
499 return len;
500 }
501 } // namespace DistributedDB
502