1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_serialize_manager.h"
17
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "sync_types.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 std::mutex SingleVerSerializeManager::handlesLock_;
30 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)31 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
32 {
33 if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
34 return -E_MESSAGE_ID_ERROR;
35 }
36 SerializeFunc serializeFunc = nullptr;
37 {
38 std::lock_guard<std::mutex> autoLock(handlesLock_);
39 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
40 serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
41 }
42 }
43 if (serializeFunc != nullptr) {
44 return serializeFunc(buffer, length, inMsg);
45 }
46
47 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
48 return ControlSerialization(buffer, length, inMsg);
49 }
50 return DataSerialization(buffer, length, inMsg);
51 }
52
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)53 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
54 {
55 switch (inMsg->GetMessageType()) {
56 case TYPE_REQUEST:
57 return DataPacketSerialization(buffer, length, inMsg);
58 case TYPE_RESPONSE:
59 case TYPE_NOTIFY:
60 return AckPacketSerialization(buffer, length, inMsg);
61 default:
62 return -E_MESSAGE_TYPE_ERROR;
63 }
64 }
65
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)66 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
67 {
68 switch (inMsg->GetMessageType()) {
69 case TYPE_REQUEST:
70 return ControlPacketSerialization(buffer, length, inMsg);
71 case TYPE_RESPONSE:
72 return AckControlPacketSerialization(buffer, length, inMsg);
73 default:
74 return -E_MESSAGE_TYPE_ERROR;
75 }
76 }
77
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)78 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
79 {
80 if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
81 return -E_MESSAGE_ID_ERROR;
82 }
83 DeserializeFunc deserializeFunc = nullptr;
84 {
85 std::lock_guard<std::mutex> autoLock(handlesLock_);
86 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
87 deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
88 }
89 }
90 if (deserializeFunc != nullptr) {
91 return deserializeFunc(buffer, length, inMsg);
92 }
93 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
94 return ControlDeSerialization(buffer, length, inMsg);
95 }
96 return DataDeSerialization(buffer, length, inMsg);
97 }
98
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)99 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
100 {
101 switch (inMsg->GetMessageType()) {
102 case TYPE_REQUEST:
103 return DataPacketDeSerialization(buffer, length, inMsg);
104 case TYPE_RESPONSE:
105 case TYPE_NOTIFY:
106 return AckPacketDeSerialization(buffer, length, inMsg);
107 default:
108 return -E_MESSAGE_TYPE_ERROR;
109 }
110 }
111
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)112 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
113 {
114 switch (inMsg->GetMessageType()) {
115 case TYPE_REQUEST:
116 return ControlPacketDeSerialization(buffer, length, inMsg);
117 case TYPE_RESPONSE:
118 return AckControlPacketDeSerialization(buffer, length, inMsg);
119 default:
120 return -E_MESSAGE_TYPE_ERROR;
121 }
122 }
123
CalculateLen(const Message * inMsg)124 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
125 {
126 if (!(IsPacketValid(inMsg))) {
127 return 0;
128 }
129 ComputeLengthFunc computeFunc = nullptr;
130 {
131 std::lock_guard<std::mutex> autoLock(handlesLock_);
132 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
133 computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
134 }
135 }
136 if (computeFunc != nullptr) {
137 return computeFunc(inMsg);
138 }
139 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
140 return CalculateControlLen(inMsg);
141 }
142 return CalculateDataLen(inMsg);
143 }
144
CalculateDataLen(const Message * inMsg)145 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
146 {
147 uint32_t len = 0;
148 int errCode;
149 switch (inMsg->GetMessageType()) {
150 case TYPE_REQUEST:
151 errCode = DataPacketCalculateLen(inMsg, len);
152 if (errCode != E_OK) {
153 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
154 return 0;
155 }
156 return len;
157 case TYPE_RESPONSE:
158 case TYPE_NOTIFY:
159 errCode = AckPacketCalculateLen(inMsg, len);
160 if (errCode != E_OK) {
161 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
162 return 0;
163 }
164 return len;
165 default:
166 return 0;
167 }
168 }
169
CalculateControlLen(const Message * inMsg)170 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
171 {
172 uint32_t len = 0;
173 int errCode;
174 switch (inMsg->GetMessageType()) {
175 case TYPE_REQUEST:
176 errCode = ControlPacketCalculateLen(inMsg, len);
177 if (errCode != E_OK) {
178 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
179 return 0;
180 }
181 return len;
182 case TYPE_RESPONSE:
183 case TYPE_NOTIFY:
184 errCode = AckControlPacketCalculateLen(inMsg, len);
185 if (errCode != E_OK) {
186 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
187 return 0;
188 }
189 return len;
190 default:
191 return 0;
192 }
193 }
194
RegisterTransformFunc()195 int SingleVerSerializeManager::RegisterTransformFunc()
196 {
197 int errCode = RegisterCommunicatorTransformFunc();
198 RegisterInnerTransformFunc();
199 return errCode;
200 }
201
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)202 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
203 {
204 parcel.WriteUInt64(packet->GetEndWaterMark());
205 parcel.WriteUInt64(packet->GetLocalWaterMark());
206 parcel.WriteUInt64(packet->GetPeerWaterMark());
207 parcel.WriteInt(packet->GetSendCode());
208 parcel.WriteInt(packet->GetMode());
209 parcel.WriteUInt32(packet->GetSessionId());
210 parcel.WriteVector<uint64_t>(packet->GetReserved());
211 if (parcel.IsError()) {
212 return -E_PARSE_FAIL;
213 }
214 if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
215 parcel.WriteUInt32(packet->GetFlag());
216 if (parcel.IsError()) {
217 return -E_PARSE_FAIL;
218 }
219 }
220 parcel.EightByteAlign();
221 return E_OK;
222 }
223
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)224 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
225 {
226 auto packet = inMsg->GetObject<DataRequestPacket>();
227 if (packet == nullptr) {
228 return -E_INVALID_ARGS;
229 }
230 Parcel parcel(buffer, length);
231
232 // version
233 int errCode = parcel.WriteUInt32(packet->GetVersion());
234 if (errCode != E_OK) {
235 LOGE("[DataPacketSerialization] Serialize version failed");
236 return errCode;
237 }
238 // sendDataItems
239 errCode = GenericSingleVerKvEntry::SerializeDatas(
240 (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
241 if (errCode != E_OK) {
242 LOGE("[DataPacketSerialization] Serialize Data failed");
243 return errCode;
244 }
245
246 // data sync
247 errCode = DataPacketSyncerPartSerialization(parcel, packet);
248 if (errCode != E_OK) {
249 LOGE("[DataPacketSerialization] Serialize Data failed");
250 return errCode;
251 }
252 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
253 errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
254 if (errCode != E_OK) {
255 return errCode;
256 }
257 }
258 if (packet->IsCompressData()) {
259 // serialize compress data
260 errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
261 parcel, packet->GetVersion(), packet->GetCompressAlgo());
262 if (errCode != E_OK) {
263 LOGE("[DataPacketSerialization] Serialize compress Data failed");
264 return errCode;
265 }
266 }
267 // flag mask add in 103
268 if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0 || !packet->IsExtraConditionData()) {
269 return E_OK;
270 }
271 return DataPacketExtraConditionsSerialization(parcel, packet);
272 }
273
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)274 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
275 {
276 // deleted record send watermark
277 int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
278 if (errCode != E_OK) {
279 LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
280 return errCode;
281 }
282
283 // query identify
284 QuerySyncObject queryObj = packet->GetQuery();
285 errCode = parcel.WriteString(packet->GetQueryId());
286 if (errCode != E_OK) {
287 LOGE("[QuerySerialization] Serialize query id failed!");
288 return errCode;
289 }
290 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
291 // need to check.
292 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
293 }
294 return errCode;
295 }
296
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)297 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
298 {
299 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
300 if (packet == nullptr) {
301 return -E_INVALID_ARGS;
302 }
303
304 len = packet->CalculateLen(inMsg->GetMessageId());
305 return E_OK;
306 }
307
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)308 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
309 {
310 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
311 if (packet == nullptr) {
312 return -E_INVALID_ARGS;
313 }
314
315 len = packet->CalculateLen();
316 return E_OK;
317 }
318
IsPacketValid(const Message * inMsg)319 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
320 {
321 if (inMsg == nullptr) {
322 return false;
323 }
324
325 int msgType = inMsg->GetMessageType();
326 if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
327 LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
328 return false;
329 }
330 return true;
331 }
332
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)333 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
334 {
335 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
336 if (packet == nullptr) {
337 return -E_INVALID_ARGS;
338 }
339
340 Parcel parcel(buffer, length);
341 parcel.WriteUInt32(packet->GetVersion());
342 if (parcel.IsError()) {
343 return -E_PARSE_FAIL;
344 }
345 // now V1 compatible for softWareVersion :{101, 102}
346 return AckPacketSyncerPartSerializationV1(parcel, packet);
347 }
348
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)349 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
350 {
351 parcel.WriteUInt64(packet->GetData());
352 parcel.WriteInt(packet->GetRecvCode());
353 parcel.WriteVector<uint64_t>(packet->GetReserved());
354 if (parcel.IsError()) {
355 return -E_PARSE_FAIL;
356 }
357 parcel.EightByteAlign();
358 return E_OK;
359 }
360
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)361 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
362 {
363 std::vector<SendDataItem> dataItems;
364 uint32_t version;
365 Parcel parcel(const_cast<uint8_t *>(buffer), length);
366 uint32_t packLen = parcel.ReadUInt32(version);
367 if (parcel.IsError()) {
368 return -E_PARSE_FAIL;
369 }
370
371 if (version > SOFTWARE_VERSION_CURRENT) {
372 return -E_VERSION_NOT_SUPPORT;
373 }
374
375 packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
376 if (parcel.IsError()) {
377 return -E_PARSE_FAIL;
378 }
379
380 auto packet = new (std::nothrow) DataRequestPacket();
381 if (packet == nullptr) {
382 return -E_OUT_OF_MEMORY;
383 }
384
385 packet->SetVersion(version);
386 packet->SetData(dataItems);
387 int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
388 if (errCode != E_OK) {
389 goto ERROR;
390 }
391 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
392 errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
393 if (errCode != E_OK) {
394 goto ERROR;
395 }
396 }
397 if (packet->IsCompressData()) {
398 errCode = DataPacketCompressDataDeSerialization(parcel, packet);
399 if (errCode != E_OK) {
400 goto ERROR;
401 }
402 }
403 errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
404 if (errCode != E_OK) {
405 goto ERROR;
406 }
407 errCode = inMsg->SetExternalObject<>(packet);
408 if (errCode != E_OK) {
409 goto ERROR;
410 }
411 return errCode;
412
413 ERROR:
414 delete packet;
415 packet = nullptr;
416 return errCode;
417 }
418
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)419 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
420 {
421 WaterMark deletedWatermark = 0;
422 parcel.ReadUInt64(deletedWatermark);
423 std::string queryId;
424 parcel.ReadString(queryId);
425 if (parcel.IsError()) {
426 return -E_PARSE_FAIL;
427 }
428 // query identify
429 QuerySyncObject querySyncObj;
430 int errCode = E_OK;
431 // for version 105, query is always sent.
432 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
433 // need to check.
434 errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
435 }
436 if (errCode != E_OK) {
437 LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
438 return errCode;
439 }
440 packet->SetDeletedWaterMark(deletedWatermark);
441 packet->SetQueryId(queryId);
442 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
443 packet->SetQuery(querySyncObj);
444 }
445 return E_OK;
446 }
447
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)448 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
449 {
450 std::vector<SendDataItem> originalData;
451 int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
452 if (errCode != E_OK) {
453 LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
454 return errCode;
455 }
456 packet->SetData(originalData);
457 return E_OK;
458 }
459
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)460 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
461 uint32_t packLen, uint32_t length, uint32_t version)
462 {
463 WaterMark waterMark;
464 WaterMark localWaterMark;
465 WaterMark peerWaterMark;
466 int32_t sendCode;
467 int32_t mode;
468 uint32_t sessionId;
469 uint32_t flag = 0;
470 std::vector<uint64_t> reserved;
471
472 packLen += parcel.ReadUInt64(waterMark);
473 packLen += parcel.ReadUInt64(localWaterMark);
474 packLen += parcel.ReadUInt64(peerWaterMark);
475 packLen += parcel.ReadInt(sendCode);
476 packLen += parcel.ReadInt(mode);
477 packLen += parcel.ReadUInt32(sessionId);
478 packLen += parcel.ReadVector<uint64_t>(reserved);
479 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
480 packLen += parcel.ReadUInt32(flag);
481 packet->SetFlag(flag);
482 }
483 packLen = Parcel::GetEightByteAlign(packLen);
484 if (parcel.IsError()) {
485 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ",packLen=%" PRIu32,
486 length, packLen);
487 return -E_LENGTH_ERROR;
488 }
489 parcel.EightByteAlign();
490 packet->SetEndWaterMark(waterMark);
491 packet->SetLocalWaterMark(localWaterMark);
492 packet->SetPeerWaterMark(peerWaterMark);
493 packet->SetSendCode(sendCode);
494 packet->SetMode(mode);
495 packet->SetSessionId(sessionId);
496 packet->SetReserved(reserved);
497 return E_OK;
498 }
499
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)500 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
501 {
502 DataAckPacket packet;
503 Parcel parcel(const_cast<uint8_t *>(buffer), length);
504 uint32_t version;
505
506 parcel.ReadUInt32(version);
507 if (parcel.IsError()) {
508 return -E_INVALID_ARGS;
509 }
510 if (version > SOFTWARE_VERSION_CURRENT) {
511 packet.SetVersion(version);
512 packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
513 return inMsg->SetCopiedObject<>(packet);
514 }
515 packet.SetVersion(version);
516 // now V1 compatible for softWareVersion :{101, 102}
517 int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
518 if (errCode != E_OK) {
519 return errCode;
520 }
521
522 return inMsg->SetCopiedObject<>(packet);
523 }
524
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)525 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
526 {
527 WaterMark mark;
528 int32_t errCode;
529 std::vector<uint64_t> reserved;
530
531 parcel.ReadUInt64(mark);
532 parcel.ReadInt(errCode);
533 parcel.ReadVector<uint64_t>(reserved);
534 if (parcel.IsError()) {
535 LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
536 return -E_INVALID_ARGS;
537 }
538 packet.SetData(mark);
539 packet.SetRecvCode(errCode);
540 packet.SetReserved(reserved);
541 return E_OK;
542 }
543
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)544 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
545 {
546 auto packet = inMsg->GetObject<ControlRequestPacket>();
547 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
548 LOGE("[ControlPacketSerialization] invalid control cmd");
549 return -E_INVALID_ARGS;
550 }
551 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
552 return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
553 }
554 return E_OK;
555 }
556
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)557 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
558 {
559 auto packet = inMsg->GetObject<ControlRequestPacket>();
560 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
561 LOGE("[ControlPacketSerialization] invalid control cmd");
562 return -E_INVALID_ARGS;
563 }
564 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
565 return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
566 }
567 return E_OK;
568 }
569
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)570 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
571 {
572 Parcel parcel(const_cast<uint8_t *>(buffer), length);
573 ControlRequestPacket packet;
574 int errCode = ControlRequestDeSerialization(parcel, packet);
575 if (errCode != E_OK) {
576 return errCode;
577 }
578 if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
579 errCode = SubscribeDeSerialization(parcel, inMsg, packet);
580 }
581 return errCode;
582 }
583
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)584 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
585 {
586 auto packet = inMsg->GetObject<ControlAckPacket>();
587 if (packet == nullptr) {
588 LOGE("[AckControlPacketCalculateLen] invalid control cmd");
589 return -E_INVALID_ARGS;
590 }
591 len = packet->CalculateLen();
592 return E_OK;
593 }
594
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)595 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
596 {
597 auto packet = inMsg->GetObject<ControlAckPacket>();
598 if (packet == nullptr) {
599 return -E_INVALID_ARGS;
600 }
601 Parcel parcel(buffer, length);
602 parcel.WriteUInt32(packet->GetVersion());
603 parcel.WriteInt(packet->GetRecvCode());
604 parcel.WriteUInt32(packet->GetcontrolCmdType());
605 parcel.WriteUInt32(packet->GetFlag());
606 if (parcel.IsError()) {
607 LOGE("[AckControlPacketSerialization] Serialization failed");
608 return -E_INVALID_ARGS;
609 }
610 parcel.EightByteAlign();
611 return E_OK;
612 }
613
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)614 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
615 {
616 auto packet = new (std::nothrow) ControlAckPacket();
617 if (packet == nullptr) {
618 return -E_OUT_OF_MEMORY;
619 }
620 Parcel parcel(const_cast<uint8_t *>(buffer), length);
621 int32_t recvCode = 0;
622 uint32_t version = 0;
623 uint32_t controlCmdType = 0;
624 uint32_t flag = 0;
625 parcel.ReadUInt32(version);
626 parcel.ReadInt(recvCode);
627 parcel.ReadUInt32(controlCmdType);
628 parcel.ReadUInt32(flag);
629 int errCode;
630 if (parcel.IsError()) {
631 LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
632 errCode = -E_INVALID_ARGS;
633 goto ERROR;
634 }
635 packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
636 errCode = inMsg->SetExternalObject<>(packet);
637 if (errCode != E_OK) {
638 goto ERROR;
639 }
640 return errCode;
641 ERROR:
642 delete packet;
643 packet = nullptr;
644 return errCode;
645 }
646
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)647 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
648 {
649 auto packet = inMsg->GetObject<ControlRequestPacket>();
650 if (packet == nullptr) {
651 return -E_INVALID_ARGS;
652 }
653 parcel.WriteUInt32(packet->GetVersion());
654 parcel.WriteInt(packet->GetSendCode());
655 parcel.WriteUInt32(packet->GetcontrolCmdType());
656 parcel.WriteUInt32(packet->GetFlag());
657 if (parcel.IsError()) {
658 LOGE("[ControlRequestSerialization] Serialization failed");
659 return -E_INVALID_ARGS;
660 }
661 parcel.EightByteAlign();
662 return E_OK;
663 }
664
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)665 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
666 {
667 uint32_t version = 0;
668 int32_t sendCode = 0;
669 uint32_t controlCmdType = 0;
670 uint32_t flag = 0;
671 parcel.ReadUInt32(version);
672 if (version > SOFTWARE_VERSION_CURRENT) {
673 return -E_VERSION_NOT_SUPPORT;
674 }
675 parcel.ReadInt(sendCode);
676 parcel.ReadUInt32(controlCmdType);
677 parcel.ReadUInt32(flag);
678 if (parcel.IsError()) {
679 LOGE("[ControlRequestDeSerialization] deserialize failed!");
680 return -E_LENGTH_ERROR;
681 }
682 packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
683 return E_OK;
684 }
685
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)686 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
687 {
688 auto packet = inMsg->GetObject<SubscribeRequest>();
689 if (packet == nullptr) {
690 return -E_INVALID_ARGS;
691 }
692 len = packet->CalculateLen();
693 return E_OK;
694 }
695
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)696 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
697 {
698 auto packet = inMsg->GetObject<SubscribeRequest>();
699 if (packet == nullptr) {
700 return -E_INVALID_ARGS;
701 }
702 Parcel parcel(buffer, length);
703 int errCode = ControlRequestSerialization(parcel, inMsg);
704 if (errCode != E_OK) {
705 LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
706 return errCode;
707 }
708 QuerySyncObject queryObj = packet->GetQuery();
709 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
710 if (errCode != E_OK) {
711 LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
712 return errCode;
713 }
714 return E_OK;
715 }
716
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)717 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
718 ControlRequestPacket &controlPacket)
719 {
720 auto packet = new (std::nothrow) SubscribeRequest();
721 if (packet == nullptr) {
722 return -E_OUT_OF_MEMORY;
723 }
724 QuerySyncObject querySyncObj;
725 int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
726 if (errCode != E_OK) {
727 goto ERROR;
728 }
729 packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
730 static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
731 packet->SetQuery(querySyncObj);
732 errCode = inMsg->SetExternalObject<>(packet);
733 if (errCode != E_OK) {
734 goto ERROR;
735 }
736 return errCode;
737 ERROR:
738 delete packet;
739 packet = nullptr;
740 return errCode;
741 }
742
RegisterCommunicatorTransformFunc()743 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
744 {
745 TransformFunc func;
746 func.computeFunc = std::bind(&SingleVerSerializeManager::CalculateLen, std::placeholders::_1);
747 func.serializeFunc = std::bind(&SingleVerSerializeManager::Serialization, std::placeholders::_1,
748 std::placeholders::_2, std::placeholders::_3);
749 func.deserializeFunc = std::bind(&SingleVerSerializeManager::DeSerialization, std::placeholders::_1,
750 std::placeholders::_2, std::placeholders::_3);
751
752 static std::vector<MessageId> messageIds = {
753 QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
754 };
755 int errCode = E_OK;
756 for (auto &id : messageIds) {
757 int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
758 if (retCode != E_OK) {
759 LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
760 static_cast<uint32_t>(id), retCode);
761 errCode = retCode;
762 }
763 }
764 return errCode;
765 }
766
RegisterInnerTransformFunc()767 void SingleVerSerializeManager::RegisterInnerTransformFunc()
768 {
769 TransformFunc func;
770 func.computeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketCalculateLen, std::placeholders::_1);
771 func.serializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketSerialization,
772 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
773 func.deserializeFunc = std::bind(&SingleVerSerializeManager::ISyncPacketDeSerialization,
774 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
775 std::lock_guard<std::mutex> autoLock(handlesLock_);
776 messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
777 }
778
ISyncPacketCalculateLen(const Message * inMsg)779 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
780 {
781 if (inMsg == nullptr) {
782 return 0u;
783 }
784 uint32_t len = 0u;
785 const auto packet = inMsg->GetObject<ISyncPacket>();
786 if (packet != nullptr) {
787 len = packet->CalculateLen();
788 }
789 return len;
790 }
791
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)792 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
793 const Message *inMsg)
794 {
795 if (inMsg == nullptr) {
796 return -E_INVALID_ARGS;
797 }
798 int errCode = E_OK;
799 Parcel parcel(buffer, length);
800 auto packet = inMsg->GetObject<ISyncPacket>();
801 if (packet != nullptr) {
802 errCode = packet->Serialization(parcel);
803 }
804 return errCode;
805 }
806
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)807 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
808 Message *inMsg)
809 {
810 if (inMsg == nullptr) {
811 return -E_INVALID_ARGS;
812 }
813 ISyncPacket *packet = nullptr;
814 int errCode = BuildISyncPacket(inMsg, packet);
815 if (errCode != E_OK) {
816 return errCode;
817 }
818 Parcel parcel(const_cast<uint8_t *>(buffer), length);
819 do {
820 errCode = packet->DeSerialization(parcel);
821 if (errCode != E_OK) {
822 break;
823 }
824 errCode = inMsg->SetExternalObject(packet);
825 } while (false);
826 if (errCode != E_OK) {
827 delete packet;
828 packet = nullptr;
829 }
830 return E_OK;
831 }
832
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)833 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
834 {
835 uint32_t messageId = inMsg->GetMessageId();
836 if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
837 return -E_INVALID_ARGS;
838 }
839 switch (inMsg->GetMessageType()) {
840 case TYPE_REQUEST:
841 packet = new(std::nothrow) RemoteExecutorRequestPacket();
842 break;
843 case TYPE_RESPONSE:
844 packet = new(std::nothrow) RemoteExecutorAckPacket();
845 break;
846 default:
847 packet = nullptr;
848 break;
849 }
850 if (packet == nullptr) {
851 return -E_OUT_OF_MEMORY;
852 }
853 return E_OK;
854 }
855
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)856 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
857 {
858 std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
859 if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
860 return -E_INVALID_ARGS;
861 }
862 parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
863 for (const auto &entry : extraConditions) {
864 if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
865 entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
866 return -E_INVALID_ARGS;
867 }
868 parcel.WriteString(entry.first);
869 parcel.WriteString(entry.second);
870 }
871 parcel.EightByteAlign();
872 if (parcel.IsError()) {
873 return -E_PARSE_FAIL;
874 }
875 return E_OK;
876 }
877
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)878 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
879 {
880 if (!packet->IsExtraConditionData()) {
881 return E_OK;
882 }
883 uint32_t conditionSize = 0u;
884 (void) parcel.ReadUInt32(conditionSize);
885 if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
886 return -E_INVALID_ARGS;
887 }
888 std::map<std::string, std::string> extraConditions;
889 for (uint32_t i = 0; i < conditionSize; i++) {
890 std::string conditionKey;
891 std::string conditionVal;
892 (void) parcel.ReadString(conditionKey);
893 (void) parcel.ReadString(conditionVal);
894 if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
895 conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
896 return -E_INVALID_ARGS;
897 }
898 extraConditions[conditionKey] = conditionVal;
899 }
900 parcel.EightByteAlign();
901 if (parcel.IsError()) {
902 return -E_PARSE_FAIL;
903 }
904 packet->SetExtraConditions(extraConditions);
905 return E_OK;
906 }
907 } // namespace DistributedDB