• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_serialize_manager.h"
17 
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "sync_types.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
// Mutex taken around every lookup in, and insertion into, messageHandles_ in this file.
29 std::mutex SingleVerSerializeManager::handlesLock_;
// Transform functions keyed by message id; an entry found here is used instead of the built-in
// data/control (de)serialization paths. Filled by RegisterInnerTransformFunc().
30 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)31 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
32 {
33     if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
34         return -E_MESSAGE_ID_ERROR;
35     }
36     SerializeFunc serializeFunc = nullptr;
37     {
38         std::lock_guard<std::mutex> autoLock(handlesLock_);
39         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
40             serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
41         }
42     }
43     if (serializeFunc != nullptr) {
44         return serializeFunc(buffer, length, inMsg);
45     }
46 
47     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
48         return ControlSerialization(buffer, length, inMsg);
49     }
50     return DataSerialization(buffer, length, inMsg);
51 }
52 
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)53 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
54 {
55     switch (inMsg->GetMessageType()) {
56         case TYPE_REQUEST:
57             return DataPacketSerialization(buffer, length, inMsg);
58         case TYPE_RESPONSE:
59         case TYPE_NOTIFY:
60             return AckPacketSerialization(buffer, length, inMsg);
61         default:
62             return -E_MESSAGE_TYPE_ERROR;
63     }
64 }
65 
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)66 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
67 {
68     switch (inMsg->GetMessageType()) {
69         case TYPE_REQUEST:
70             return ControlPacketSerialization(buffer, length, inMsg);
71         case TYPE_RESPONSE:
72             return AckControlPacketSerialization(buffer, length, inMsg);
73         default:
74             return -E_MESSAGE_TYPE_ERROR;
75     }
76 }
77 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)78 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
79 {
80     if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
81         return -E_MESSAGE_ID_ERROR;
82     }
83     DeserializeFunc deserializeFunc = nullptr;
84     {
85         std::lock_guard<std::mutex> autoLock(handlesLock_);
86         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
87             deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
88         }
89     }
90     if (deserializeFunc != nullptr) {
91         return deserializeFunc(buffer, length, inMsg);
92     }
93     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
94         return ControlDeSerialization(buffer, length, inMsg);
95     }
96     return DataDeSerialization(buffer, length, inMsg);
97 }
98 
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)99 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
100 {
101     switch (inMsg->GetMessageType()) {
102         case TYPE_REQUEST:
103             return DataPacketDeSerialization(buffer, length, inMsg);
104         case TYPE_RESPONSE:
105         case TYPE_NOTIFY:
106             return AckPacketDeSerialization(buffer, length, inMsg);
107         default:
108             return -E_MESSAGE_TYPE_ERROR;
109     }
110 }
111 
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)112 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
113 {
114     switch (inMsg->GetMessageType()) {
115         case TYPE_REQUEST:
116             return ControlPacketDeSerialization(buffer, length, inMsg);
117         case TYPE_RESPONSE:
118             return AckControlPacketDeSerialization(buffer, length, inMsg);
119         default:
120             return -E_MESSAGE_TYPE_ERROR;
121     }
122 }
123 
CalculateLen(const Message * inMsg)124 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
125 {
126     if (!(IsPacketValid(inMsg))) {
127         return 0;
128     }
129     ComputeLengthFunc computeFunc = nullptr;
130     {
131         std::lock_guard<std::mutex> autoLock(handlesLock_);
132         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
133             computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
134         }
135     }
136     if (computeFunc != nullptr) {
137         return computeFunc(inMsg);
138     }
139     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
140         return CalculateControlLen(inMsg);
141     }
142     return CalculateDataLen(inMsg);
143 }
144 
CalculateDataLen(const Message * inMsg)145 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
146 {
147     uint32_t len = 0;
148     int errCode;
149     switch (inMsg->GetMessageType()) {
150         case TYPE_REQUEST:
151             errCode = DataPacketCalculateLen(inMsg, len);
152             if (errCode != E_OK) {
153                 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
154                 return 0;
155             }
156             return len;
157         case TYPE_RESPONSE:
158         case TYPE_NOTIFY:
159             errCode = AckPacketCalculateLen(inMsg, len);
160             if (errCode != E_OK) {
161                 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
162                 return 0;
163             }
164             return len;
165         default:
166             return 0;
167     }
168 }
169 
CalculateControlLen(const Message * inMsg)170 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
171 {
172     uint32_t len = 0;
173     int errCode;
174     switch (inMsg->GetMessageType()) {
175         case TYPE_REQUEST:
176             errCode = ControlPacketCalculateLen(inMsg, len);
177             if (errCode != E_OK) {
178                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
179                 return 0;
180             }
181             return len;
182         case TYPE_RESPONSE:
183         case TYPE_NOTIFY:
184             errCode = AckControlPacketCalculateLen(inMsg, len);
185             if (errCode != E_OK) {
186                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
187                 return 0;
188             }
189             return len;
190         default:
191             return 0;
192     }
193 }
194 
RegisterTransformFunc()195 int SingleVerSerializeManager::RegisterTransformFunc()
196 {
197     int errCode = RegisterCommunicatorTransformFunc();
198     RegisterInnerTransformFunc();
199     return errCode;
200 }
201 
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)202 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
203 {
204     parcel.WriteUInt64(packet->GetEndWaterMark());
205     parcel.WriteUInt64(packet->GetLocalWaterMark());
206     parcel.WriteUInt64(packet->GetPeerWaterMark());
207     parcel.WriteInt(packet->GetSendCode());
208     parcel.WriteInt(packet->GetMode());
209     parcel.WriteUInt32(packet->GetSessionId());
210     parcel.WriteVector<uint64_t>(packet->GetReserved());
211     if (parcel.IsError()) {
212         return -E_PARSE_FAIL;
213     }
214     if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
215         parcel.WriteUInt32(packet->GetFlag());
216         if (parcel.IsError()) {
217             return -E_PARSE_FAIL;
218         }
219     }
220     parcel.EightByteAlign();
221     return E_OK;
222 }
223 
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)224 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
225 {
226     auto packet = inMsg->GetObject<DataRequestPacket>();
227     if (packet == nullptr) {
228         return -E_INVALID_ARGS;
229     }
230     Parcel parcel(buffer, length);
231 
232     // version
233     int errCode = parcel.WriteUInt32(packet->GetVersion());
234     if (errCode != E_OK) {
235         LOGE("[DataPacketSerialization] Serialize version failed");
236         return errCode;
237     }
238     // sendDataItems
239     errCode = GenericSingleVerKvEntry::SerializeDatas(
240         (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
241     if (errCode != E_OK) {
242         LOGE("[DataPacketSerialization] Serialize Data failed");
243         return errCode;
244     }
245 
246     // data sync
247     errCode = DataPacketSyncerPartSerialization(parcel, packet);
248     if (errCode != E_OK) {
249         LOGE("[DataPacketSerialization] Serialize Data failed");
250         return errCode;
251     }
252     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
253         errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
254         if (errCode != E_OK) {
255             return errCode;
256         }
257     }
258     if (packet->IsCompressData()) {
259         // serialize compress data
260         errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
261             parcel, packet->GetVersion(), packet->GetCompressAlgo());
262         if (errCode != E_OK) {
263             LOGE("[DataPacketSerialization] Serialize compress Data failed");
264             return errCode;
265         }
266     }
267     // flag mask add in 103
268     if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0 || !packet->IsExtraConditionData()) {
269         return E_OK;
270     }
271     return DataPacketExtraConditionsSerialization(parcel, packet);
272 }
273 
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)274 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
275 {
276     // deleted record send watermark
277     int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
278     if (errCode != E_OK) {
279         LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
280         return errCode;
281     }
282 
283     // query identify
284     QuerySyncObject queryObj = packet->GetQuery();
285     errCode = parcel.WriteString(packet->GetQueryId());
286     if (errCode != E_OK) {
287         LOGE("[QuerySerialization] Serialize query id failed!");
288         return errCode;
289     }
290     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
291         // need to check.
292         errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
293     }
294     return errCode;
295 }
296 
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)297 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
298 {
299     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
300     if (packet == nullptr) {
301         return -E_INVALID_ARGS;
302     }
303 
304     len = packet->CalculateLen(inMsg->GetMessageId());
305     return E_OK;
306 }
307 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)308 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
309 {
310     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
311     if (packet == nullptr) {
312         return -E_INVALID_ARGS;
313     }
314 
315     len = packet->CalculateLen();
316     return E_OK;
317 }
318 
IsPacketValid(const Message * inMsg)319 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
320 {
321     if (inMsg == nullptr) {
322         return false;
323     }
324 
325     int msgType = inMsg->GetMessageType();
326     if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
327         LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
328         return false;
329     }
330     return true;
331 }
332 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)333 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
334 {
335     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
336     if (packet == nullptr) {
337         return -E_INVALID_ARGS;
338     }
339 
340     Parcel parcel(buffer, length);
341     parcel.WriteUInt32(packet->GetVersion());
342     if (parcel.IsError()) {
343         return -E_PARSE_FAIL;
344     }
345     // now V1 compatible for softWareVersion :{101, 102}
346     return AckPacketSyncerPartSerializationV1(parcel, packet);
347 }
348 
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)349 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
350 {
351     parcel.WriteUInt64(packet->GetData());
352     parcel.WriteInt(packet->GetRecvCode());
353     parcel.WriteVector<uint64_t>(packet->GetReserved());
354     if (parcel.IsError()) {
355         return -E_PARSE_FAIL;
356     }
357     parcel.EightByteAlign();
358     return E_OK;
359 }
360 
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)361 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
362 {
363     std::vector<SendDataItem> dataItems;
364     uint32_t version;
365     Parcel parcel(const_cast<uint8_t *>(buffer), length);
366     uint32_t packLen = parcel.ReadUInt32(version);
367     if (parcel.IsError()) {
368         return -E_PARSE_FAIL;
369     }
370 
371     if (version > SOFTWARE_VERSION_CURRENT) {
372         return -E_VERSION_NOT_SUPPORT;
373     }
374 
375     packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
376     if (parcel.IsError()) {
377         return -E_PARSE_FAIL;
378     }
379 
380     auto packet = new (std::nothrow) DataRequestPacket();
381     if (packet == nullptr) {
382         return -E_OUT_OF_MEMORY;
383     }
384 
385     packet->SetVersion(version);
386     packet->SetData(dataItems);
387     int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
388     if (errCode != E_OK) {
389         goto ERROR;
390     }
391     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
392         errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
393         if (errCode != E_OK) {
394             goto ERROR;
395         }
396     }
397     if (packet->IsCompressData()) {
398         errCode = DataPacketCompressDataDeSerialization(parcel, packet);
399         if (errCode != E_OK) {
400             goto ERROR;
401         }
402     }
403     errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
404     if (errCode != E_OK) {
405         goto ERROR;
406     }
407     errCode = inMsg->SetExternalObject<>(packet);
408     if (errCode != E_OK) {
409         goto ERROR;
410     }
411     return errCode;
412 
413 ERROR:
414     delete packet;
415     packet = nullptr;
416     return errCode;
417 }
418 
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)419 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
420 {
421     WaterMark deletedWatermark = 0;
422     parcel.ReadUInt64(deletedWatermark);
423     std::string queryId;
424     parcel.ReadString(queryId);
425     if (parcel.IsError()) {
426         return -E_PARSE_FAIL;
427     }
428     // query identify
429     QuerySyncObject querySyncObj;
430     int errCode = E_OK;
431     // for version 105, query is always sent.
432     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
433         // need to check.
434         errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
435     }
436     if (errCode != E_OK) {
437         LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
438         return errCode;
439     }
440     packet->SetDeletedWaterMark(deletedWatermark);
441     packet->SetQueryId(queryId);
442     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
443         packet->SetQuery(querySyncObj);
444     }
445     return E_OK;
446 }
447 
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)448 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
449 {
450     std::vector<SendDataItem> originalData;
451     int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
452     if (errCode != E_OK) {
453         LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
454         return errCode;
455     }
456     packet->SetData(originalData);
457     return E_OK;
458 }
459 
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)460 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
461     uint32_t packLen, uint32_t length, uint32_t version)
462 {
463     WaterMark waterMark;
464     WaterMark localWaterMark;
465     WaterMark peerWaterMark;
466     int32_t sendCode;
467     int32_t mode;
468     uint32_t sessionId;
469     uint32_t flag = 0;
470     std::vector<uint64_t> reserved;
471 
472     packLen += parcel.ReadUInt64(waterMark);
473     packLen += parcel.ReadUInt64(localWaterMark);
474     packLen += parcel.ReadUInt64(peerWaterMark);
475     packLen += parcel.ReadInt(sendCode);
476     packLen += parcel.ReadInt(mode);
477     packLen += parcel.ReadUInt32(sessionId);
478     packLen += parcel.ReadVector<uint64_t>(reserved);
479     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
480         packLen += parcel.ReadUInt32(flag);
481         packet->SetFlag(flag);
482     }
483     packLen = Parcel::GetEightByteAlign(packLen);
484     if (parcel.IsError()) {
485         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ",packLen=%" PRIu32,
486             length, packLen);
487         return -E_LENGTH_ERROR;
488     }
489     parcel.EightByteAlign();
490     packet->SetEndWaterMark(waterMark);
491     packet->SetLocalWaterMark(localWaterMark);
492     packet->SetPeerWaterMark(peerWaterMark);
493     packet->SetSendCode(sendCode);
494     packet->SetMode(mode);
495     packet->SetSessionId(sessionId);
496     packet->SetReserved(reserved);
497     return E_OK;
498 }
499 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)500 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
501 {
502     DataAckPacket packet;
503     Parcel parcel(const_cast<uint8_t *>(buffer), length);
504     uint32_t version;
505 
506     parcel.ReadUInt32(version);
507     if (parcel.IsError()) {
508         return -E_INVALID_ARGS;
509     }
510     if (version > SOFTWARE_VERSION_CURRENT) {
511         packet.SetVersion(version);
512         packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
513         return inMsg->SetCopiedObject<>(packet);
514     }
515     packet.SetVersion(version);
516     // now V1 compatible for softWareVersion :{101, 102}
517     int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
518     if (errCode != E_OK) {
519         return errCode;
520     }
521 
522     return inMsg->SetCopiedObject<>(packet);
523 }
524 
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)525 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
526 {
527     WaterMark mark;
528     int32_t errCode;
529     std::vector<uint64_t> reserved;
530 
531     parcel.ReadUInt64(mark);
532     parcel.ReadInt(errCode);
533     parcel.ReadVector<uint64_t>(reserved);
534     if (parcel.IsError()) {
535         LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
536         return -E_INVALID_ARGS;
537     }
538     packet.SetData(mark);
539     packet.SetRecvCode(errCode);
540     packet.SetReserved(reserved);
541     return E_OK;
542 }
543 
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)544 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
545 {
546     auto packet = inMsg->GetObject<ControlRequestPacket>();
547     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
548         LOGE("[ControlPacketSerialization] invalid control cmd");
549         return -E_INVALID_ARGS;
550     }
551     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
552         return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
553     }
554     return E_OK;
555 }
556 
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)557 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
558 {
559     auto packet = inMsg->GetObject<ControlRequestPacket>();
560     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
561         LOGE("[ControlPacketSerialization] invalid control cmd");
562         return -E_INVALID_ARGS;
563     }
564     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
565         return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
566     }
567     return E_OK;
568 }
569 
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)570 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
571 {
572     Parcel parcel(const_cast<uint8_t *>(buffer), length);
573     ControlRequestPacket packet;
574     int errCode = ControlRequestDeSerialization(parcel, packet);
575     if (errCode != E_OK) {
576         return errCode;
577     }
578     if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
579         errCode = SubscribeDeSerialization(parcel, inMsg, packet);
580     }
581     return errCode;
582 }
583 
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)584 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
585 {
586     auto packet = inMsg->GetObject<ControlAckPacket>();
587     if (packet == nullptr) {
588         LOGE("[AckControlPacketCalculateLen] invalid control cmd");
589         return -E_INVALID_ARGS;
590     }
591     len = packet->CalculateLen();
592     return E_OK;
593 }
594 
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)595 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
596 {
597     auto packet = inMsg->GetObject<ControlAckPacket>();
598     if (packet == nullptr) {
599         return -E_INVALID_ARGS;
600     }
601     Parcel parcel(buffer, length);
602     parcel.WriteUInt32(packet->GetVersion());
603     parcel.WriteInt(packet->GetRecvCode());
604     parcel.WriteUInt32(packet->GetcontrolCmdType());
605     parcel.WriteUInt32(packet->GetFlag());
606     if (parcel.IsError()) {
607         LOGE("[AckControlPacketSerialization] Serialization failed");
608         return -E_INVALID_ARGS;
609     }
610     parcel.EightByteAlign();
611     return E_OK;
612 }
613 
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)614 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
615 {
616     auto packet = new (std::nothrow) ControlAckPacket();
617     if (packet == nullptr) {
618         return -E_OUT_OF_MEMORY;
619     }
620     Parcel parcel(const_cast<uint8_t *>(buffer), length);
621     int32_t recvCode = 0;
622     uint32_t version = 0;
623     uint32_t controlCmdType = 0;
624     uint32_t flag = 0;
625     parcel.ReadUInt32(version);
626     parcel.ReadInt(recvCode);
627     parcel.ReadUInt32(controlCmdType);
628     parcel.ReadUInt32(flag);
629     int errCode;
630     if (parcel.IsError()) {
631         LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
632         errCode = -E_INVALID_ARGS;
633         goto ERROR;
634     }
635     packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
636     errCode = inMsg->SetExternalObject<>(packet);
637     if (errCode != E_OK) {
638         goto ERROR;
639     }
640     return errCode;
641 ERROR:
642     delete packet;
643     packet = nullptr;
644     return errCode;
645 }
646 
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)647 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
648 {
649     auto packet = inMsg->GetObject<ControlRequestPacket>();
650     if (packet == nullptr) {
651         return -E_INVALID_ARGS;
652     }
653     parcel.WriteUInt32(packet->GetVersion());
654     parcel.WriteInt(packet->GetSendCode());
655     parcel.WriteUInt32(packet->GetcontrolCmdType());
656     parcel.WriteUInt32(packet->GetFlag());
657     if (parcel.IsError()) {
658         LOGE("[ControlRequestSerialization] Serialization failed");
659         return -E_INVALID_ARGS;
660     }
661     parcel.EightByteAlign();
662     return E_OK;
663 }
664 
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)665 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
666 {
667     uint32_t version = 0;
668     int32_t sendCode = 0;
669     uint32_t controlCmdType = 0;
670     uint32_t flag = 0;
671     parcel.ReadUInt32(version);
672     if (version > SOFTWARE_VERSION_CURRENT) {
673         return -E_VERSION_NOT_SUPPORT;
674     }
675     parcel.ReadInt(sendCode);
676     parcel.ReadUInt32(controlCmdType);
677     parcel.ReadUInt32(flag);
678     if (parcel.IsError()) {
679         LOGE("[ControlRequestDeSerialization] deserialize failed!");
680         return -E_LENGTH_ERROR;
681     }
682     packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
683     return E_OK;
684 }
685 
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)686 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
687 {
688     auto packet = inMsg->GetObject<SubscribeRequest>();
689     if (packet == nullptr) {
690         return -E_INVALID_ARGS;
691     }
692     len = packet->CalculateLen();
693     return E_OK;
694 }
695 
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)696 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
697 {
698     auto packet = inMsg->GetObject<SubscribeRequest>();
699     if (packet == nullptr) {
700         return -E_INVALID_ARGS;
701     }
702     Parcel parcel(buffer, length);
703     int errCode = ControlRequestSerialization(parcel, inMsg);
704     if (errCode != E_OK) {
705         LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
706         return errCode;
707     }
708     QuerySyncObject queryObj = packet->GetQuery();
709     errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
710     if (errCode != E_OK) {
711         LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
712         return errCode;
713     }
714     return E_OK;
715 }
716 
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)717 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
718     ControlRequestPacket &controlPacket)
719 {
720     auto packet = new (std::nothrow) SubscribeRequest();
721     if (packet == nullptr) {
722         return -E_OUT_OF_MEMORY;
723     }
724     QuerySyncObject querySyncObj;
725     int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
726     if (errCode != E_OK) {
727         goto ERROR;
728     }
729     packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
730         static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
731     packet->SetQuery(querySyncObj);
732     errCode = inMsg->SetExternalObject<>(packet);
733     if (errCode != E_OK) {
734         goto ERROR;
735     }
736     return errCode;
737 ERROR:
738     delete packet;
739     packet = nullptr;
740     return errCode;
741 }
742 
RegisterCommunicatorTransformFunc()743 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
744 {
745     TransformFunc func;
746     func.computeFunc = std::bind(&SingleVerSerializeManager::CalculateLen, std::placeholders::_1);
747     func.serializeFunc = std::bind(&SingleVerSerializeManager::Serialization, std::placeholders::_1,
748         std::placeholders::_2, std::placeholders::_3);
749     func.deserializeFunc = std::bind(&SingleVerSerializeManager::DeSerialization, std::placeholders::_1,
750         std::placeholders::_2, std::placeholders::_3);
751 
752     static std::vector<MessageId> messageIds = {
753         QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
754     };
755     int errCode = E_OK;
756     for (auto &id : messageIds) {
757         int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
758         if (retCode != E_OK) {
759             LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
760                 static_cast<uint32_t>(id), retCode);
761             errCode = retCode;
762         }
763     }
764     return errCode;
765 }
766 
RegisterInnerTransformFunc()767 void SingleVerSerializeManager::RegisterInnerTransformFunc()
768 {
769     TransformFunc func;
770     func.computeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketCalculateLen, std::placeholders::_1);
771     func.serializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketSerialization,
772         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
773     func.deserializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketDeSerialization,
774         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
775     std::lock_guard<std::mutex> autoLock(handlesLock_);
776     messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
777 }
778 
ISyncPacketCalculateLen(const Message * inMsg)779 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
780 {
781     if (inMsg == nullptr) {
782         return 0u;
783     }
784     uint32_t len = 0u;
785     const auto packet = inMsg->GetObject<ISyncPacket>();
786     if (packet != nullptr) {
787         len = packet->CalculateLen();
788     }
789     return len;
790 }
791 
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)792 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
793     const Message *inMsg)
794 {
795     if (inMsg == nullptr) {
796         return -E_INVALID_ARGS;
797     }
798     int errCode = E_OK;
799     Parcel parcel(buffer, length);
800     auto packet = inMsg->GetObject<ISyncPacket>();
801     if (packet != nullptr) {
802         errCode = packet->Serialization(parcel);
803     }
804     return errCode;
805 }
806 
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)807 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
808     Message *inMsg)
809 {
810     if (inMsg == nullptr) {
811         return -E_INVALID_ARGS;
812     }
813     ISyncPacket *packet = nullptr;
814     int errCode = BuildISyncPacket(inMsg, packet);
815     if (errCode != E_OK) {
816         return errCode;
817     }
818     Parcel parcel(const_cast<uint8_t *>(buffer), length);
819     do {
820         errCode = packet->DeSerialization(parcel);
821         if (errCode != E_OK) {
822             break;
823         }
824         errCode = inMsg->SetExternalObject(packet);
825     } while (false);
826     if (errCode != E_OK) {
827         delete packet;
828         packet = nullptr;
829     }
830     return E_OK;
831 }
832 
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)833 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
834 {
835     uint32_t messageId = inMsg->GetMessageId();
836     if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
837         return -E_INVALID_ARGS;
838     }
839     switch (inMsg->GetMessageType()) {
840         case TYPE_REQUEST:
841             packet = new(std::nothrow) RemoteExecutorRequestPacket();
842             break;
843         case TYPE_RESPONSE:
844             packet = new(std::nothrow) RemoteExecutorAckPacket();
845             break;
846         default:
847             packet = nullptr;
848             break;
849     }
850     if (packet == nullptr) {
851         return -E_OUT_OF_MEMORY;
852     }
853     return E_OK;
854 }
855 
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)856 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
857 {
858     std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
859     if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
860         return -E_INVALID_ARGS;
861     }
862     parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
863     for (const auto &entry : extraConditions) {
864         if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
865             entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
866             return -E_INVALID_ARGS;
867         }
868         parcel.WriteString(entry.first);
869         parcel.WriteString(entry.second);
870     }
871     parcel.EightByteAlign();
872     if (parcel.IsError()) {
873         return -E_PARSE_FAIL;
874     }
875     return E_OK;
876 }
877 
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)878 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
879 {
880     if (!packet->IsExtraConditionData()) {
881         return E_OK;
882     }
883     uint32_t conditionSize = 0u;
884     (void) parcel.ReadUInt32(conditionSize);
885     if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
886         return -E_INVALID_ARGS;
887     }
888     std::map<std::string, std::string> extraConditions;
889     for (uint32_t i = 0; i < conditionSize; i++) {
890         std::string conditionKey;
891         std::string conditionVal;
892         (void) parcel.ReadString(conditionKey);
893         (void) parcel.ReadString(conditionVal);
894         if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
895             conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
896             return -E_INVALID_ARGS;
897         }
898         extraConditions[conditionKey] = conditionVal;
899     }
900     parcel.EightByteAlign();
901     if (parcel.IsError()) {
902         return -E_PARSE_FAIL;
903     }
904     packet->SetExtraConditions(extraConditions);
905     return E_OK;
906 }
907 }  // namespace DistributedDB