• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_packet.h"
17 #include "icommunicator.h"
18 #include "single_ver_kvdb_sync_interface.h"
19 #include "query_sync_object.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "sync_types.h"
22 #include "version.h"
23 #include "parcel.h"
24 
25 
26 namespace DistributedDB {
~DataRequestPacket()27 DataRequestPacket::~DataRequestPacket()
28 {
29     for (auto &entry : data_) {
30         delete entry;
31         entry = nullptr;
32     }
33 }
34 
SetData(std::vector<SendDataItem> & data)35 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
36 {
37     data_ = std::move(data);
38 }
39 
GetData() const40 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
41 {
42     return data_;
43 }
44 
SetCompressData(std::vector<uint8_t> & compressData)45 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
46 {
47     compressData_ = std::move(compressData);
48 }
49 
GetCompressData() const50 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
51 {
52     return compressData_;
53 }
54 
SetEndWaterMark(WaterMark waterMark)55 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
56 {
57     endWaterMark_ = waterMark;
58 }
59 
GetEndWaterMark() const60 WaterMark DataRequestPacket::GetEndWaterMark() const
61 {
62     return endWaterMark_;
63 }
64 
SetLocalWaterMark(WaterMark waterMark)65 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
66 {
67     localWaterMark_ = waterMark;
68 }
69 
GetLocalWaterMark() const70 WaterMark DataRequestPacket::GetLocalWaterMark() const
71 {
72     return localWaterMark_;
73 }
74 
SetPeerWaterMark(WaterMark waterMark)75 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
76 {
77     peerWaterMark_ = waterMark;
78 }
79 
GetPeerWaterMark() const80 WaterMark DataRequestPacket::GetPeerWaterMark() const
81 {
82     return peerWaterMark_;
83 }
84 
SetSendCode(int32_t errCode)85 void DataRequestPacket::SetSendCode(int32_t errCode)
86 {
87     sendCode_ = errCode;
88 }
89 
GetSendCode() const90 int32_t DataRequestPacket::GetSendCode() const
91 {
92     return sendCode_;
93 }
94 
SetMode(int32_t mode)95 void DataRequestPacket::SetMode(int32_t mode)
96 {
97     mode_ = mode;
98 }
99 
GetMode() const100 int32_t DataRequestPacket::GetMode() const
101 {
102     return mode_;
103 }
104 
SetSessionId(uint32_t sessionId)105 void DataRequestPacket::SetSessionId(uint32_t sessionId)
106 {
107     sessionId_ = sessionId;
108 }
109 
GetSessionId() const110 uint32_t DataRequestPacket::GetSessionId() const
111 {
112     return sessionId_;
113 }
114 
SetVersion(uint32_t version)115 void DataRequestPacket::SetVersion(uint32_t version)
116 {
117     version_ = version;
118 }
119 
GetVersion() const120 uint32_t DataRequestPacket::GetVersion() const
121 {
122     return version_;
123 }
124 
SetReserved(std::vector<uint64_t> & reserved)125 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
126 {
127     reserved_ = std::move(reserved);
128 }
129 
SetReserved(std::vector<uint64_t> && reserved)130 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
131 {
132     reserved_ = reserved;
133 }
134 
GetReserved() const135 std::vector<uint64_t> DataRequestPacket::GetReserved() const
136 {
137     return reserved_;
138 }
139 
GetPacketId() const140 uint64_t DataRequestPacket::GetPacketId() const
141 {
142     uint64_t packetId = 0;
143     std::vector<uint64_t> DataRequestReserve = GetReserved();
144     if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
145         return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
146     } else {
147         return packetId;
148     }
149 }
150 
CalculateLen(uint32_t messageId) const151 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
152 {
153     uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
154         IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
155     if (totalLen == 0) {
156         return 0;
157     }
158     totalLen += Parcel::GetUInt64Len(); // endWaterMark
159     totalLen += Parcel::GetUInt64Len(); // localWaterMark
160     totalLen += Parcel::GetUInt64Len(); // peerWaterMark
161     totalLen += Parcel::GetIntLen(); // sendCode
162     totalLen += Parcel::GetIntLen(); // mode
163     totalLen += Parcel::GetUInt32Len(); // sessionId
164     totalLen += Parcel::GetUInt32Len(); // version
165     totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
166 
167     if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
168         totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
169     }
170     totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
171     if (totalLen > INT32_MAX) {
172         return 0;
173     }
174     if (messageId == QUERY_SYNC_MESSAGE) {
175         // deleted watermark
176         totalLen += Parcel::GetUInt64Len();
177         // query id
178         totalLen += Parcel::GetStringLen(queryId_);
179         // add for queryObject
180         totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
181     }
182     if (IsCompressData()) {
183         totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
184     }
185 
186     if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
187         totalLen += Parcel::GetUInt32Len(); // extraCondition size
188         for (const auto &entry : extraConditions_) {
189             totalLen += Parcel::GetStringLen(entry.first);
190             totalLen += Parcel::GetStringLen(entry.second);
191         }
192         totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
193     }
194     if (totalLen > INT32_MAX) {
195         return 0;
196     }
197     return totalLen;
198 }
199 
SetFlag(uint32_t flag)200 void DataRequestPacket::SetFlag(uint32_t flag)
201 {
202     flag_ = flag;
203 }
204 
GetFlag() const205 uint32_t DataRequestPacket::GetFlag() const
206 {
207     return flag_;
208 }
209 
IsLastSequence() const210 bool DataRequestPacket::IsLastSequence() const
211 {
212     return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
213 }
214 
SetLastSequence()215 void DataRequestPacket::SetLastSequence()
216 {
217     flag_ = flag_ | IS_LAST_SEQUENCE;
218 }
219 
IsNeedUpdateWaterMark() const220 bool DataRequestPacket::IsNeedUpdateWaterMark() const
221 {
222     return !((flag_ & IS_UPDATE_WATER) == IS_UPDATE_WATER);
223 }
224 
SetUpdateWaterMark()225 void DataRequestPacket::SetUpdateWaterMark()
226 {
227     flag_ = flag_ | IS_UPDATE_WATER;
228 }
229 
SetCompressDataMark()230 void DataRequestPacket::SetCompressDataMark()
231 {
232     flag_ = flag_ | IS_COMPRESS_DATA;
233 }
234 
IsCompressData() const235 bool DataRequestPacket::IsCompressData() const
236 {
237     return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
238 }
239 
SetCompressAlgo(CompressAlgorithm algo)240 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
241 {
242     algo_ = algo;
243 }
244 
GetCompressAlgo() const245 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
246 {
247     return algo_;
248 }
249 
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)250 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
251 {
252     SetSendCode(sendCode);
253     SetVersion(version);
254     SetMode(mode);
255 }
256 
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)257 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
258 {
259     localWaterMark_ = localMark;
260     peerWaterMark_ = peerMark;
261     deletedWatermark_ = deletedWatermark;
262 }
263 
SetQuery(const QuerySyncObject & query)264 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
265 {
266     query_ = query;
267 }
268 
GetQuery() const269 QuerySyncObject DataRequestPacket::GetQuery() const
270 {
271     return query_;
272 }
273 
SetQueryId(const std::string & queryId)274 void DataRequestPacket::SetQueryId(const std::string &queryId)
275 {
276     queryId_ = queryId;
277 }
278 
GetQueryId() const279 std::string DataRequestPacket::GetQueryId() const
280 {
281     return queryId_;
282 }
283 
SetDeletedWaterMark(WaterMark watermark)284 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
285 {
286     deletedWatermark_ = watermark;
287 }
288 
GetDeletedWaterMark() const289 WaterMark DataRequestPacket::GetDeletedWaterMark() const
290 {
291     return deletedWatermark_;
292 }
293 
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)294 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
295 {
296     extraConditions_ = extraConditions;
297     flag_ |= IS_CONDITION_DATA;
298 }
299 
GetExtraConditions() const300 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
301 {
302     return extraConditions_;
303 }
304 
IsExtraConditionData() const305 bool DataRequestPacket::IsExtraConditionData() const
306 {
307     return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
308 }
309 
SetData(uint64_t data)310 void DataAckPacket::SetData(uint64_t data)
311 {
312     data_ = data;
313 }
314 
GetData() const315 uint64_t DataAckPacket::GetData() const
316 {
317     return data_;
318 }
319 
SetRecvCode(int32_t errorCode)320 void DataAckPacket::SetRecvCode(int32_t errorCode)
321 {
322     recvCode_ = errorCode;
323 }
324 
GetRecvCode() const325 int32_t DataAckPacket::GetRecvCode() const
326 {
327     return recvCode_;
328 }
329 
SetVersion(uint32_t version)330 void DataAckPacket::SetVersion(uint32_t version)
331 {
332     version_ = version;
333 }
334 
GetVersion() const335 uint32_t DataAckPacket::GetVersion() const
336 {
337     return version_;
338 }
339 
SetReserved(std::vector<uint64_t> & reserved)340 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
341 {
342     reserved_ = std::move(reserved);
343 }
344 
GetReserved() const345 std::vector<uint64_t> DataAckPacket::GetReserved() const
346 {
347     return reserved_;
348 }
349 
GetPacketId() const350 uint64_t DataAckPacket::GetPacketId() const
351 {
352     uint64_t packetId = 0;
353     std::vector<uint64_t> DataAckReserve = GetReserved();
354     if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
355         packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
356     }
357     // while remote db is close and open again, it may not carry packetId
358     // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
359     if (packetId > MAX_PACKETID) {
360         return 0;
361     }
362     return packetId;
363 }
364 
IsPacketIdValid(uint64_t packetId)365 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
366 {
367     return (packetId > 0);
368 }
369 
CalculateLen() const370 uint32_t DataAckPacket::CalculateLen() const
371 {
372     uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
373     len += Parcel::GetIntLen(); // recvCode
374     len += Parcel::GetUInt32Len(); // version
375     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
376 
377     len = Parcel::GetEightByteAlign(len);
378     if (len > INT32_MAX) {
379         return 0;
380     }
381     return len;
382 }
383 
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)384 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
385 {
386     sendCode_ = sendCode;
387     version_ = version;
388     controlCmdType_ = static_cast<uint32_t>(controlCmd);
389     flag_ = flag;
390 }
391 
GetSendCode() const392 int32_t ControlRequestPacket::GetSendCode() const
393 {
394     return sendCode_;
395 }
396 
GetVersion() const397 uint32_t ControlRequestPacket::GetVersion() const
398 {
399     return version_;
400 }
401 
GetcontrolCmdType() const402 uint32_t ControlRequestPacket::GetcontrolCmdType() const
403 {
404     return controlCmdType_;
405 }
406 
GetFlag() const407 uint32_t ControlRequestPacket::GetFlag() const
408 {
409     return flag_;
410 }
411 
SetQuery(const QuerySyncObject & query)412 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
413 {
414     (void)query;
415 }
416 
CalculateLen() const417 uint32_t ControlRequestPacket::CalculateLen() const
418 {
419     uint64_t len = Parcel::GetUInt32Len(); // version_
420     len += Parcel::GetIntLen(); // sendCode_
421     len += Parcel::GetUInt32Len(); // controlCmdType_
422     len += Parcel::GetUInt32Len(); // flag
423 
424     len = Parcel::GetEightByteAlign(len);
425     if (len > INT32_MAX) {
426         return 0;
427     }
428     return len;
429 }
430 
SetQuery(const QuerySyncObject & query)431 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
432 {
433     query_ = query;
434 }
435 
GetQuery() const436 QuerySyncObject SubscribeRequest::GetQuery() const
437 {
438     return query_;
439 }
440 
CalculateLen() const441 uint32_t SubscribeRequest::CalculateLen() const
442 {
443     uint64_t totalLen = ControlRequestPacket::CalculateLen();
444     if (totalLen == 0) {
445         LOGE("[SubscribeRequest] cal packet len failed");
446         return 0;
447     }
448     // add for queryObject
449     totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
450     if (totalLen > INT32_MAX) {
451         return 0;
452     }
453     return totalLen;
454 }
455 
IsAutoSubscribe() const456 bool SubscribeRequest::IsAutoSubscribe() const
457 {
458     return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
459 }
460 
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)461 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
462 {
463     recvCode_ = recvCode;
464     version_ = version;
465     controlCmdType_ = static_cast<uint32_t>(controlCmd);
466     flag_ = flag;
467 }
468 
GetRecvCode() const469 int32_t ControlAckPacket::GetRecvCode() const
470 {
471     return recvCode_;
472 }
473 
GetVersion() const474 uint32_t ControlAckPacket::GetVersion() const
475 {
476     return version_;
477 }
478 
GetcontrolCmdType() const479 uint32_t ControlAckPacket::GetcontrolCmdType() const
480 {
481     return controlCmdType_;
482 }
483 
GetFlag() const484 uint32_t ControlAckPacket::GetFlag() const
485 {
486     return flag_;
487 }
488 
CalculateLen() const489 uint32_t ControlAckPacket::CalculateLen() const
490 {
491     uint64_t len = Parcel::GetUInt32Len(); // version_
492     len += Parcel::GetIntLen(); // recvCode_
493     len += Parcel::GetUInt32Len(); // controlCmdType_
494     len += Parcel::GetUInt32Len(); // flag
495     len = Parcel::GetEightByteAlign(len);
496     if (len > INT32_MAX) {
497         return 0;
498     }
499     return len;
500 }
501 } // namespace DistributedDB
502