• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_serialize_manager.h"
17 
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "sync_types.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
// Mutex taken around every lookup in, and insertion into, messageHandles_ in this file.
29 std::mutex SingleVerSerializeManager::handlesLock_;
// Transform functions keyed by message id; consulted first by Serialization, DeSerialization and CalculateLen.
30 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)31 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
32 {
33     if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
34         return -E_MESSAGE_ID_ERROR;
35     }
36     SerializeFunc serializeFunc = nullptr;
37     {
38         std::lock_guard<std::mutex> autoLock(handlesLock_);
39         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
40             serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
41         }
42     }
43     if (serializeFunc != nullptr) {
44         return serializeFunc(buffer, length, inMsg);
45     }
46 
47     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
48         return ControlSerialization(buffer, length, inMsg);
49     }
50     return DataSerialization(buffer, length, inMsg);
51 }
52 
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)53 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
54 {
55     switch (inMsg->GetMessageType()) {
56         case TYPE_REQUEST:
57             return DataPacketSerialization(buffer, length, inMsg);
58         case TYPE_RESPONSE:
59         case TYPE_NOTIFY:
60             return AckPacketSerialization(buffer, length, inMsg);
61         default:
62             return -E_MESSAGE_TYPE_ERROR;
63     }
64 }
65 
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)66 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
67 {
68     switch (inMsg->GetMessageType()) {
69         case TYPE_REQUEST:
70             return ControlPacketSerialization(buffer, length, inMsg);
71         case TYPE_RESPONSE:
72             return AckControlPacketSerialization(buffer, length, inMsg);
73         default:
74             return -E_MESSAGE_TYPE_ERROR;
75     }
76 }
77 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)78 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
79 {
80     if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
81         return -E_MESSAGE_ID_ERROR;
82     }
83     DeserializeFunc deserializeFunc = nullptr;
84     {
85         std::lock_guard<std::mutex> autoLock(handlesLock_);
86         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
87             deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
88         }
89     }
90     if (deserializeFunc != nullptr) {
91         return deserializeFunc(buffer, length, inMsg);
92     }
93     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
94         return ControlDeSerialization(buffer, length, inMsg);
95     }
96     return DataDeSerialization(buffer, length, inMsg);
97 }
98 
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)99 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
100 {
101     switch (inMsg->GetMessageType()) {
102         case TYPE_REQUEST:
103             return DataPacketDeSerialization(buffer, length, inMsg);
104         case TYPE_RESPONSE:
105         case TYPE_NOTIFY:
106             return AckPacketDeSerialization(buffer, length, inMsg);
107         default:
108             return -E_MESSAGE_TYPE_ERROR;
109     }
110 }
111 
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)112 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
113 {
114     switch (inMsg->GetMessageType()) {
115         case TYPE_REQUEST:
116             return ControlPacketDeSerialization(buffer, length, inMsg);
117         case TYPE_RESPONSE:
118             return AckControlPacketDeSerialization(buffer, length, inMsg);
119         default:
120             return -E_MESSAGE_TYPE_ERROR;
121     }
122 }
123 
CalculateLen(const Message * inMsg)124 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
125 {
126     if (!(IsPacketValid(inMsg))) {
127         return 0;
128     }
129     ComputeLengthFunc computeFunc = nullptr;
130     {
131         std::lock_guard<std::mutex> autoLock(handlesLock_);
132         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
133             computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
134         }
135     }
136     if (computeFunc != nullptr) {
137         return computeFunc(inMsg);
138     }
139     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
140         return CalculateControlLen(inMsg);
141     }
142     return CalculateDataLen(inMsg);
143 }
144 
CalculateDataLen(const Message * inMsg)145 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
146 {
147     uint32_t len = 0;
148     int errCode;
149     switch (inMsg->GetMessageType()) {
150         case TYPE_REQUEST:
151             errCode = DataPacketCalculateLen(inMsg, len);
152             if (errCode != E_OK) {
153                 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
154                 return 0;
155             }
156             return len;
157         case TYPE_RESPONSE:
158         case TYPE_NOTIFY:
159             errCode = AckPacketCalculateLen(inMsg, len);
160             if (errCode != E_OK) {
161                 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
162                 return 0;
163             }
164             return len;
165         default:
166             return 0;
167     }
168 }
169 
CalculateControlLen(const Message * inMsg)170 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
171 {
172     uint32_t len = 0;
173     int errCode;
174     switch (inMsg->GetMessageType()) {
175         case TYPE_REQUEST:
176             errCode = ControlPacketCalculateLen(inMsg, len);
177             if (errCode != E_OK) {
178                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
179                 return 0;
180             }
181             return len;
182         case TYPE_RESPONSE:
183         case TYPE_NOTIFY:
184             errCode = AckControlPacketCalculateLen(inMsg, len);
185             if (errCode != E_OK) {
186                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
187                 return 0;
188             }
189             return len;
190         default:
191             return 0;
192     }
193 }
194 
RegisterTransformFunc()195 int SingleVerSerializeManager::RegisterTransformFunc()
196 {
197     int errCode = RegisterCommunicatorTransformFunc();
198     RegisterInnerTransformFunc();
199     return errCode;
200 }
201 
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)202 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
203 {
204     parcel.WriteUInt64(packet->GetEndWaterMark());
205     parcel.WriteUInt64(packet->GetLocalWaterMark());
206     parcel.WriteUInt64(packet->GetPeerWaterMark());
207     parcel.WriteInt(packet->GetSendCode());
208     parcel.WriteInt(packet->GetMode());
209     parcel.WriteUInt32(packet->GetSessionId());
210     parcel.WriteVector<uint64_t>(packet->GetReserved());
211     if (parcel.IsError()) {
212         return -E_PARSE_FAIL;
213     }
214     if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
215         parcel.WriteUInt32(packet->GetFlag());
216     }
217     parcel.EightByteAlign();
218     if (parcel.IsError()) {
219         return -E_PARSE_FAIL;
220     }
221     return E_OK;
222 }
223 
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)224 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
225 {
226     auto packet = inMsg->GetObject<DataRequestPacket>();
227     if (packet == nullptr) {
228         return -E_INVALID_ARGS;
229     }
230     Parcel parcel(buffer, length);
231 
232     // version
233     int errCode = parcel.WriteUInt32(packet->GetVersion());
234     if (errCode != E_OK) {
235         LOGE("[DataPacketSerialization] Serialize version failed");
236         return errCode;
237     }
238     // sendDataItems
239     errCode = GenericSingleVerKvEntry::SerializeDatas(
240         (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
241     if (errCode != E_OK) {
242         LOGE("[DataPacketSerialization] Serialize Data failed");
243         return errCode;
244     }
245 
246     // data sync
247     errCode = DataPacketSyncerPartSerialization(parcel, packet);
248     if (errCode != E_OK) {
249         LOGE("[DataPacketSerialization] Serialize Data failed");
250         return errCode;
251     }
252     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
253         errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
254         if (errCode != E_OK) {
255             return errCode;
256         }
257     }
258     if (packet->IsCompressData()) {
259         // serialize compress data
260         errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
261             parcel, packet->GetVersion(), packet->GetCompressAlgo());
262         if (errCode != E_OK) {
263             LOGE("[DataPacketSerialization] Serialize compress Data failed");
264             return errCode;
265         }
266     }
267     // flag mask add in 103
268     if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0 || !packet->IsExtraConditionData()) {
269         return E_OK;
270     }
271     return DataPacketExtraConditionsSerialization(parcel, packet);
272 }
273 
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)274 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
275 {
276     // deleted record send watermark
277     int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
278     if (errCode != E_OK) {
279         LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
280         return errCode;
281     }
282 
283     // query identify
284     QuerySyncObject queryObj = packet->GetQuery();
285     errCode = parcel.WriteString(packet->GetQueryId());
286     if (errCode != E_OK) {
287         LOGE("[QuerySerialization] Serialize query id failed!");
288         return errCode;
289     }
290     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
291         // need to check.
292         errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
293     }
294     return errCode;
295 }
296 
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)297 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
298 {
299     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
300     if (packet == nullptr) {
301         return -E_INVALID_ARGS;
302     }
303 
304     len = packet->CalculateLen(inMsg->GetMessageId());
305     return E_OK;
306 }
307 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)308 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
309 {
310     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
311     if (packet == nullptr) {
312         return -E_INVALID_ARGS;
313     }
314 
315     len = packet->CalculateLen();
316     return E_OK;
317 }
318 
IsPacketValid(const Message * inMsg)319 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
320 {
321     if (inMsg == nullptr) {
322         return false;
323     }
324 
325     int msgType = inMsg->GetMessageType();
326     if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
327         LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
328         return false;
329     }
330     return true;
331 }
332 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)333 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
334 {
335     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
336     if (packet == nullptr) {
337         return -E_INVALID_ARGS;
338     }
339 
340     Parcel parcel(buffer, length);
341     parcel.WriteUInt32(packet->GetVersion());
342     if (parcel.IsError()) {
343         return -E_PARSE_FAIL;
344     }
345     // now V1 compatible for softWareVersion :{101, 102}
346     return AckPacketSyncerPartSerializationV1(parcel, packet);
347 }
348 
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)349 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
350 {
351     parcel.WriteUInt64(packet->GetData());
352     parcel.WriteInt(packet->GetRecvCode());
353     parcel.WriteVector<uint64_t>(packet->GetReserved());
354     parcel.EightByteAlign();
355     if (parcel.IsError()) {
356         return -E_PARSE_FAIL;
357     }
358     return E_OK;
359 }
360 
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)361 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
362 {
363     std::vector<SendDataItem> dataItems;
364     uint32_t version;
365     Parcel parcel(const_cast<uint8_t *>(buffer), length);
366     uint32_t packLen = parcel.ReadUInt32(version);
367     if (parcel.IsError()) {
368         return -E_PARSE_FAIL;
369     }
370 
371     if (version > SOFTWARE_VERSION_CURRENT) {
372         return -E_VERSION_NOT_SUPPORT;
373     }
374 
375     packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
376     if (parcel.IsError()) {
377         return -E_PARSE_FAIL;
378     }
379 
380     auto packet = new (std::nothrow) DataRequestPacket();
381     if (packet == nullptr) {
382         return -E_OUT_OF_MEMORY;
383     }
384 
385     packet->SetVersion(version);
386     packet->SetData(dataItems);
387     int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
388     if (errCode != E_OK) {
389         goto ERROR;
390     }
391     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
392         errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
393         if (errCode != E_OK) {
394             goto ERROR;
395         }
396     }
397     if (packet->IsCompressData()) {
398         errCode = DataPacketCompressDataDeSerialization(parcel, packet);
399         if (errCode != E_OK) {
400             goto ERROR;
401         }
402     }
403     errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
404     if (errCode != E_OK) {
405         goto ERROR;
406     }
407     errCode = inMsg->SetExternalObject<>(packet);
408     if (errCode == E_OK) {
409         return errCode;
410     }
411 
412 ERROR:
413     delete packet;
414     packet = nullptr;
415     return errCode;
416 }
417 
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)418 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
419 {
420     WaterMark deletedWatermark = 0;
421     parcel.ReadUInt64(deletedWatermark);
422     std::string queryId;
423     parcel.ReadString(queryId);
424     if (parcel.IsError()) {
425         return -E_PARSE_FAIL;
426     }
427     // query identify
428     QuerySyncObject querySyncObj;
429     int errCode = E_OK;
430     // for version 105, query is always sent.
431     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
432         // need to check.
433         errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
434     }
435     if (errCode != E_OK) {
436         LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
437         return errCode;
438     }
439     packet->SetDeletedWaterMark(deletedWatermark);
440     packet->SetQueryId(queryId);
441     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
442         packet->SetQuery(querySyncObj);
443     }
444     return E_OK;
445 }
446 
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)447 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
448 {
449     std::vector<SendDataItem> originalData;
450     int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
451     if (errCode != E_OK) {
452         LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
453         return errCode;
454     }
455     packet->SetData(originalData);
456     return E_OK;
457 }
458 
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)459 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
460     uint32_t packLen, uint32_t length, uint32_t version)
461 {
462     WaterMark waterMark;
463     WaterMark localWaterMark;
464     WaterMark peerWaterMark;
465     int32_t sendCode;
466     int32_t mode;
467     uint32_t sessionId;
468     uint32_t flag = 0;
469     std::vector<uint64_t> reserved;
470 
471     uint64_t totPacketLen = packLen;
472     totPacketLen += parcel.ReadUInt64(waterMark);
473     totPacketLen += parcel.ReadUInt64(localWaterMark);
474     totPacketLen += parcel.ReadUInt64(peerWaterMark);
475     totPacketLen += parcel.ReadInt(sendCode);
476     totPacketLen += parcel.ReadInt(mode);
477     totPacketLen += parcel.ReadUInt32(sessionId);
478     totPacketLen += parcel.ReadVector<uint64_t>(reserved);
479     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
480         totPacketLen += parcel.ReadUInt32(flag);
481         packet->SetFlag(flag);
482     }
483     if (totPacketLen > INT32_MAX) {
484         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input totPackLen=%" PRIu64 " is over limit.",
485              totPacketLen);
486         return -E_LENGTH_ERROR;
487     }
488     parcel.EightByteAlign();
489     totPacketLen = Parcel::GetEightByteAlign(totPacketLen);
490     if (parcel.IsError()) {
491         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ", totPackLen=%" PRIu64,
492             length, totPacketLen);
493         return -E_LENGTH_ERROR;
494     }
495     packet->SetEndWaterMark(waterMark);
496     packet->SetLocalWaterMark(localWaterMark);
497     packet->SetPeerWaterMark(peerWaterMark);
498     packet->SetSendCode(sendCode);
499     packet->SetMode(mode);
500     packet->SetSessionId(sessionId);
501     packet->SetReserved(reserved);
502     return E_OK;
503 }
504 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)505 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
506 {
507     DataAckPacket packet;
508     Parcel parcel(const_cast<uint8_t *>(buffer), length);
509     uint32_t version;
510 
511     parcel.ReadUInt32(version);
512     if (parcel.IsError()) {
513         return -E_INVALID_ARGS;
514     }
515     if (version > SOFTWARE_VERSION_CURRENT) {
516         packet.SetVersion(version);
517         packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
518         return inMsg->SetCopiedObject<>(packet);
519     }
520     packet.SetVersion(version);
521     // now V1 compatible for softWareVersion :{101, 102}
522     int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
523     if (errCode != E_OK) {
524         return errCode;
525     }
526 
527     return inMsg->SetCopiedObject<>(packet);
528 }
529 
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)530 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
531 {
532     WaterMark mark;
533     int32_t errCode;
534     std::vector<uint64_t> reserved;
535 
536     parcel.ReadUInt64(mark);
537     parcel.ReadInt(errCode);
538     parcel.ReadVector<uint64_t>(reserved);
539     if (parcel.IsError()) {
540         LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
541         return -E_INVALID_ARGS;
542     }
543     packet.SetData(mark);
544     packet.SetRecvCode(errCode);
545     packet.SetReserved(reserved);
546     return E_OK;
547 }
548 
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)549 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
550 {
551     auto packet = inMsg->GetObject<ControlRequestPacket>();
552     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
553         LOGE("[ControlPacketSerialization] invalid control cmd");
554         return -E_INVALID_ARGS;
555     }
556     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
557         return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
558     }
559     return E_OK;
560 }
561 
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)562 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
563 {
564     auto packet = inMsg->GetObject<ControlRequestPacket>();
565     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
566         LOGE("[ControlPacketSerialization] invalid control cmd");
567         return -E_INVALID_ARGS;
568     }
569     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
570         return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
571     }
572     return E_OK;
573 }
574 
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)575 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
576 {
577     Parcel parcel(const_cast<uint8_t *>(buffer), length);
578     ControlRequestPacket packet;
579     int errCode = ControlRequestDeSerialization(parcel, packet);
580     if (errCode != E_OK) {
581         return errCode;
582     }
583     if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
584         errCode = SubscribeDeSerialization(parcel, inMsg, packet);
585     }
586     return errCode;
587 }
588 
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)589 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
590 {
591     auto packet = inMsg->GetObject<ControlAckPacket>();
592     if (packet == nullptr) {
593         LOGE("[AckControlPacketCalculateLen] invalid control cmd");
594         return -E_INVALID_ARGS;
595     }
596     len = packet->CalculateLen();
597     return E_OK;
598 }
599 
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)600 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
601 {
602     auto packet = inMsg->GetObject<ControlAckPacket>();
603     if (packet == nullptr) {
604         return -E_INVALID_ARGS;
605     }
606     Parcel parcel(buffer, length);
607     parcel.WriteUInt32(packet->GetVersion());
608     parcel.WriteInt(packet->GetRecvCode());
609     parcel.WriteUInt32(packet->GetcontrolCmdType());
610     parcel.WriteUInt32(packet->GetFlag());
611     if (parcel.IsError()) {
612         LOGE("[AckControlPacketSerialization] Serialization failed");
613         return -E_INVALID_ARGS;
614     }
615     parcel.EightByteAlign();
616     return E_OK;
617 }
618 
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)619 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
620 {
621     auto packet = new (std::nothrow) ControlAckPacket();
622     if (packet == nullptr) {
623         return -E_OUT_OF_MEMORY;
624     }
625     Parcel parcel(const_cast<uint8_t *>(buffer), length);
626     int32_t recvCode = 0;
627     uint32_t version = 0;
628     uint32_t controlCmdType = 0;
629     uint32_t flag = 0;
630     parcel.ReadUInt32(version);
631     parcel.ReadInt(recvCode);
632     parcel.ReadUInt32(controlCmdType);
633     parcel.ReadUInt32(flag);
634     int errCode;
635     if (parcel.IsError()) {
636         LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
637         errCode = -E_INVALID_ARGS;
638         goto ERROR;
639     }
640     packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
641     errCode = inMsg->SetExternalObject<>(packet);
642     if (errCode != E_OK) {
643         goto ERROR;
644     }
645     return errCode;
646 ERROR:
647     delete packet;
648     packet = nullptr;
649     return errCode;
650 }
651 
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)652 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
653 {
654     auto packet = inMsg->GetObject<ControlRequestPacket>();
655     if (packet == nullptr) {
656         return -E_INVALID_ARGS;
657     }
658     parcel.WriteUInt32(packet->GetVersion());
659     parcel.WriteInt(packet->GetSendCode());
660     parcel.WriteUInt32(packet->GetcontrolCmdType());
661     parcel.WriteUInt32(packet->GetFlag());
662     parcel.EightByteAlign();
663     if (parcel.IsError()) {
664         LOGE("[ControlRequestSerialization] Serialization failed");
665         return -E_INVALID_ARGS;
666     }
667     return E_OK;
668 }
669 
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)670 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
671 {
672     uint32_t version = 0;
673     int32_t sendCode = 0;
674     uint32_t controlCmdType = 0;
675     uint32_t flag = 0;
676     parcel.ReadUInt32(version);
677     if (version > SOFTWARE_VERSION_CURRENT) {
678         return -E_VERSION_NOT_SUPPORT;
679     }
680     parcel.ReadInt(sendCode);
681     parcel.ReadUInt32(controlCmdType);
682     parcel.ReadUInt32(flag);
683     if (parcel.IsError()) {
684         LOGE("[ControlRequestDeSerialization] deserialize failed!");
685         return -E_LENGTH_ERROR;
686     }
687     packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
688     return E_OK;
689 }
690 
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)691 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
692 {
693     auto packet = inMsg->GetObject<SubscribeRequest>();
694     if (packet == nullptr) {
695         return -E_INVALID_ARGS;
696     }
697     len = packet->CalculateLen();
698     return E_OK;
699 }
700 
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)701 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
702 {
703     auto packet = inMsg->GetObject<SubscribeRequest>();
704     if (packet == nullptr) {
705         return -E_INVALID_ARGS;
706     }
707     Parcel parcel(buffer, length);
708     int errCode = ControlRequestSerialization(parcel, inMsg);
709     if (errCode != E_OK) {
710         LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
711         return errCode;
712     }
713     QuerySyncObject queryObj = packet->GetQuery();
714     errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
715     if (errCode != E_OK) {
716         LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
717         return errCode;
718     }
719     return E_OK;
720 }
721 
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)722 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
723     ControlRequestPacket &controlPacket)
724 {
725     auto packet = new (std::nothrow) SubscribeRequest();
726     if (packet == nullptr) {
727         return -E_OUT_OF_MEMORY;
728     }
729     QuerySyncObject querySyncObj;
730     int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
731     if (errCode != E_OK) {
732         goto ERROR;
733     }
734     packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
735         static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
736     packet->SetQuery(querySyncObj);
737     errCode = inMsg->SetExternalObject<>(packet);
738     if (errCode != E_OK) {
739         goto ERROR;
740     }
741     return errCode;
742 ERROR:
743     delete packet;
744     packet = nullptr;
745     return errCode;
746 }
747 
RegisterCommunicatorTransformFunc()748 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
749 {
750     TransformFunc func;
751     func.computeFunc = std::bind(&SingleVerSerializeManager::CalculateLen, std::placeholders::_1);
752     func.serializeFunc = std::bind(&SingleVerSerializeManager::Serialization, std::placeholders::_1,
753         std::placeholders::_2, std::placeholders::_3);
754     func.deserializeFunc = std::bind(&SingleVerSerializeManager::DeSerialization, std::placeholders::_1,
755         std::placeholders::_2, std::placeholders::_3);
756 
757     static std::vector<MessageId> messageIds = {
758         QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
759     };
760     int errCode = E_OK;
761     for (auto &id : messageIds) {
762         int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
763         if (retCode != E_OK) {
764             LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
765                 static_cast<uint32_t>(id), retCode);
766             errCode = retCode;
767         }
768     }
769     return errCode;
770 }
771 
RegisterInnerTransformFunc()772 void SingleVerSerializeManager::RegisterInnerTransformFunc()
773 {
774     TransformFunc func;
775     func.computeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketCalculateLen, std::placeholders::_1);
776     func.serializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketSerialization,
777         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
778     func.deserializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketDeSerialization,
779         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
780     std::lock_guard<std::mutex> autoLock(handlesLock_);
781     messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
782 }
783 
ISyncPacketCalculateLen(const Message * inMsg)784 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
785 {
786     if (inMsg == nullptr) {
787         return 0u;
788     }
789     uint32_t len = 0u;
790     const auto packet = inMsg->GetObject<ISyncPacket>();
791     if (packet != nullptr) {
792         len = packet->CalculateLen();
793     }
794     return len;
795 }
796 
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)797 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
798     const Message *inMsg)
799 {
800     if (inMsg == nullptr) {
801         return -E_INVALID_ARGS;
802     }
803     int errCode = E_OK;
804     Parcel parcel(buffer, length);
805     auto packet = inMsg->GetObject<ISyncPacket>();
806     if (packet != nullptr) {
807         errCode = packet->Serialization(parcel);
808     }
809     return errCode;
810 }
811 
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)812 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
813     Message *inMsg)
814 {
815     if (inMsg == nullptr) {
816         return -E_INVALID_ARGS;
817     }
818     ISyncPacket *packet = nullptr;
819     int errCode = BuildISyncPacket(inMsg, packet);
820     if (errCode != E_OK) {
821         return errCode;
822     }
823     Parcel parcel(const_cast<uint8_t *>(buffer), length);
824     do {
825         errCode = packet->DeSerialization(parcel);
826         if (errCode != E_OK) {
827             break;
828         }
829         errCode = inMsg->SetExternalObject(packet);
830     } while (false);
831     if (errCode != E_OK) {
832         delete packet;
833         packet = nullptr;
834     }
835     return E_OK;
836 }
837 
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)838 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
839 {
840     uint32_t messageId = inMsg->GetMessageId();
841     if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
842         return -E_INVALID_ARGS;
843     }
844     switch (inMsg->GetMessageType()) {
845         case TYPE_REQUEST:
846             packet = new(std::nothrow) RemoteExecutorRequestPacket();
847             break;
848         case TYPE_RESPONSE:
849             packet = new(std::nothrow) RemoteExecutorAckPacket();
850             break;
851         default:
852             packet = nullptr;
853             break;
854     }
855     if (packet == nullptr) {
856         return -E_OUT_OF_MEMORY;
857     }
858     return E_OK;
859 }
860 
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)861 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
862 {
863     std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
864     if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
865         return -E_INVALID_ARGS;
866     }
867     parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
868     for (const auto &entry : extraConditions) {
869         if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
870             entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
871             return -E_INVALID_ARGS;
872         }
873         parcel.WriteString(entry.first);
874         parcel.WriteString(entry.second);
875     }
876     parcel.EightByteAlign();
877     if (parcel.IsError()) {
878         return -E_PARSE_FAIL;
879     }
880     return E_OK;
881 }
882 
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)883 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
884 {
885     if (!packet->IsExtraConditionData()) {
886         return E_OK;
887     }
888     uint32_t conditionSize = 0u;
889     (void) parcel.ReadUInt32(conditionSize);
890     if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
891         return -E_INVALID_ARGS;
892     }
893     std::map<std::string, std::string> extraConditions;
894     for (uint32_t i = 0; i < conditionSize; i++) {
895         std::string conditionKey;
896         std::string conditionVal;
897         (void) parcel.ReadString(conditionKey);
898         (void) parcel.ReadString(conditionVal);
899         if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
900             conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
901             return -E_INVALID_ARGS;
902         }
903         extraConditions[conditionKey] = conditionVal;
904     }
905     parcel.EightByteAlign();
906     if (parcel.IsError()) {
907         return -E_PARSE_FAIL;
908     }
909     packet->SetExtraConditions(extraConditions);
910     return E_OK;
911 }
912 }  // namespace DistributedDB