• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_serialize_manager.h"
17 
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "sync_types.h"
27 #include "version.h"
28 
29 namespace DistributedDB {
30 std::mutex SingleVerSerializeManager::handlesLock_;
31 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)32 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
33 {
34     if ((buffer == nullptr) || length == 0u || !(IsPacketValid(inMsg))) {
35         return -E_MESSAGE_ID_ERROR;
36     }
37     SerializeFunc serializeFunc = nullptr;
38     {
39         std::lock_guard<std::mutex> autoLock(handlesLock_);
40         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
41             serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
42         }
43     }
44     if (serializeFunc != nullptr) {
45         return serializeFunc(buffer, length, inMsg);
46     }
47 
48     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
49         return ControlSerialization(buffer, length, inMsg);
50     }
51     return DataSerialization(buffer, length, inMsg);
52 }
53 
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)54 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
55 {
56     switch (inMsg->GetMessageType()) {
57         case TYPE_REQUEST:
58             return DataPacketSerialization(buffer, length, inMsg);
59         case TYPE_RESPONSE:
60         case TYPE_NOTIFY:
61             return AckPacketSerialization(buffer, length, inMsg);
62         default:
63             return -E_MESSAGE_TYPE_ERROR;
64     }
65 }
66 
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)67 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
68 {
69     switch (inMsg->GetMessageType()) {
70         case TYPE_REQUEST:
71             return ControlPacketSerialization(buffer, length, inMsg);
72         case TYPE_RESPONSE:
73             return AckControlPacketSerialization(buffer, length, inMsg);
74         default:
75             return -E_MESSAGE_TYPE_ERROR;
76     }
77 }
78 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)79 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
80 {
81     if ((buffer == nullptr) || length == 0u || !(IsPacketValid(inMsg))) {
82         return -E_MESSAGE_ID_ERROR;
83     }
84     DeserializeFunc deserializeFunc = nullptr;
85     {
86         std::lock_guard<std::mutex> autoLock(handlesLock_);
87         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
88             deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
89         }
90     }
91     if (deserializeFunc != nullptr) {
92         return deserializeFunc(buffer, length, inMsg);
93     }
94     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
95         return ControlDeSerialization(buffer, length, inMsg);
96     }
97     return DataDeSerialization(buffer, length, inMsg);
98 }
99 
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)100 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
101 {
102     switch (inMsg->GetMessageType()) {
103         case TYPE_REQUEST:
104             return DataPacketDeSerialization(buffer, length, inMsg);
105         case TYPE_RESPONSE:
106         case TYPE_NOTIFY:
107             return AckPacketDeSerialization(buffer, length, inMsg);
108         default:
109             return -E_MESSAGE_TYPE_ERROR;
110     }
111 }
112 
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)113 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
114 {
115     switch (inMsg->GetMessageType()) {
116         case TYPE_REQUEST:
117             return ControlPacketDeSerialization(buffer, length, inMsg);
118         case TYPE_RESPONSE:
119             return AckControlPacketDeSerialization(buffer, length, inMsg);
120         default:
121             return -E_MESSAGE_TYPE_ERROR;
122     }
123 }
124 
CalculateLen(const Message * inMsg)125 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
126 {
127     if (!(IsPacketValid(inMsg))) {
128         return 0;
129     }
130     ComputeLengthFunc computeFunc = nullptr;
131     {
132         std::lock_guard<std::mutex> autoLock(handlesLock_);
133         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
134             computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
135         }
136     }
137     if (computeFunc != nullptr) {
138         return computeFunc(inMsg);
139     }
140     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
141         return CalculateControlLen(inMsg);
142     }
143     return CalculateDataLen(inMsg);
144 }
145 
CalculateDataLen(const Message * inMsg)146 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
147 {
148     uint32_t len = 0;
149     int errCode;
150     switch (inMsg->GetMessageType()) {
151         case TYPE_REQUEST:
152             errCode = DataPacketCalculateLen(inMsg, len);
153             if (errCode != E_OK) {
154                 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
155                 return 0;
156             }
157             return len;
158         case TYPE_RESPONSE:
159         case TYPE_NOTIFY:
160             errCode = AckPacketCalculateLen(inMsg, len);
161             if (errCode != E_OK) {
162                 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
163                 return 0;
164             }
165             return len;
166         default:
167             return 0;
168     }
169 }
170 
CalculateControlLen(const Message * inMsg)171 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
172 {
173     uint32_t len = 0;
174     int errCode;
175     switch (inMsg->GetMessageType()) {
176         case TYPE_REQUEST:
177             errCode = ControlPacketCalculateLen(inMsg, len);
178             if (errCode != E_OK) {
179                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
180                 return 0;
181             }
182             return len;
183         case TYPE_RESPONSE:
184         case TYPE_NOTIFY:
185             errCode = AckControlPacketCalculateLen(inMsg, len);
186             if (errCode != E_OK) {
187                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
188                 return 0;
189             }
190             return len;
191         default:
192             return 0;
193     }
194 }
195 
RegisterTransformFunc()196 int SingleVerSerializeManager::RegisterTransformFunc()
197 {
198     RegisterInnerTransformFunc();
199     return RegisterCommunicatorTransformFunc();
200 }
201 
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)202 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
203 {
204     parcel.WriteUInt64(packet->GetEndWaterMark());
205     parcel.WriteUInt64(packet->GetLocalWaterMark());
206     parcel.WriteUInt64(packet->GetPeerWaterMark());
207     parcel.WriteInt(packet->GetSendCode());
208     parcel.WriteInt(packet->GetMode());
209     parcel.WriteUInt32(packet->GetSessionId());
210     parcel.WriteVector<uint64_t>(packet->GetReserved());
211     if (parcel.IsError()) {
212         return -E_PARSE_FAIL;
213     }
214     if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
215         parcel.WriteUInt32(packet->GetFlag());
216     }
217     parcel.EightByteAlign();
218     if (parcel.IsError()) {
219         return -E_PARSE_FAIL;
220     }
221     return E_OK;
222 }
223 
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)224 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
225 {
226     auto packet = inMsg->GetObject<DataRequestPacket>();
227     if (packet == nullptr) {
228         return -E_INVALID_ARGS;
229     }
230     Parcel parcel(buffer, length);
231 
232     // version
233     int errCode = parcel.WriteUInt32(packet->GetVersion());
234     if (errCode != E_OK) {
235         LOGE("[DataPacketSerialization] Serialize version failed");
236         return errCode;
237     }
238     // sendDataItems
239     errCode = GenericSingleVerKvEntry::SerializeDatas(
240         (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
241     if (errCode != E_OK) {
242         LOGE("[DataPacketSerialization] Serialize Data failed");
243         return errCode;
244     }
245 
246     // data sync
247     errCode = DataPacketSyncerPartSerialization(parcel, packet);
248     if (errCode != E_OK) {
249         LOGE("[DataPacketSerialization] Serialize Data failed");
250         return errCode;
251     }
252     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
253         errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
254         if (errCode != E_OK) {
255             return errCode;
256         }
257     }
258     if (packet->IsCompressData()) {
259         // serialize compress data
260         errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
261             parcel, packet->GetVersion(), packet->GetCompressAlgo());
262         if (errCode != E_OK) {
263             LOGE("[DataPacketSerialization] Serialize compress Data failed");
264             return errCode;
265         }
266     }
267     return DataPacketInnerSerialization(packet, parcel);
268 }
269 
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)270 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
271 {
272     // deleted record send watermark
273     int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
274     if (errCode != E_OK) {
275         LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
276         return errCode;
277     }
278 
279     // query identify
280     QuerySyncObject queryObj = packet->GetQuery();
281     errCode = parcel.WriteString(packet->GetQueryId());
282     if (errCode != E_OK) {
283         LOGE("[QuerySerialization] Serialize query id failed!");
284         return errCode;
285     }
286     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
287         // need to check.
288         errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
289     }
290     return errCode;
291 }
292 
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)293 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
294 {
295     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
296     if (packet == nullptr) {
297         return -E_INVALID_ARGS;
298     }
299 
300     len = packet->CalculateLen(inMsg->GetMessageId());
301     return E_OK;
302 }
303 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)304 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
305 {
306     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
307     if (packet == nullptr) {
308         return -E_INVALID_ARGS;
309     }
310 
311     len = packet->CalculateLen();
312     return E_OK;
313 }
314 
IsPacketValid(const Message * inMsg)315 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
316 {
317     if (inMsg == nullptr) {
318         return false;
319     }
320 
321     int msgType = inMsg->GetMessageType();
322     if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
323         LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
324         return false;
325     }
326     return true;
327 }
328 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)329 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
330 {
331     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
332     if (packet == nullptr) {
333         return -E_INVALID_ARGS;
334     }
335 
336     Parcel parcel(buffer, length);
337     parcel.WriteUInt32(packet->GetVersion());
338     if (parcel.IsError()) {
339         return -E_PARSE_FAIL;
340     }
341     // now V1 compatible for softWareVersion :{101, 102}
342     return AckPacketSyncerPartSerializationV1(parcel, packet);
343 }
344 
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)345 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
346 {
347     parcel.WriteUInt64(packet->GetData());
348     parcel.WriteInt(packet->GetRecvCode());
349     parcel.WriteVector<uint64_t>(packet->GetReserved());
350     parcel.EightByteAlign();
351     if (parcel.IsError()) {
352         return -E_PARSE_FAIL;
353     }
354     return E_OK;
355 }
356 
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)357 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
358 {
359     std::vector<SendDataItem> dataItems;
360     uint32_t version;
361     Parcel parcel(const_cast<uint8_t *>(buffer), length);
362     uint32_t packLen = parcel.ReadUInt32(version);
363     if (parcel.IsError()) {
364         return -E_PARSE_FAIL;
365     }
366 
367     if (version > SOFTWARE_VERSION_CURRENT) {
368         return -E_VERSION_NOT_SUPPORT;
369     }
370 
371     packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
372     if (parcel.IsError()) {
373         return -E_PARSE_FAIL;
374     }
375 
376     auto packet = new (std::nothrow) DataRequestPacket();
377     if (packet == nullptr) {
378         return -E_OUT_OF_MEMORY;
379     }
380 
381     packet->SetVersion(version);
382     packet->SetData(dataItems);
383     int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
384     if (errCode != E_OK) {
385         goto ERROR;
386     }
387     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
388         errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
389         if (errCode != E_OK) {
390             goto ERROR;
391         }
392     }
393     errCode = DataPacketInnerDeSerialization(packet, parcel);
394     if (errCode != E_OK) {
395         goto ERROR;
396     }
397     errCode = inMsg->SetExternalObject<>(packet);
398     if (errCode == E_OK) {
399         return errCode;
400     }
401 
402 ERROR:
403     delete packet;
404     packet = nullptr;
405     return errCode;
406 }
407 
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)408 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
409 {
410     WaterMark deletedWatermark = 0;
411     parcel.ReadUInt64(deletedWatermark);
412     std::string queryId;
413     parcel.ReadString(queryId);
414     if (parcel.IsError()) {
415         return -E_PARSE_FAIL;
416     }
417     // query identify
418     QuerySyncObject querySyncObj;
419     int errCode = E_OK;
420     // for version 105, query is always sent.
421     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
422         // need to check.
423         errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
424     }
425     if (errCode != E_OK) {
426         LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
427         return errCode;
428     }
429     packet->SetDeletedWaterMark(deletedWatermark);
430     packet->SetQueryId(queryId);
431     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
432         packet->SetQuery(querySyncObj);
433     }
434     return E_OK;
435 }
436 
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)437 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
438 {
439     std::vector<SendDataItem> originalData;
440     int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
441     if (errCode != E_OK) {
442         LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
443         return errCode;
444     }
445     packet->SetData(originalData);
446     return E_OK;
447 }
448 
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)449 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
450     uint32_t packLen, uint32_t length, uint32_t version)
451 {
452     WaterMark waterMark;
453     WaterMark localWaterMark;
454     WaterMark peerWaterMark;
455     int32_t sendCode;
456     int32_t mode;
457     uint32_t sessionId;
458     std::vector<uint64_t> reserved;
459 
460     uint64_t totPacketLen = packLen;
461     totPacketLen += parcel.ReadUInt64(waterMark);
462     totPacketLen += parcel.ReadUInt64(localWaterMark);
463     totPacketLen += parcel.ReadUInt64(peerWaterMark);
464     totPacketLen += parcel.ReadInt(sendCode);
465     totPacketLen += parcel.ReadInt(mode);
466     totPacketLen += parcel.ReadUInt32(sessionId);
467     totPacketLen += parcel.ReadVector<uint64_t>(reserved);
468     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
469         uint32_t flag = 0u;
470         totPacketLen += parcel.ReadUInt32(flag);
471         packet->SetFlag(flag);
472     }
473     if (totPacketLen > INT32_MAX) {
474         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input totPackLen=%" PRIu64 " is over limit.",
475              totPacketLen);
476         return -E_LENGTH_ERROR;
477     }
478     parcel.EightByteAlign();
479     totPacketLen = Parcel::GetEightByteAlign(totPacketLen);
480     if (parcel.IsError()) {
481         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ", totPackLen=%" PRIu64,
482             length, totPacketLen);
483         return -E_LENGTH_ERROR;
484     }
485     packet->SetEndWaterMark(waterMark);
486     packet->SetLocalWaterMark(localWaterMark);
487     packet->SetPeerWaterMark(peerWaterMark);
488     packet->SetSendCode(sendCode);
489     packet->SetMode(mode);
490     packet->SetSessionId(sessionId);
491     packet->SetReserved(reserved);
492     return E_OK;
493 }
494 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)495 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
496 {
497     DataAckPacket packet;
498     Parcel parcel(const_cast<uint8_t *>(buffer), length);
499     uint32_t version;
500 
501     parcel.ReadUInt32(version);
502     if (parcel.IsError()) {
503         return -E_INVALID_ARGS;
504     }
505     if (version > SOFTWARE_VERSION_CURRENT) {
506         packet.SetVersion(version);
507         packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
508         return inMsg->SetCopiedObject<>(packet);
509     }
510     packet.SetVersion(version);
511     // now V1 compatible for softWareVersion :{101, 102}
512     int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
513     if (errCode != E_OK) {
514         return errCode;
515     }
516 
517     return inMsg->SetCopiedObject<>(packet);
518 }
519 
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)520 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
521 {
522     WaterMark mark;
523     int32_t errCode;
524     std::vector<uint64_t> reserved;
525 
526     parcel.ReadUInt64(mark);
527     parcel.ReadInt(errCode);
528     parcel.ReadVector<uint64_t>(reserved);
529     if (parcel.IsError()) {
530         LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
531         return -E_INVALID_ARGS;
532     }
533     packet.SetData(mark);
534     packet.SetRecvCode(errCode);
535     packet.SetReserved(reserved);
536     return E_OK;
537 }
538 
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)539 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
540 {
541     auto packet = inMsg->GetObject<SubscribeRequest>();
542     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
543         LOGE("[ControlPacketSerialization] invalid control cmd");
544         return -E_INVALID_ARGS;
545     }
546     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
547         return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
548     }
549     return E_OK;
550 }
551 
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)552 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
553 {
554     auto packet = inMsg->GetObject<SubscribeRequest>();
555     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
556         LOGE("[ControlPacketSerialization] invalid control cmd");
557         return -E_INVALID_ARGS;
558     }
559     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
560         return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
561     }
562     return E_OK;
563 }
564 
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)565 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
566 {
567     Parcel parcel(const_cast<uint8_t *>(buffer), length);
568     ControlRequestPacket packet;
569     int errCode = ControlRequestDeSerialization(parcel, packet);
570     if (errCode != E_OK) {
571         return errCode;
572     }
573     if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
574         errCode = SubscribeDeSerialization(parcel, inMsg, packet);
575     }
576     return errCode;
577 }
578 
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)579 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
580 {
581     auto packet = inMsg->GetObject<ControlAckPacket>();
582     if (packet == nullptr) {
583         LOGE("[AckControlPacketCalculateLen] invalid control cmd");
584         return -E_INVALID_ARGS;
585     }
586     len = packet->CalculateLen();
587     return E_OK;
588 }
589 
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)590 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
591 {
592     auto packet = inMsg->GetObject<ControlAckPacket>();
593     if (packet == nullptr) {
594         return -E_INVALID_ARGS;
595     }
596     Parcel parcel(buffer, length);
597     parcel.WriteUInt32(packet->GetVersion());
598     parcel.WriteInt(packet->GetRecvCode());
599     parcel.WriteUInt32(packet->GetcontrolCmdType());
600     parcel.WriteUInt32(packet->GetFlag());
601     if (parcel.IsError()) {
602         LOGE("[AckControlPacketSerialization] Serialization failed");
603         return -E_INVALID_ARGS;
604     }
605     parcel.EightByteAlign();
606     return E_OK;
607 }
608 
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)609 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
610 {
611     auto packet = new (std::nothrow) ControlAckPacket();
612     if (packet == nullptr) {
613         return -E_OUT_OF_MEMORY;
614     }
615     Parcel parcel(const_cast<uint8_t *>(buffer), length);
616     int32_t recvCode = 0;
617     uint32_t version = 0;
618     uint32_t controlCmdType = 0;
619     uint32_t flag = 0;
620     parcel.ReadUInt32(version);
621     parcel.ReadInt(recvCode);
622     parcel.ReadUInt32(controlCmdType);
623     parcel.ReadUInt32(flag);
624     int errCode;
625     if (parcel.IsError()) {
626         LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
627         errCode = -E_INVALID_ARGS;
628         goto ERROR;
629     }
630     packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
631     errCode = inMsg->SetExternalObject<>(packet);
632     if (errCode != E_OK) {
633         goto ERROR;
634     }
635     return errCode;
636 ERROR:
637     delete packet;
638     packet = nullptr;
639     return errCode;
640 }
641 
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)642 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
643 {
644     auto packet = inMsg->GetObject<SubscribeRequest>();
645     if (packet == nullptr) {
646         return -E_INVALID_ARGS;
647     }
648     parcel.WriteUInt32(packet->GetVersion());
649     parcel.WriteInt(packet->GetSendCode());
650     parcel.WriteUInt32(packet->GetcontrolCmdType());
651     parcel.WriteUInt32(packet->GetFlag());
652     parcel.EightByteAlign();
653     if (parcel.IsError()) {
654         LOGE("[ControlRequestSerialization] Serialization failed");
655         return -E_INVALID_ARGS;
656     }
657     return E_OK;
658 }
659 
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)660 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
661 {
662     uint32_t version = 0;
663     int32_t sendCode = 0;
664     uint32_t controlCmdType = 0;
665     uint32_t flag = 0;
666     parcel.ReadUInt32(version);
667     if (version > SOFTWARE_VERSION_CURRENT) {
668         return -E_VERSION_NOT_SUPPORT;
669     }
670     parcel.ReadInt(sendCode);
671     parcel.ReadUInt32(controlCmdType);
672     parcel.ReadUInt32(flag);
673     if (parcel.IsError()) {
674         LOGE("[ControlRequestDeSerialization] deserialize failed!");
675         return -E_LENGTH_ERROR;
676     }
677     packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
678     return E_OK;
679 }
680 
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)681 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
682 {
683     auto packet = inMsg->GetObject<SubscribeRequest>();
684     if (packet == nullptr) {
685         return -E_INVALID_ARGS;
686     }
687     len = packet->CalculateLen();
688     return E_OK;
689 }
690 
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)691 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
692 {
693     auto packet = inMsg->GetObject<SubscribeRequest>();
694     if (packet == nullptr) {
695         return -E_INVALID_ARGS;
696     }
697     Parcel parcel(buffer, length);
698     int errCode = ControlRequestSerialization(parcel, inMsg);
699     if (errCode != E_OK) {
700         LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
701         return errCode;
702     }
703     QuerySyncObject queryObj = packet->GetQuery();
704     errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
705     if (errCode != E_OK) {
706         LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
707         return errCode;
708     }
709     return E_OK;
710 }
711 
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)712 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
713     ControlRequestPacket &controlPacket)
714 {
715     auto packet = new (std::nothrow) SubscribeRequest();
716     if (packet == nullptr) {
717         return -E_OUT_OF_MEMORY;
718     }
719     QuerySyncObject querySyncObj;
720     int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
721     if (errCode != E_OK) {
722         goto ERROR;
723     }
724     packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
725         static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
726     packet->SetQuery(querySyncObj);
727     errCode = inMsg->SetExternalObject<>(packet);
728     if (errCode != E_OK) {
729         goto ERROR;
730     }
731     return errCode;
732 ERROR:
733     delete packet;
734     packet = nullptr;
735     return errCode;
736 }
737 
RegisterCommunicatorTransformFunc()738 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
739 {
740     TransformFunc func;
741     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
742     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
743         return Serialization(buffer, length, inMsg);
744     };
745     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
746         return DeSerialization(buffer, length, inMsg);
747     };
748 
749     static std::vector<MessageId> messageIds = {
750         QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
751     };
752     int errCode = E_OK;
753     for (auto &id : messageIds) {
754         int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
755         if (retCode != E_OK) {
756             LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
757                 static_cast<uint32_t>(id), retCode);
758             errCode = retCode;
759         }
760     }
761     return errCode;
762 }
763 
RegisterInnerTransformFunc()764 void SingleVerSerializeManager::RegisterInnerTransformFunc()
765 {
766     TransformFunc func;
767     func.computeFunc = [](const Message *inMsg) { return ISyncPacketCalculateLen(inMsg); };
768     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
769         return ISyncPacketSerialization(buffer, length, inMsg);
770     };
771     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
772         return ISyncPacketDeSerialization(buffer, length, inMsg);
773     };
774     std::lock_guard<std::mutex> autoLock(handlesLock_);
775     messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
776 }
777 
ISyncPacketCalculateLen(const Message * inMsg)778 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
779 {
780     if (inMsg == nullptr) {
781         return 0u;
782     }
783     uint32_t len = 0u;
784     const auto packet = inMsg->GetObject<ISyncPacket>();
785     if (packet != nullptr) {
786         len = packet->CalculateLen();
787     }
788     return len;
789 }
790 
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)791 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
792     const Message *inMsg)
793 {
794     if (inMsg == nullptr) {
795         return -E_INVALID_ARGS;
796     }
797     int errCode = E_OK;
798     Parcel parcel(buffer, length);
799     auto packet = inMsg->GetObject<ISyncPacket>();
800     if (packet != nullptr) {
801         errCode = packet->Serialization(parcel);
802     }
803     return errCode;
804 }
805 
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)806 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
807     Message *inMsg)
808 {
809     if (inMsg == nullptr) {
810         return -E_INVALID_ARGS;
811     }
812     ISyncPacket *packet = nullptr;
813     int errCode = BuildISyncPacket(inMsg, packet);
814     if (errCode != E_OK) {
815         return errCode;
816     }
817     Parcel parcel(const_cast<uint8_t *>(buffer), length);
818     do {
819         errCode = packet->DeSerialization(parcel);
820         if (errCode != E_OK) {
821             break;
822         }
823         errCode = inMsg->SetExternalObject(packet);
824     } while (false);
825     if (errCode != E_OK) {
826         delete packet;
827         packet = nullptr;
828     }
829     return E_OK;
830 }
831 
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)832 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
833 {
834     uint32_t messageId = inMsg->GetMessageId();
835     if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
836         return -E_INVALID_ARGS;
837     }
838     switch (inMsg->GetMessageType()) {
839         case TYPE_REQUEST:
840             packet = new(std::nothrow) RemoteExecutorRequestPacket();
841             break;
842         case TYPE_RESPONSE:
843             packet = new(std::nothrow) RemoteExecutorAckPacket();
844             break;
845         default:
846             packet = nullptr;
847             break;
848     }
849     if (packet == nullptr) {
850         return -E_OUT_OF_MEMORY;
851     }
852     return E_OK;
853 }
854 
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)855 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
856 {
857     std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
858     if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
859         return -E_INVALID_ARGS;
860     }
861     parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
862     for (const auto &entry : extraConditions) {
863         if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
864             entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
865             return -E_INVALID_ARGS;
866         }
867         parcel.WriteString(entry.first);
868         parcel.WriteString(entry.second);
869     }
870     parcel.EightByteAlign();
871     if (parcel.IsError()) {
872         return -E_PARSE_FAIL;
873     }
874     return E_OK;
875 }
876 
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)877 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
878 {
879     if (!packet->IsExtraConditionData()) {
880         return E_OK;
881     }
882     uint32_t conditionSize = 0u;
883     (void) parcel.ReadUInt32(conditionSize);
884     if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
885         return -E_INVALID_ARGS;
886     }
887     std::map<std::string, std::string> extraConditions;
888     for (uint32_t i = 0; i < conditionSize; i++) {
889         std::string conditionKey;
890         std::string conditionVal;
891         (void) parcel.ReadString(conditionKey);
892         (void) parcel.ReadString(conditionVal);
893         if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
894             conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
895             return -E_INVALID_ARGS;
896         }
897         extraConditions[conditionKey] = conditionVal;
898     }
899     parcel.EightByteAlign();
900     if (parcel.IsError()) {
901         return -E_PARSE_FAIL;
902     }
903     packet->SetExtraConditions(extraConditions);
904     return E_OK;
905 }
906 
DataPacketInnerDeSerialization(DataRequestPacket * packet,Parcel & parcel)907 int SingleVerSerializeManager::DataPacketInnerDeSerialization(DataRequestPacket *packet, Parcel &parcel)
908 {
909     int errCode = E_OK;
910     if (packet->IsCompressData()) {
911         errCode = DataPacketCompressDataDeSerialization(parcel, packet);
912         if (errCode != E_OK) {
913             return errCode;
914         }
915     }
916     errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
917     if (errCode != E_OK) {
918         return errCode;
919     }
920     if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
921         uint64_t schemaVersion = 0u;
922         parcel.ReadUInt64(schemaVersion);
923         int64_t systemTimeOffset = 0u;
924         parcel.ReadInt64(systemTimeOffset);
925         int64_t senderTimeOffset = 0u;
926         parcel.ReadInt64(senderTimeOffset);
927         if (parcel.IsError()) {
928             LOGE("[SingleVerSerializeManager] parse schema version or time offset failed");
929             return -E_PARSE_FAIL;
930         }
931         packet->SetSchemaVersion(schemaVersion);
932         packet->SetSystemTimeOffset(systemTimeOffset);
933         packet->SetSenderTimeOffset(senderTimeOffset);
934         if (!parcel.IsContinueRead()) {
935             return errCode;
936         }
937         SecurityOption option;
938         parcel.ReadInt(option.securityLabel);
939         parcel.ReadInt(option.securityFlag);
940         packet->SetSecurityOption(option);
941     }
942     if (SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
943         uint32_t total = 0u;
944         parcel.ReadUInt32(total);
945         parcel.EightByteAlign();
946         packet->SetTotalDataCount(total);
947     }
948     return errCode;
949 }
950 
DataPacketInnerSerialization(const DataRequestPacket * packet,Parcel & parcel)951 int SingleVerSerializeManager::DataPacketInnerSerialization(const DataRequestPacket *packet, Parcel &parcel)
952 {
953     // flag mask add in 103
954     if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
955         return E_OK;
956     }
957     if (packet->IsExtraConditionData()) {
958         int errCode = DataPacketExtraConditionsSerialization(parcel, packet);
959         if (errCode != E_OK) {
960             LOGE("[SingleVerSerializeManager] Serialize extra condition failed %d", errCode);
961             return errCode;
962         }
963     }
964     if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
965         parcel.WriteUInt64(packet->GetSchemaVersion());
966         parcel.WriteInt64(packet->GetSystemTimeOffset());
967         parcel.WriteInt64(packet->GetSenderTimeOffset());
968         if (parcel.IsError()) {
969             LOGE("[SingleVerSerializeManager] Serialize schema version or time offset failed");
970             return -E_PARSE_FAIL;
971         }
972         auto option = packet->GetSecurityOption();
973         parcel.WriteInt(option.securityLabel);
974         parcel.WriteInt(option.securityFlag);
975         if (parcel.IsError()) {
976             LOGE("[SingleVerSerializeManager] Serialize security option failed");
977             return -E_PARSE_FAIL;
978         }
979     }
980     if (SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
981         parcel.WriteUInt32(packet->GetTotalDataCount());
982         parcel.EightByteAlign();
983     }
984     return E_OK;
985 }
986 }  // namespace DistributedDB