1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_serialize_manager.h"
17
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "sync_types.h"
27 #include "version.h"
28
29 namespace DistributedDB {
30 std::mutex SingleVerSerializeManager::handlesLock_;
31 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)32 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
33 {
34 if ((buffer == nullptr) || length == 0u || !(IsPacketValid(inMsg))) {
35 return -E_MESSAGE_ID_ERROR;
36 }
37 SerializeFunc serializeFunc = nullptr;
38 {
39 std::lock_guard<std::mutex> autoLock(handlesLock_);
40 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
41 serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
42 }
43 }
44 if (serializeFunc != nullptr) {
45 return serializeFunc(buffer, length, inMsg);
46 }
47
48 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
49 return ControlSerialization(buffer, length, inMsg);
50 }
51 return DataSerialization(buffer, length, inMsg);
52 }
53
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)54 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
55 {
56 switch (inMsg->GetMessageType()) {
57 case TYPE_REQUEST:
58 return DataPacketSerialization(buffer, length, inMsg);
59 case TYPE_RESPONSE:
60 case TYPE_NOTIFY:
61 return AckPacketSerialization(buffer, length, inMsg);
62 default:
63 return -E_MESSAGE_TYPE_ERROR;
64 }
65 }
66
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)67 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
68 {
69 switch (inMsg->GetMessageType()) {
70 case TYPE_REQUEST:
71 return ControlPacketSerialization(buffer, length, inMsg);
72 case TYPE_RESPONSE:
73 return AckControlPacketSerialization(buffer, length, inMsg);
74 default:
75 return -E_MESSAGE_TYPE_ERROR;
76 }
77 }
78
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)79 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
80 {
81 if ((buffer == nullptr) || length == 0u || !(IsPacketValid(inMsg))) {
82 return -E_MESSAGE_ID_ERROR;
83 }
84 DeserializeFunc deserializeFunc = nullptr;
85 {
86 std::lock_guard<std::mutex> autoLock(handlesLock_);
87 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
88 deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
89 }
90 }
91 if (deserializeFunc != nullptr) {
92 return deserializeFunc(buffer, length, inMsg);
93 }
94 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
95 return ControlDeSerialization(buffer, length, inMsg);
96 }
97 return DataDeSerialization(buffer, length, inMsg);
98 }
99
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)100 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
101 {
102 switch (inMsg->GetMessageType()) {
103 case TYPE_REQUEST:
104 return DataPacketDeSerialization(buffer, length, inMsg);
105 case TYPE_RESPONSE:
106 case TYPE_NOTIFY:
107 return AckPacketDeSerialization(buffer, length, inMsg);
108 default:
109 return -E_MESSAGE_TYPE_ERROR;
110 }
111 }
112
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)113 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
114 {
115 switch (inMsg->GetMessageType()) {
116 case TYPE_REQUEST:
117 return ControlPacketDeSerialization(buffer, length, inMsg);
118 case TYPE_RESPONSE:
119 return AckControlPacketDeSerialization(buffer, length, inMsg);
120 default:
121 return -E_MESSAGE_TYPE_ERROR;
122 }
123 }
124
CalculateLen(const Message * inMsg)125 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
126 {
127 if (!(IsPacketValid(inMsg))) {
128 return 0;
129 }
130 ComputeLengthFunc computeFunc = nullptr;
131 {
132 std::lock_guard<std::mutex> autoLock(handlesLock_);
133 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
134 computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
135 }
136 }
137 if (computeFunc != nullptr) {
138 return computeFunc(inMsg);
139 }
140 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
141 return CalculateControlLen(inMsg);
142 }
143 return CalculateDataLen(inMsg);
144 }
145
CalculateDataLen(const Message * inMsg)146 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
147 {
148 uint32_t len = 0;
149 int errCode;
150 switch (inMsg->GetMessageType()) {
151 case TYPE_REQUEST:
152 errCode = DataPacketCalculateLen(inMsg, len);
153 if (errCode != E_OK) {
154 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
155 return 0;
156 }
157 return len;
158 case TYPE_RESPONSE:
159 case TYPE_NOTIFY:
160 errCode = AckPacketCalculateLen(inMsg, len);
161 if (errCode != E_OK) {
162 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
163 return 0;
164 }
165 return len;
166 default:
167 return 0;
168 }
169 }
170
CalculateControlLen(const Message * inMsg)171 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
172 {
173 uint32_t len = 0;
174 int errCode;
175 switch (inMsg->GetMessageType()) {
176 case TYPE_REQUEST:
177 errCode = ControlPacketCalculateLen(inMsg, len);
178 if (errCode != E_OK) {
179 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
180 return 0;
181 }
182 return len;
183 case TYPE_RESPONSE:
184 case TYPE_NOTIFY:
185 errCode = AckControlPacketCalculateLen(inMsg, len);
186 if (errCode != E_OK) {
187 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
188 return 0;
189 }
190 return len;
191 default:
192 return 0;
193 }
194 }
195
RegisterTransformFunc()196 int SingleVerSerializeManager::RegisterTransformFunc()
197 {
198 RegisterInnerTransformFunc();
199 return RegisterCommunicatorTransformFunc();
200 }
201
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)202 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
203 {
204 parcel.WriteUInt64(packet->GetEndWaterMark());
205 parcel.WriteUInt64(packet->GetLocalWaterMark());
206 parcel.WriteUInt64(packet->GetPeerWaterMark());
207 parcel.WriteInt(packet->GetSendCode());
208 parcel.WriteInt(packet->GetMode());
209 parcel.WriteUInt32(packet->GetSessionId());
210 parcel.WriteVector<uint64_t>(packet->GetReserved());
211 if (parcel.IsError()) {
212 return -E_PARSE_FAIL;
213 }
214 if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
215 parcel.WriteUInt32(packet->GetFlag());
216 }
217 parcel.EightByteAlign();
218 if (parcel.IsError()) {
219 return -E_PARSE_FAIL;
220 }
221 return E_OK;
222 }
223
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)224 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
225 {
226 auto packet = inMsg->GetObject<DataRequestPacket>();
227 if (packet == nullptr) {
228 return -E_INVALID_ARGS;
229 }
230 Parcel parcel(buffer, length);
231
232 // version
233 int errCode = parcel.WriteUInt32(packet->GetVersion());
234 if (errCode != E_OK) {
235 LOGE("[DataPacketSerialization] Serialize version failed");
236 return errCode;
237 }
238 // sendDataItems
239 errCode = GenericSingleVerKvEntry::SerializeDatas(
240 (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
241 if (errCode != E_OK) {
242 LOGE("[DataPacketSerialization] Serialize Data failed");
243 return errCode;
244 }
245
246 // data sync
247 errCode = DataPacketSyncerPartSerialization(parcel, packet);
248 if (errCode != E_OK) {
249 LOGE("[DataPacketSerialization] Serialize Data failed");
250 return errCode;
251 }
252 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
253 errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
254 if (errCode != E_OK) {
255 return errCode;
256 }
257 }
258 if (packet->IsCompressData()) {
259 // serialize compress data
260 errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
261 parcel, packet->GetVersion(), packet->GetCompressAlgo());
262 if (errCode != E_OK) {
263 LOGE("[DataPacketSerialization] Serialize compress Data failed");
264 return errCode;
265 }
266 }
267 return DataPacketInnerSerialization(packet, parcel);
268 }
269
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)270 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
271 {
272 // deleted record send watermark
273 int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
274 if (errCode != E_OK) {
275 LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
276 return errCode;
277 }
278
279 // query identify
280 QuerySyncObject queryObj = packet->GetQuery();
281 errCode = parcel.WriteString(packet->GetQueryId());
282 if (errCode != E_OK) {
283 LOGE("[QuerySerialization] Serialize query id failed!");
284 return errCode;
285 }
286 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
287 // need to check.
288 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
289 }
290 return errCode;
291 }
292
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)293 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
294 {
295 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
296 if (packet == nullptr) {
297 return -E_INVALID_ARGS;
298 }
299
300 len = packet->CalculateLen(inMsg->GetMessageId());
301 return E_OK;
302 }
303
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)304 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
305 {
306 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
307 if (packet == nullptr) {
308 return -E_INVALID_ARGS;
309 }
310
311 len = packet->CalculateLen();
312 return E_OK;
313 }
314
IsPacketValid(const Message * inMsg)315 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
316 {
317 if (inMsg == nullptr) {
318 return false;
319 }
320
321 int msgType = inMsg->GetMessageType();
322 if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
323 LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
324 return false;
325 }
326 return true;
327 }
328
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)329 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
330 {
331 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
332 if (packet == nullptr) {
333 return -E_INVALID_ARGS;
334 }
335
336 Parcel parcel(buffer, length);
337 parcel.WriteUInt32(packet->GetVersion());
338 if (parcel.IsError()) {
339 return -E_PARSE_FAIL;
340 }
341 // now V1 compatible for softWareVersion :{101, 102}
342 return AckPacketSyncerPartSerializationV1(parcel, packet);
343 }
344
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)345 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
346 {
347 parcel.WriteUInt64(packet->GetData());
348 parcel.WriteInt(packet->GetRecvCode());
349 parcel.WriteVector<uint64_t>(packet->GetReserved());
350 parcel.EightByteAlign();
351 if (parcel.IsError()) {
352 return -E_PARSE_FAIL;
353 }
354 return E_OK;
355 }
356
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)357 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
358 {
359 std::vector<SendDataItem> dataItems;
360 uint32_t version;
361 Parcel parcel(const_cast<uint8_t *>(buffer), length);
362 uint32_t packLen = parcel.ReadUInt32(version);
363 if (parcel.IsError()) {
364 return -E_PARSE_FAIL;
365 }
366
367 if (version > SOFTWARE_VERSION_CURRENT) {
368 return -E_VERSION_NOT_SUPPORT;
369 }
370
371 packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
372 if (parcel.IsError()) {
373 return -E_PARSE_FAIL;
374 }
375
376 auto packet = new (std::nothrow) DataRequestPacket();
377 if (packet == nullptr) {
378 return -E_OUT_OF_MEMORY;
379 }
380
381 packet->SetVersion(version);
382 packet->SetData(dataItems);
383 int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
384 if (errCode != E_OK) {
385 goto ERROR;
386 }
387 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
388 errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
389 if (errCode != E_OK) {
390 goto ERROR;
391 }
392 }
393 errCode = DataPacketInnerDeSerialization(packet, parcel);
394 if (errCode != E_OK) {
395 goto ERROR;
396 }
397 errCode = inMsg->SetExternalObject<>(packet);
398 if (errCode == E_OK) {
399 return errCode;
400 }
401
402 ERROR:
403 delete packet;
404 packet = nullptr;
405 return errCode;
406 }
407
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)408 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
409 {
410 WaterMark deletedWatermark = 0;
411 parcel.ReadUInt64(deletedWatermark);
412 std::string queryId;
413 parcel.ReadString(queryId);
414 if (parcel.IsError()) {
415 return -E_PARSE_FAIL;
416 }
417 // query identify
418 QuerySyncObject querySyncObj;
419 int errCode = E_OK;
420 // for version 105, query is always sent.
421 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
422 // need to check.
423 errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
424 }
425 if (errCode != E_OK) {
426 LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
427 return errCode;
428 }
429 packet->SetDeletedWaterMark(deletedWatermark);
430 packet->SetQueryId(queryId);
431 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
432 packet->SetQuery(querySyncObj);
433 }
434 return E_OK;
435 }
436
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)437 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
438 {
439 std::vector<SendDataItem> originalData;
440 int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
441 if (errCode != E_OK) {
442 LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
443 return errCode;
444 }
445 packet->SetData(originalData);
446 return E_OK;
447 }
448
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)449 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
450 uint32_t packLen, uint32_t length, uint32_t version)
451 {
452 WaterMark waterMark;
453 WaterMark localWaterMark;
454 WaterMark peerWaterMark;
455 int32_t sendCode;
456 int32_t mode;
457 uint32_t sessionId;
458 std::vector<uint64_t> reserved;
459
460 uint64_t totPacketLen = packLen;
461 totPacketLen += parcel.ReadUInt64(waterMark);
462 totPacketLen += parcel.ReadUInt64(localWaterMark);
463 totPacketLen += parcel.ReadUInt64(peerWaterMark);
464 totPacketLen += parcel.ReadInt(sendCode);
465 totPacketLen += parcel.ReadInt(mode);
466 totPacketLen += parcel.ReadUInt32(sessionId);
467 totPacketLen += parcel.ReadVector<uint64_t>(reserved);
468 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
469 uint32_t flag = 0u;
470 totPacketLen += parcel.ReadUInt32(flag);
471 packet->SetFlag(flag);
472 }
473 if (totPacketLen > INT32_MAX) {
474 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input totPackLen=%" PRIu64 " is over limit.",
475 totPacketLen);
476 return -E_LENGTH_ERROR;
477 }
478 parcel.EightByteAlign();
479 totPacketLen = Parcel::GetEightByteAlign(totPacketLen);
480 if (parcel.IsError()) {
481 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ", totPackLen=%" PRIu64,
482 length, totPacketLen);
483 return -E_LENGTH_ERROR;
484 }
485 packet->SetEndWaterMark(waterMark);
486 packet->SetLocalWaterMark(localWaterMark);
487 packet->SetPeerWaterMark(peerWaterMark);
488 packet->SetSendCode(sendCode);
489 packet->SetMode(mode);
490 packet->SetSessionId(sessionId);
491 packet->SetReserved(reserved);
492 return E_OK;
493 }
494
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)495 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
496 {
497 DataAckPacket packet;
498 Parcel parcel(const_cast<uint8_t *>(buffer), length);
499 uint32_t version;
500
501 parcel.ReadUInt32(version);
502 if (parcel.IsError()) {
503 return -E_INVALID_ARGS;
504 }
505 if (version > SOFTWARE_VERSION_CURRENT) {
506 packet.SetVersion(version);
507 packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
508 return inMsg->SetCopiedObject<>(packet);
509 }
510 packet.SetVersion(version);
511 // now V1 compatible for softWareVersion :{101, 102}
512 int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
513 if (errCode != E_OK) {
514 return errCode;
515 }
516
517 return inMsg->SetCopiedObject<>(packet);
518 }
519
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)520 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
521 {
522 WaterMark mark;
523 int32_t errCode;
524 std::vector<uint64_t> reserved;
525
526 parcel.ReadUInt64(mark);
527 parcel.ReadInt(errCode);
528 parcel.ReadVector<uint64_t>(reserved);
529 if (parcel.IsError()) {
530 LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
531 return -E_INVALID_ARGS;
532 }
533 packet.SetData(mark);
534 packet.SetRecvCode(errCode);
535 packet.SetReserved(reserved);
536 return E_OK;
537 }
538
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)539 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
540 {
541 auto packet = inMsg->GetObject<SubscribeRequest>();
542 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
543 LOGE("[ControlPacketSerialization] invalid control cmd");
544 return -E_INVALID_ARGS;
545 }
546 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
547 return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
548 }
549 return E_OK;
550 }
551
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)552 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
553 {
554 auto packet = inMsg->GetObject<SubscribeRequest>();
555 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
556 LOGE("[ControlPacketSerialization] invalid control cmd");
557 return -E_INVALID_ARGS;
558 }
559 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
560 return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
561 }
562 return E_OK;
563 }
564
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)565 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
566 {
567 Parcel parcel(const_cast<uint8_t *>(buffer), length);
568 ControlRequestPacket packet;
569 int errCode = ControlRequestDeSerialization(parcel, packet);
570 if (errCode != E_OK) {
571 return errCode;
572 }
573 if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
574 errCode = SubscribeDeSerialization(parcel, inMsg, packet);
575 }
576 return errCode;
577 }
578
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)579 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
580 {
581 auto packet = inMsg->GetObject<ControlAckPacket>();
582 if (packet == nullptr) {
583 LOGE("[AckControlPacketCalculateLen] invalid control cmd");
584 return -E_INVALID_ARGS;
585 }
586 len = packet->CalculateLen();
587 return E_OK;
588 }
589
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)590 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
591 {
592 auto packet = inMsg->GetObject<ControlAckPacket>();
593 if (packet == nullptr) {
594 return -E_INVALID_ARGS;
595 }
596 Parcel parcel(buffer, length);
597 parcel.WriteUInt32(packet->GetVersion());
598 parcel.WriteInt(packet->GetRecvCode());
599 parcel.WriteUInt32(packet->GetcontrolCmdType());
600 parcel.WriteUInt32(packet->GetFlag());
601 if (parcel.IsError()) {
602 LOGE("[AckControlPacketSerialization] Serialization failed");
603 return -E_INVALID_ARGS;
604 }
605 parcel.EightByteAlign();
606 return E_OK;
607 }
608
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)609 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
610 {
611 auto packet = new (std::nothrow) ControlAckPacket();
612 if (packet == nullptr) {
613 return -E_OUT_OF_MEMORY;
614 }
615 Parcel parcel(const_cast<uint8_t *>(buffer), length);
616 int32_t recvCode = 0;
617 uint32_t version = 0;
618 uint32_t controlCmdType = 0;
619 uint32_t flag = 0;
620 parcel.ReadUInt32(version);
621 parcel.ReadInt(recvCode);
622 parcel.ReadUInt32(controlCmdType);
623 parcel.ReadUInt32(flag);
624 int errCode;
625 if (parcel.IsError()) {
626 LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
627 errCode = -E_INVALID_ARGS;
628 goto ERROR;
629 }
630 packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
631 errCode = inMsg->SetExternalObject<>(packet);
632 if (errCode != E_OK) {
633 goto ERROR;
634 }
635 return errCode;
636 ERROR:
637 delete packet;
638 packet = nullptr;
639 return errCode;
640 }
641
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)642 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
643 {
644 auto packet = inMsg->GetObject<SubscribeRequest>();
645 if (packet == nullptr) {
646 return -E_INVALID_ARGS;
647 }
648 parcel.WriteUInt32(packet->GetVersion());
649 parcel.WriteInt(packet->GetSendCode());
650 parcel.WriteUInt32(packet->GetcontrolCmdType());
651 parcel.WriteUInt32(packet->GetFlag());
652 parcel.EightByteAlign();
653 if (parcel.IsError()) {
654 LOGE("[ControlRequestSerialization] Serialization failed");
655 return -E_INVALID_ARGS;
656 }
657 return E_OK;
658 }
659
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)660 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
661 {
662 uint32_t version = 0;
663 int32_t sendCode = 0;
664 uint32_t controlCmdType = 0;
665 uint32_t flag = 0;
666 parcel.ReadUInt32(version);
667 if (version > SOFTWARE_VERSION_CURRENT) {
668 return -E_VERSION_NOT_SUPPORT;
669 }
670 parcel.ReadInt(sendCode);
671 parcel.ReadUInt32(controlCmdType);
672 parcel.ReadUInt32(flag);
673 if (parcel.IsError()) {
674 LOGE("[ControlRequestDeSerialization] deserialize failed!");
675 return -E_LENGTH_ERROR;
676 }
677 packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
678 return E_OK;
679 }
680
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)681 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
682 {
683 auto packet = inMsg->GetObject<SubscribeRequest>();
684 if (packet == nullptr) {
685 return -E_INVALID_ARGS;
686 }
687 len = packet->CalculateLen();
688 return E_OK;
689 }
690
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)691 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
692 {
693 auto packet = inMsg->GetObject<SubscribeRequest>();
694 if (packet == nullptr) {
695 return -E_INVALID_ARGS;
696 }
697 Parcel parcel(buffer, length);
698 int errCode = ControlRequestSerialization(parcel, inMsg);
699 if (errCode != E_OK) {
700 LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
701 return errCode;
702 }
703 QuerySyncObject queryObj = packet->GetQuery();
704 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
705 if (errCode != E_OK) {
706 LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
707 return errCode;
708 }
709 return E_OK;
710 }
711
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)712 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
713 ControlRequestPacket &controlPacket)
714 {
715 auto packet = new (std::nothrow) SubscribeRequest();
716 if (packet == nullptr) {
717 return -E_OUT_OF_MEMORY;
718 }
719 QuerySyncObject querySyncObj;
720 int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
721 if (errCode != E_OK) {
722 goto ERROR;
723 }
724 packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
725 static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
726 packet->SetQuery(querySyncObj);
727 errCode = inMsg->SetExternalObject<>(packet);
728 if (errCode != E_OK) {
729 goto ERROR;
730 }
731 return errCode;
732 ERROR:
733 delete packet;
734 packet = nullptr;
735 return errCode;
736 }
737
RegisterCommunicatorTransformFunc()738 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
739 {
740 TransformFunc func;
741 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
742 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
743 return Serialization(buffer, length, inMsg);
744 };
745 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
746 return DeSerialization(buffer, length, inMsg);
747 };
748
749 static std::vector<MessageId> messageIds = {
750 QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
751 };
752 int errCode = E_OK;
753 for (auto &id : messageIds) {
754 int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
755 if (retCode != E_OK) {
756 LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
757 static_cast<uint32_t>(id), retCode);
758 errCode = retCode;
759 }
760 }
761 return errCode;
762 }
763
RegisterInnerTransformFunc()764 void SingleVerSerializeManager::RegisterInnerTransformFunc()
765 {
766 TransformFunc func;
767 func.computeFunc = [](const Message *inMsg) { return ISyncPacketCalculateLen(inMsg); };
768 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
769 return ISyncPacketSerialization(buffer, length, inMsg);
770 };
771 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
772 return ISyncPacketDeSerialization(buffer, length, inMsg);
773 };
774 std::lock_guard<std::mutex> autoLock(handlesLock_);
775 messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
776 }
777
ISyncPacketCalculateLen(const Message * inMsg)778 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
779 {
780 if (inMsg == nullptr) {
781 return 0u;
782 }
783 uint32_t len = 0u;
784 const auto packet = inMsg->GetObject<ISyncPacket>();
785 if (packet != nullptr) {
786 len = packet->CalculateLen();
787 }
788 return len;
789 }
790
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)791 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
792 const Message *inMsg)
793 {
794 if (inMsg == nullptr) {
795 return -E_INVALID_ARGS;
796 }
797 int errCode = E_OK;
798 Parcel parcel(buffer, length);
799 auto packet = inMsg->GetObject<ISyncPacket>();
800 if (packet != nullptr) {
801 errCode = packet->Serialization(parcel);
802 }
803 return errCode;
804 }
805
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)806 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
807 Message *inMsg)
808 {
809 if (inMsg == nullptr) {
810 return -E_INVALID_ARGS;
811 }
812 ISyncPacket *packet = nullptr;
813 int errCode = BuildISyncPacket(inMsg, packet);
814 if (errCode != E_OK) {
815 return errCode;
816 }
817 Parcel parcel(const_cast<uint8_t *>(buffer), length);
818 do {
819 errCode = packet->DeSerialization(parcel);
820 if (errCode != E_OK) {
821 break;
822 }
823 errCode = inMsg->SetExternalObject(packet);
824 } while (false);
825 if (errCode != E_OK) {
826 delete packet;
827 packet = nullptr;
828 }
829 return E_OK;
830 }
831
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)832 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
833 {
834 uint32_t messageId = inMsg->GetMessageId();
835 if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
836 return -E_INVALID_ARGS;
837 }
838 switch (inMsg->GetMessageType()) {
839 case TYPE_REQUEST:
840 packet = new(std::nothrow) RemoteExecutorRequestPacket();
841 break;
842 case TYPE_RESPONSE:
843 packet = new(std::nothrow) RemoteExecutorAckPacket();
844 break;
845 default:
846 packet = nullptr;
847 break;
848 }
849 if (packet == nullptr) {
850 return -E_OUT_OF_MEMORY;
851 }
852 return E_OK;
853 }
854
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)855 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
856 {
857 std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
858 if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
859 return -E_INVALID_ARGS;
860 }
861 parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
862 for (const auto &entry : extraConditions) {
863 if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
864 entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
865 return -E_INVALID_ARGS;
866 }
867 parcel.WriteString(entry.first);
868 parcel.WriteString(entry.second);
869 }
870 parcel.EightByteAlign();
871 if (parcel.IsError()) {
872 return -E_PARSE_FAIL;
873 }
874 return E_OK;
875 }
876
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)877 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
878 {
879 if (!packet->IsExtraConditionData()) {
880 return E_OK;
881 }
882 uint32_t conditionSize = 0u;
883 (void) parcel.ReadUInt32(conditionSize);
884 if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
885 return -E_INVALID_ARGS;
886 }
887 std::map<std::string, std::string> extraConditions;
888 for (uint32_t i = 0; i < conditionSize; i++) {
889 std::string conditionKey;
890 std::string conditionVal;
891 (void) parcel.ReadString(conditionKey);
892 (void) parcel.ReadString(conditionVal);
893 if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
894 conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
895 return -E_INVALID_ARGS;
896 }
897 extraConditions[conditionKey] = conditionVal;
898 }
899 parcel.EightByteAlign();
900 if (parcel.IsError()) {
901 return -E_PARSE_FAIL;
902 }
903 packet->SetExtraConditions(extraConditions);
904 return E_OK;
905 }
906
DataPacketInnerDeSerialization(DataRequestPacket * packet,Parcel & parcel)907 int SingleVerSerializeManager::DataPacketInnerDeSerialization(DataRequestPacket *packet, Parcel &parcel)
908 {
909 int errCode = E_OK;
910 if (packet->IsCompressData()) {
911 errCode = DataPacketCompressDataDeSerialization(parcel, packet);
912 if (errCode != E_OK) {
913 return errCode;
914 }
915 }
916 errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
917 if (errCode != E_OK) {
918 return errCode;
919 }
920 if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
921 uint64_t schemaVersion = 0u;
922 parcel.ReadUInt64(schemaVersion);
923 int64_t systemTimeOffset = 0u;
924 parcel.ReadInt64(systemTimeOffset);
925 int64_t senderTimeOffset = 0u;
926 parcel.ReadInt64(senderTimeOffset);
927 if (parcel.IsError()) {
928 LOGE("[SingleVerSerializeManager] parse schema version or time offset failed");
929 return -E_PARSE_FAIL;
930 }
931 packet->SetSchemaVersion(schemaVersion);
932 packet->SetSystemTimeOffset(systemTimeOffset);
933 packet->SetSenderTimeOffset(senderTimeOffset);
934 if (!parcel.IsContinueRead()) {
935 return errCode;
936 }
937 SecurityOption option;
938 parcel.ReadInt(option.securityLabel);
939 parcel.ReadInt(option.securityFlag);
940 packet->SetSecurityOption(option);
941 }
942 if (SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
943 uint32_t total = 0u;
944 parcel.ReadUInt32(total);
945 parcel.EightByteAlign();
946 packet->SetTotalDataCount(total);
947 }
948 return errCode;
949 }
950
DataPacketInnerSerialization(const DataRequestPacket * packet,Parcel & parcel)951 int SingleVerSerializeManager::DataPacketInnerSerialization(const DataRequestPacket *packet, Parcel &parcel)
952 {
953 // flag mask add in 103
954 if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
955 return E_OK;
956 }
957 if (packet->IsExtraConditionData()) {
958 int errCode = DataPacketExtraConditionsSerialization(parcel, packet);
959 if (errCode != E_OK) {
960 LOGE("[SingleVerSerializeManager] Serialize extra condition failed %d", errCode);
961 return errCode;
962 }
963 }
964 if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
965 parcel.WriteUInt64(packet->GetSchemaVersion());
966 parcel.WriteInt64(packet->GetSystemTimeOffset());
967 parcel.WriteInt64(packet->GetSenderTimeOffset());
968 if (parcel.IsError()) {
969 LOGE("[SingleVerSerializeManager] Serialize schema version or time offset failed");
970 return -E_PARSE_FAIL;
971 }
972 auto option = packet->GetSecurityOption();
973 parcel.WriteInt(option.securityLabel);
974 parcel.WriteInt(option.securityFlag);
975 if (parcel.IsError()) {
976 LOGE("[SingleVerSerializeManager] Serialize security option failed");
977 return -E_PARSE_FAIL;
978 }
979 }
980 if (SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
981 parcel.WriteUInt32(packet->GetTotalDataCount());
982 parcel.EightByteAlign();
983 }
984 return E_OK;
985 }
986 } // namespace DistributedDB