1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_serialize_manager.h"
17
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "sync_types.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 std::mutex SingleVerSerializeManager::handlesLock_;
30 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)31 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
32 {
33 if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
34 return -E_MESSAGE_ID_ERROR;
35 }
36 SerializeFunc serializeFunc = nullptr;
37 {
38 std::lock_guard<std::mutex> autoLock(handlesLock_);
39 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
40 serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
41 }
42 }
43 if (serializeFunc != nullptr) {
44 return serializeFunc(buffer, length, inMsg);
45 }
46
47 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
48 return ControlSerialization(buffer, length, inMsg);
49 }
50 return DataSerialization(buffer, length, inMsg);
51 }
52
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)53 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
54 {
55 switch (inMsg->GetMessageType()) {
56 case TYPE_REQUEST:
57 return DataPacketSerialization(buffer, length, inMsg);
58 case TYPE_RESPONSE:
59 case TYPE_NOTIFY:
60 return AckPacketSerialization(buffer, length, inMsg);
61 default:
62 return -E_MESSAGE_TYPE_ERROR;
63 }
64 }
65
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)66 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
67 {
68 switch (inMsg->GetMessageType()) {
69 case TYPE_REQUEST:
70 return ControlPacketSerialization(buffer, length, inMsg);
71 case TYPE_RESPONSE:
72 return AckControlPacketSerialization(buffer, length, inMsg);
73 default:
74 return -E_MESSAGE_TYPE_ERROR;
75 }
76 }
77
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)78 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
79 {
80 if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
81 return -E_MESSAGE_ID_ERROR;
82 }
83 DeserializeFunc deserializeFunc = nullptr;
84 {
85 std::lock_guard<std::mutex> autoLock(handlesLock_);
86 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
87 deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
88 }
89 }
90 if (deserializeFunc != nullptr) {
91 return deserializeFunc(buffer, length, inMsg);
92 }
93 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
94 return ControlDeSerialization(buffer, length, inMsg);
95 }
96 return DataDeSerialization(buffer, length, inMsg);
97 }
98
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)99 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
100 {
101 switch (inMsg->GetMessageType()) {
102 case TYPE_REQUEST:
103 return DataPacketDeSerialization(buffer, length, inMsg);
104 case TYPE_RESPONSE:
105 case TYPE_NOTIFY:
106 return AckPacketDeSerialization(buffer, length, inMsg);
107 default:
108 return -E_MESSAGE_TYPE_ERROR;
109 }
110 }
111
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)112 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
113 {
114 switch (inMsg->GetMessageType()) {
115 case TYPE_REQUEST:
116 return ControlPacketDeSerialization(buffer, length, inMsg);
117 case TYPE_RESPONSE:
118 return AckControlPacketDeSerialization(buffer, length, inMsg);
119 default:
120 return -E_MESSAGE_TYPE_ERROR;
121 }
122 }
123
CalculateLen(const Message * inMsg)124 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
125 {
126 if (!(IsPacketValid(inMsg))) {
127 return 0;
128 }
129 ComputeLengthFunc computeFunc = nullptr;
130 {
131 std::lock_guard<std::mutex> autoLock(handlesLock_);
132 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
133 computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
134 }
135 }
136 if (computeFunc != nullptr) {
137 return computeFunc(inMsg);
138 }
139 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
140 return CalculateControlLen(inMsg);
141 }
142 return CalculateDataLen(inMsg);
143 }
144
CalculateDataLen(const Message * inMsg)145 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
146 {
147 uint32_t len = 0;
148 int errCode;
149 switch (inMsg->GetMessageType()) {
150 case TYPE_REQUEST:
151 errCode = DataPacketCalculateLen(inMsg, len);
152 if (errCode != E_OK) {
153 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
154 return 0;
155 }
156 return len;
157 case TYPE_RESPONSE:
158 case TYPE_NOTIFY:
159 errCode = AckPacketCalculateLen(inMsg, len);
160 if (errCode != E_OK) {
161 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
162 return 0;
163 }
164 return len;
165 default:
166 return 0;
167 }
168 }
169
CalculateControlLen(const Message * inMsg)170 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
171 {
172 uint32_t len = 0;
173 int errCode;
174 switch (inMsg->GetMessageType()) {
175 case TYPE_REQUEST:
176 errCode = ControlPacketCalculateLen(inMsg, len);
177 if (errCode != E_OK) {
178 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
179 return 0;
180 }
181 return len;
182 case TYPE_RESPONSE:
183 case TYPE_NOTIFY:
184 errCode = AckControlPacketCalculateLen(inMsg, len);
185 if (errCode != E_OK) {
186 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
187 return 0;
188 }
189 return len;
190 default:
191 return 0;
192 }
193 }
194
RegisterTransformFunc()195 int SingleVerSerializeManager::RegisterTransformFunc()
196 {
197 int errCode = RegisterCommunicatorTransformFunc();
198 RegisterInnerTransformFunc();
199 return errCode;
200 }
201
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)202 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
203 {
204 parcel.WriteUInt64(packet->GetEndWaterMark());
205 parcel.WriteUInt64(packet->GetLocalWaterMark());
206 parcel.WriteUInt64(packet->GetPeerWaterMark());
207 parcel.WriteInt(packet->GetSendCode());
208 parcel.WriteInt(packet->GetMode());
209 parcel.WriteUInt32(packet->GetSessionId());
210 parcel.WriteVector<uint64_t>(packet->GetReserved());
211 if (parcel.IsError()) {
212 return -E_PARSE_FAIL;
213 }
214 if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
215 parcel.WriteUInt32(packet->GetFlag());
216 }
217 parcel.EightByteAlign();
218 if (parcel.IsError()) {
219 return -E_PARSE_FAIL;
220 }
221 return E_OK;
222 }
223
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)224 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
225 {
226 auto packet = inMsg->GetObject<DataRequestPacket>();
227 if (packet == nullptr) {
228 return -E_INVALID_ARGS;
229 }
230 Parcel parcel(buffer, length);
231
232 // version
233 int errCode = parcel.WriteUInt32(packet->GetVersion());
234 if (errCode != E_OK) {
235 LOGE("[DataPacketSerialization] Serialize version failed");
236 return errCode;
237 }
238 // sendDataItems
239 errCode = GenericSingleVerKvEntry::SerializeDatas(
240 (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
241 if (errCode != E_OK) {
242 LOGE("[DataPacketSerialization] Serialize Data failed");
243 return errCode;
244 }
245
246 // data sync
247 errCode = DataPacketSyncerPartSerialization(parcel, packet);
248 if (errCode != E_OK) {
249 LOGE("[DataPacketSerialization] Serialize Data failed");
250 return errCode;
251 }
252 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
253 errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
254 if (errCode != E_OK) {
255 return errCode;
256 }
257 }
258 if (packet->IsCompressData()) {
259 // serialize compress data
260 errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
261 parcel, packet->GetVersion(), packet->GetCompressAlgo());
262 if (errCode != E_OK) {
263 LOGE("[DataPacketSerialization] Serialize compress Data failed");
264 return errCode;
265 }
266 }
267 // flag mask add in 103
268 if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0 || !packet->IsExtraConditionData()) {
269 return E_OK;
270 }
271 return DataPacketExtraConditionsSerialization(parcel, packet);
272 }
273
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)274 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
275 {
276 // deleted record send watermark
277 int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
278 if (errCode != E_OK) {
279 LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
280 return errCode;
281 }
282
283 // query identify
284 QuerySyncObject queryObj = packet->GetQuery();
285 errCode = parcel.WriteString(packet->GetQueryId());
286 if (errCode != E_OK) {
287 LOGE("[QuerySerialization] Serialize query id failed!");
288 return errCode;
289 }
290 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
291 // need to check.
292 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
293 }
294 return errCode;
295 }
296
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)297 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
298 {
299 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
300 if (packet == nullptr) {
301 return -E_INVALID_ARGS;
302 }
303
304 len = packet->CalculateLen(inMsg->GetMessageId());
305 return E_OK;
306 }
307
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)308 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
309 {
310 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
311 if (packet == nullptr) {
312 return -E_INVALID_ARGS;
313 }
314
315 len = packet->CalculateLen();
316 return E_OK;
317 }
318
IsPacketValid(const Message * inMsg)319 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
320 {
321 if (inMsg == nullptr) {
322 return false;
323 }
324
325 int msgType = inMsg->GetMessageType();
326 if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
327 LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
328 return false;
329 }
330 return true;
331 }
332
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)333 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
334 {
335 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
336 if (packet == nullptr) {
337 return -E_INVALID_ARGS;
338 }
339
340 Parcel parcel(buffer, length);
341 parcel.WriteUInt32(packet->GetVersion());
342 if (parcel.IsError()) {
343 return -E_PARSE_FAIL;
344 }
345 // now V1 compatible for softWareVersion :{101, 102}
346 return AckPacketSyncerPartSerializationV1(parcel, packet);
347 }
348
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)349 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
350 {
351 parcel.WriteUInt64(packet->GetData());
352 parcel.WriteInt(packet->GetRecvCode());
353 parcel.WriteVector<uint64_t>(packet->GetReserved());
354 parcel.EightByteAlign();
355 if (parcel.IsError()) {
356 return -E_PARSE_FAIL;
357 }
358 return E_OK;
359 }
360
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)361 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
362 {
363 std::vector<SendDataItem> dataItems;
364 uint32_t version;
365 Parcel parcel(const_cast<uint8_t *>(buffer), length);
366 uint32_t packLen = parcel.ReadUInt32(version);
367 if (parcel.IsError()) {
368 return -E_PARSE_FAIL;
369 }
370
371 if (version > SOFTWARE_VERSION_CURRENT) {
372 return -E_VERSION_NOT_SUPPORT;
373 }
374
375 packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
376 if (parcel.IsError()) {
377 return -E_PARSE_FAIL;
378 }
379
380 auto packet = new (std::nothrow) DataRequestPacket();
381 if (packet == nullptr) {
382 return -E_OUT_OF_MEMORY;
383 }
384
385 packet->SetVersion(version);
386 packet->SetData(dataItems);
387 int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
388 if (errCode != E_OK) {
389 goto ERROR;
390 }
391 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
392 errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
393 if (errCode != E_OK) {
394 goto ERROR;
395 }
396 }
397 if (packet->IsCompressData()) {
398 errCode = DataPacketCompressDataDeSerialization(parcel, packet);
399 if (errCode != E_OK) {
400 goto ERROR;
401 }
402 }
403 errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
404 if (errCode != E_OK) {
405 goto ERROR;
406 }
407 errCode = inMsg->SetExternalObject<>(packet);
408 if (errCode == E_OK) {
409 return errCode;
410 }
411
412 ERROR:
413 delete packet;
414 packet = nullptr;
415 return errCode;
416 }
417
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)418 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
419 {
420 WaterMark deletedWatermark = 0;
421 parcel.ReadUInt64(deletedWatermark);
422 std::string queryId;
423 parcel.ReadString(queryId);
424 if (parcel.IsError()) {
425 return -E_PARSE_FAIL;
426 }
427 // query identify
428 QuerySyncObject querySyncObj;
429 int errCode = E_OK;
430 // for version 105, query is always sent.
431 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
432 // need to check.
433 errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
434 }
435 if (errCode != E_OK) {
436 LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
437 return errCode;
438 }
439 packet->SetDeletedWaterMark(deletedWatermark);
440 packet->SetQueryId(queryId);
441 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
442 packet->SetQuery(querySyncObj);
443 }
444 return E_OK;
445 }
446
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)447 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
448 {
449 std::vector<SendDataItem> originalData;
450 int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
451 if (errCode != E_OK) {
452 LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
453 return errCode;
454 }
455 packet->SetData(originalData);
456 return E_OK;
457 }
458
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)459 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
460 uint32_t packLen, uint32_t length, uint32_t version)
461 {
462 WaterMark waterMark;
463 WaterMark localWaterMark;
464 WaterMark peerWaterMark;
465 int32_t sendCode;
466 int32_t mode;
467 uint32_t sessionId;
468 uint32_t flag = 0;
469 std::vector<uint64_t> reserved;
470
471 uint64_t totPacketLen = packLen;
472 totPacketLen += parcel.ReadUInt64(waterMark);
473 totPacketLen += parcel.ReadUInt64(localWaterMark);
474 totPacketLen += parcel.ReadUInt64(peerWaterMark);
475 totPacketLen += parcel.ReadInt(sendCode);
476 totPacketLen += parcel.ReadInt(mode);
477 totPacketLen += parcel.ReadUInt32(sessionId);
478 totPacketLen += parcel.ReadVector<uint64_t>(reserved);
479 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
480 totPacketLen += parcel.ReadUInt32(flag);
481 packet->SetFlag(flag);
482 }
483 if (totPacketLen > INT32_MAX) {
484 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input totPackLen=%" PRIu64 " is over limit.",
485 totPacketLen);
486 return -E_LENGTH_ERROR;
487 }
488 parcel.EightByteAlign();
489 totPacketLen = Parcel::GetEightByteAlign(totPacketLen);
490 if (parcel.IsError()) {
491 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ", totPackLen=%" PRIu64,
492 length, totPacketLen);
493 return -E_LENGTH_ERROR;
494 }
495 packet->SetEndWaterMark(waterMark);
496 packet->SetLocalWaterMark(localWaterMark);
497 packet->SetPeerWaterMark(peerWaterMark);
498 packet->SetSendCode(sendCode);
499 packet->SetMode(mode);
500 packet->SetSessionId(sessionId);
501 packet->SetReserved(reserved);
502 return E_OK;
503 }
504
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)505 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
506 {
507 DataAckPacket packet;
508 Parcel parcel(const_cast<uint8_t *>(buffer), length);
509 uint32_t version;
510
511 parcel.ReadUInt32(version);
512 if (parcel.IsError()) {
513 return -E_INVALID_ARGS;
514 }
515 if (version > SOFTWARE_VERSION_CURRENT) {
516 packet.SetVersion(version);
517 packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
518 return inMsg->SetCopiedObject<>(packet);
519 }
520 packet.SetVersion(version);
521 // now V1 compatible for softWareVersion :{101, 102}
522 int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
523 if (errCode != E_OK) {
524 return errCode;
525 }
526
527 return inMsg->SetCopiedObject<>(packet);
528 }
529
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)530 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
531 {
532 WaterMark mark;
533 int32_t errCode;
534 std::vector<uint64_t> reserved;
535
536 parcel.ReadUInt64(mark);
537 parcel.ReadInt(errCode);
538 parcel.ReadVector<uint64_t>(reserved);
539 if (parcel.IsError()) {
540 LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
541 return -E_INVALID_ARGS;
542 }
543 packet.SetData(mark);
544 packet.SetRecvCode(errCode);
545 packet.SetReserved(reserved);
546 return E_OK;
547 }
548
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)549 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
550 {
551 auto packet = inMsg->GetObject<ControlRequestPacket>();
552 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
553 LOGE("[ControlPacketSerialization] invalid control cmd");
554 return -E_INVALID_ARGS;
555 }
556 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
557 return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
558 }
559 return E_OK;
560 }
561
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)562 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
563 {
564 auto packet = inMsg->GetObject<ControlRequestPacket>();
565 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
566 LOGE("[ControlPacketSerialization] invalid control cmd");
567 return -E_INVALID_ARGS;
568 }
569 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
570 return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
571 }
572 return E_OK;
573 }
574
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)575 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
576 {
577 Parcel parcel(const_cast<uint8_t *>(buffer), length);
578 ControlRequestPacket packet;
579 int errCode = ControlRequestDeSerialization(parcel, packet);
580 if (errCode != E_OK) {
581 return errCode;
582 }
583 if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
584 errCode = SubscribeDeSerialization(parcel, inMsg, packet);
585 }
586 return errCode;
587 }
588
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)589 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
590 {
591 auto packet = inMsg->GetObject<ControlAckPacket>();
592 if (packet == nullptr) {
593 LOGE("[AckControlPacketCalculateLen] invalid control cmd");
594 return -E_INVALID_ARGS;
595 }
596 len = packet->CalculateLen();
597 return E_OK;
598 }
599
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)600 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
601 {
602 auto packet = inMsg->GetObject<ControlAckPacket>();
603 if (packet == nullptr) {
604 return -E_INVALID_ARGS;
605 }
606 Parcel parcel(buffer, length);
607 parcel.WriteUInt32(packet->GetVersion());
608 parcel.WriteInt(packet->GetRecvCode());
609 parcel.WriteUInt32(packet->GetcontrolCmdType());
610 parcel.WriteUInt32(packet->GetFlag());
611 if (parcel.IsError()) {
612 LOGE("[AckControlPacketSerialization] Serialization failed");
613 return -E_INVALID_ARGS;
614 }
615 parcel.EightByteAlign();
616 return E_OK;
617 }
618
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)619 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
620 {
621 auto packet = new (std::nothrow) ControlAckPacket();
622 if (packet == nullptr) {
623 return -E_OUT_OF_MEMORY;
624 }
625 Parcel parcel(const_cast<uint8_t *>(buffer), length);
626 int32_t recvCode = 0;
627 uint32_t version = 0;
628 uint32_t controlCmdType = 0;
629 uint32_t flag = 0;
630 parcel.ReadUInt32(version);
631 parcel.ReadInt(recvCode);
632 parcel.ReadUInt32(controlCmdType);
633 parcel.ReadUInt32(flag);
634 int errCode;
635 if (parcel.IsError()) {
636 LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
637 errCode = -E_INVALID_ARGS;
638 goto ERROR;
639 }
640 packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
641 errCode = inMsg->SetExternalObject<>(packet);
642 if (errCode != E_OK) {
643 goto ERROR;
644 }
645 return errCode;
646 ERROR:
647 delete packet;
648 packet = nullptr;
649 return errCode;
650 }
651
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)652 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
653 {
654 auto packet = inMsg->GetObject<ControlRequestPacket>();
655 if (packet == nullptr) {
656 return -E_INVALID_ARGS;
657 }
658 parcel.WriteUInt32(packet->GetVersion());
659 parcel.WriteInt(packet->GetSendCode());
660 parcel.WriteUInt32(packet->GetcontrolCmdType());
661 parcel.WriteUInt32(packet->GetFlag());
662 parcel.EightByteAlign();
663 if (parcel.IsError()) {
664 LOGE("[ControlRequestSerialization] Serialization failed");
665 return -E_INVALID_ARGS;
666 }
667 return E_OK;
668 }
669
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)670 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
671 {
672 uint32_t version = 0;
673 int32_t sendCode = 0;
674 uint32_t controlCmdType = 0;
675 uint32_t flag = 0;
676 parcel.ReadUInt32(version);
677 if (version > SOFTWARE_VERSION_CURRENT) {
678 return -E_VERSION_NOT_SUPPORT;
679 }
680 parcel.ReadInt(sendCode);
681 parcel.ReadUInt32(controlCmdType);
682 parcel.ReadUInt32(flag);
683 if (parcel.IsError()) {
684 LOGE("[ControlRequestDeSerialization] deserialize failed!");
685 return -E_LENGTH_ERROR;
686 }
687 packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
688 return E_OK;
689 }
690
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)691 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
692 {
693 auto packet = inMsg->GetObject<SubscribeRequest>();
694 if (packet == nullptr) {
695 return -E_INVALID_ARGS;
696 }
697 len = packet->CalculateLen();
698 return E_OK;
699 }
700
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)701 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
702 {
703 auto packet = inMsg->GetObject<SubscribeRequest>();
704 if (packet == nullptr) {
705 return -E_INVALID_ARGS;
706 }
707 Parcel parcel(buffer, length);
708 int errCode = ControlRequestSerialization(parcel, inMsg);
709 if (errCode != E_OK) {
710 LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
711 return errCode;
712 }
713 QuerySyncObject queryObj = packet->GetQuery();
714 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
715 if (errCode != E_OK) {
716 LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
717 return errCode;
718 }
719 return E_OK;
720 }
721
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)722 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
723 ControlRequestPacket &controlPacket)
724 {
725 auto packet = new (std::nothrow) SubscribeRequest();
726 if (packet == nullptr) {
727 return -E_OUT_OF_MEMORY;
728 }
729 QuerySyncObject querySyncObj;
730 int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
731 if (errCode != E_OK) {
732 goto ERROR;
733 }
734 packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
735 static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
736 packet->SetQuery(querySyncObj);
737 errCode = inMsg->SetExternalObject<>(packet);
738 if (errCode != E_OK) {
739 goto ERROR;
740 }
741 return errCode;
742 ERROR:
743 delete packet;
744 packet = nullptr;
745 return errCode;
746 }
747
RegisterCommunicatorTransformFunc()748 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
749 {
750 TransformFunc func;
751 func.computeFunc = std::bind(&SingleVerSerializeManager::CalculateLen, std::placeholders::_1);
752 func.serializeFunc = std::bind(&SingleVerSerializeManager::Serialization, std::placeholders::_1,
753 std::placeholders::_2, std::placeholders::_3);
754 func.deserializeFunc = std::bind(&SingleVerSerializeManager::DeSerialization, std::placeholders::_1,
755 std::placeholders::_2, std::placeholders::_3);
756
757 static std::vector<MessageId> messageIds = {
758 QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
759 };
760 int errCode = E_OK;
761 for (auto &id : messageIds) {
762 int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
763 if (retCode != E_OK) {
764 LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
765 static_cast<uint32_t>(id), retCode);
766 errCode = retCode;
767 }
768 }
769 return errCode;
770 }
771
RegisterInnerTransformFunc()772 void SingleVerSerializeManager::RegisterInnerTransformFunc()
773 {
774 TransformFunc func;
775 func.computeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketCalculateLen, std::placeholders::_1);
776 func.serializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketSerialization,
777 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
778 func.deserializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketDeSerialization,
779 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
780 std::lock_guard<std::mutex> autoLock(handlesLock_);
781 messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
782 }
783
ISyncPacketCalculateLen(const Message * inMsg)784 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
785 {
786 if (inMsg == nullptr) {
787 return 0u;
788 }
789 uint32_t len = 0u;
790 const auto packet = inMsg->GetObject<ISyncPacket>();
791 if (packet != nullptr) {
792 len = packet->CalculateLen();
793 }
794 return len;
795 }
796
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)797 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
798 const Message *inMsg)
799 {
800 if (inMsg == nullptr) {
801 return -E_INVALID_ARGS;
802 }
803 int errCode = E_OK;
804 Parcel parcel(buffer, length);
805 auto packet = inMsg->GetObject<ISyncPacket>();
806 if (packet != nullptr) {
807 errCode = packet->Serialization(parcel);
808 }
809 return errCode;
810 }
811
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)812 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
813 Message *inMsg)
814 {
815 if (inMsg == nullptr) {
816 return -E_INVALID_ARGS;
817 }
818 ISyncPacket *packet = nullptr;
819 int errCode = BuildISyncPacket(inMsg, packet);
820 if (errCode != E_OK) {
821 return errCode;
822 }
823 Parcel parcel(const_cast<uint8_t *>(buffer), length);
824 do {
825 errCode = packet->DeSerialization(parcel);
826 if (errCode != E_OK) {
827 break;
828 }
829 errCode = inMsg->SetExternalObject(packet);
830 } while (false);
831 if (errCode != E_OK) {
832 delete packet;
833 packet = nullptr;
834 }
835 return E_OK;
836 }
837
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)838 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
839 {
840 uint32_t messageId = inMsg->GetMessageId();
841 if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
842 return -E_INVALID_ARGS;
843 }
844 switch (inMsg->GetMessageType()) {
845 case TYPE_REQUEST:
846 packet = new(std::nothrow) RemoteExecutorRequestPacket();
847 break;
848 case TYPE_RESPONSE:
849 packet = new(std::nothrow) RemoteExecutorAckPacket();
850 break;
851 default:
852 packet = nullptr;
853 break;
854 }
855 if (packet == nullptr) {
856 return -E_OUT_OF_MEMORY;
857 }
858 return E_OK;
859 }
860
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)861 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
862 {
863 std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
864 if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
865 return -E_INVALID_ARGS;
866 }
867 parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
868 for (const auto &entry : extraConditions) {
869 if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
870 entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
871 return -E_INVALID_ARGS;
872 }
873 parcel.WriteString(entry.first);
874 parcel.WriteString(entry.second);
875 }
876 parcel.EightByteAlign();
877 if (parcel.IsError()) {
878 return -E_PARSE_FAIL;
879 }
880 return E_OK;
881 }
882
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)883 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
884 {
885 if (!packet->IsExtraConditionData()) {
886 return E_OK;
887 }
888 uint32_t conditionSize = 0u;
889 (void) parcel.ReadUInt32(conditionSize);
890 if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
891 return -E_INVALID_ARGS;
892 }
893 std::map<std::string, std::string> extraConditions;
894 for (uint32_t i = 0; i < conditionSize; i++) {
895 std::string conditionKey;
896 std::string conditionVal;
897 (void) parcel.ReadString(conditionKey);
898 (void) parcel.ReadString(conditionVal);
899 if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
900 conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
901 return -E_INVALID_ARGS;
902 }
903 extraConditions[conditionKey] = conditionVal;
904 }
905 parcel.EightByteAlign();
906 if (parcel.IsError()) {
907 return -E_PARSE_FAIL;
908 }
909 packet->SetExtraConditions(extraConditions);
910 return E_OK;
911 }
912 } // namespace DistributedDB