1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_data_sync.h"
18
19 #include "parcel.h"
20 #include "log_print.h"
21 #include "sync_types.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25
26 namespace DistributedDB {
27 // Class MultiVerRequestPacket
CalculateLen() const28 uint32_t MultiVerRequestPacket::CalculateLen() const
29 {
30 uint64_t len = Parcel::GetIntLen();
31 len = Parcel::GetEightByteAlign(len);
32 len += Parcel::GetMultiVerCommitLen(commit_);
33 if (len > INT32_MAX) {
34 return 0;
35 }
36 return len;
37 }
38
SetCommit(MultiVerCommitNode & commit)39 void MultiVerRequestPacket::SetCommit(MultiVerCommitNode &commit)
40 {
41 commit_ = std::move(commit);
42 }
43
GetCommit(MultiVerCommitNode & commit) const44 void MultiVerRequestPacket::GetCommit(MultiVerCommitNode &commit) const
45 {
46 commit = commit_;
47 }
48
SetErrCode(int32_t errCode)49 void MultiVerRequestPacket::SetErrCode(int32_t errCode)
50 {
51 errCode_ = errCode;
52 }
53
GetErrCode() const54 int32_t MultiVerRequestPacket::GetErrCode() const
55 {
56 return errCode_;
57 }
58
59 // Class MultiVerAckPacket
CalculateLen() const60 uint32_t MultiVerAckPacket::CalculateLen() const
61 {
62 uint64_t len = Parcel::GetIntLen();
63 len = Parcel::GetEightByteAlign(len);
64 for (const auto &iter : entries_) {
65 len += Parcel::GetVectorCharLen(iter);
66 if (len > INT32_MAX) {
67 return 0;
68 }
69 }
70 return len;
71 }
72
SetData(std::vector<std::vector<uint8_t>> & data)73 void MultiVerAckPacket::SetData(std::vector<std::vector<uint8_t>> &data)
74 {
75 entries_ = std::move(data);
76 }
77
GetData(std::vector<std::vector<uint8_t>> & data) const78 void MultiVerAckPacket::GetData(std::vector<std::vector<uint8_t>> &data) const
79 {
80 data = entries_;
81 }
82
SetErrorCode(int32_t errCode)83 void MultiVerAckPacket::SetErrorCode(int32_t errCode)
84 {
85 errorCode_ = errCode;
86 }
87
GetErrorCode(int32_t & errCode) const88 void MultiVerAckPacket::GetErrorCode(int32_t &errCode) const
89 {
90 errCode = errorCode_;
91 }
92
93 // Class MultiVerDataSync
~MultiVerDataSync()94 MultiVerDataSync::~MultiVerDataSync()
95 {
96 storagePtr_ = nullptr;
97 communicateHandle_ = nullptr;
98 }
99
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int MultiVerDataSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103 return -E_INVALID_ARGS;
104 }
105
106 switch (inMsg->GetMessageType()) {
107 case TYPE_REQUEST:
108 return RequestPacketSerialization(buffer, length, inMsg);
109 case TYPE_RESPONSE:
110 return AckPacketSerialization(buffer, length, inMsg);
111 default:
112 return -E_MESSAGE_TYPE_ERROR;
113 }
114 }
115
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int MultiVerDataSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119 return -E_MESSAGE_ID_ERROR;
120 }
121
122 switch (inMsg->GetMessageType()) {
123 case TYPE_REQUEST:
124 return RequestPacketDeSerialization(buffer, length, inMsg);
125 case TYPE_RESPONSE:
126 return AckPacketDeSerialization(buffer, length, inMsg);
127 default:
128 return -E_MESSAGE_TYPE_ERROR;
129 }
130 }
131
CalculateLen(const Message * inMsg)132 uint32_t MultiVerDataSync::CalculateLen(const Message *inMsg)
133 {
134 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135 return 0;
136 }
137
138 uint32_t len = 0;
139 int errCode = E_OK;
140 switch (inMsg->GetMessageType()) {
141 case TYPE_REQUEST:
142 errCode = RequestPacketCalculateLen(inMsg, len);
143 break;
144 case TYPE_RESPONSE:
145 errCode = AckPacketCalculateLen(inMsg, len);
146 break;
147 default:
148 return 0;
149 }
150 if (errCode != E_OK) {
151 return 0;
152 }
153 return len;
154 }
155
RegisterTransformFunc()156 int MultiVerDataSync::RegisterTransformFunc()
157 {
158 TransformFunc func;
159 func.computeFunc = std::bind(&MultiVerDataSync::CalculateLen, std::placeholders::_1);
160 func.serializeFunc = std::bind(&MultiVerDataSync::Serialization, std::placeholders::_1,
161 std::placeholders::_2, std::placeholders::_3);
162 func.deserializeFunc = std::bind(&MultiVerDataSync::DeSerialization, std::placeholders::_1,
163 std::placeholders::_2, std::placeholders::_3);
164 return MessageTransform::RegTransformFunction(MULTI_VER_DATA_SYNC_MESSAGE, func);
165 }
166
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)167 int MultiVerDataSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
168 {
169 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
170 return -E_INVALID_ARGS;
171 }
172 storagePtr_ = storagePtr;
173 communicateHandle_ = communicateHandle;
174 return E_OK;
175 }
176
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const177 void MultiVerDataSync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
178 {
179 return;
180 }
181
SyncStart(MultiVerSyncTaskContext * context)182 int MultiVerDataSync::SyncStart(MultiVerSyncTaskContext *context)
183 {
184 if (context == nullptr) {
185 return -E_INVALID_ARGS;
186 }
187 LOGD("MultiVerDataSync::SyncStart dst=%s{private}, begin", context->GetDeviceId().c_str());
188 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
189 if (performance != nullptr) {
190 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
191 }
192 MultiVerCommitNode commit;
193 int errCode = GetValidCommit(context, commit);
194 if (performance != nullptr) {
195 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
196 }
197 if (errCode != E_OK) {
198 // sync don't need start
199 SendFinishedRequest(context);
200 return errCode;
201 }
202
203 errCode = SendRequestPacket(context, commit);
204 LOGD("MultiVerDataSync::SyncStart dst=%s{private}, end", context->GetDeviceId().c_str());
205 return errCode;
206 }
207
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)208 int MultiVerDataSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
209 {
210 if (message == nullptr || context == nullptr) {
211 return -E_INVALID_ARGS;
212 }
213
214 if (!IsPacketValid(message, TYPE_REQUEST)) {
215 return -E_INVALID_ARGS;
216 }
217
218 const MultiVerRequestPacket *packet = message->GetObject<MultiVerRequestPacket>();
219 if (packet == nullptr) {
220 return -E_INVALID_ARGS;
221 }
222 if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
223 return -E_LAST_SYNC_FRAME;
224 }
225 MultiVerCommitNode commit;
226 packet->GetCommit(commit);
227 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
228 if (performance != nullptr) {
229 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
230 }
231 std::vector<MultiVerKvEntry *> dataEntries;
232 int errCode = GetCommitData(commit, dataEntries);
233 if (performance != nullptr) {
234 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
235 }
236 if (errCode != E_OK) {
237 LOGE("MultiVerDataSync::RequestRecvCallback : GetCommitData ERR, errno = %d", errCode);
238 }
239
240 errCode = SendAckPacket(context, dataEntries, errCode, message);
241 for (auto &iter : dataEntries) {
242 ReleaseKvEntry(iter);
243 iter = nullptr;
244 }
245 LOGD("MultiVerDataSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}",
246 errCode, context->GetDeviceId().c_str());
247 return errCode;
248 }
249
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)250 int MultiVerDataSync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
251 {
252 if (message == nullptr) {
253 return -E_INVALID_ARGS;
254 }
255 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
256 return -E_INVALID_ARGS;
257 }
258
259 const MultiVerAckPacket *packet = message->GetObject<MultiVerAckPacket>();
260 if (packet == nullptr) {
261 return -E_INVALID_ARGS;
262 }
263 int32_t errCode = E_OK;
264 packet->GetErrorCode(errCode);
265 if (errCode != E_OK) {
266 return errCode;
267 }
268 std::vector<std::vector<uint8_t>> dataEntries;
269 std::vector<MultiVerKvEntry *> entries;
270 std::vector<ValueSliceHash> valueHashes;
271 MultiVerKvEntry *entry = nullptr;
272
273 packet->GetData(dataEntries);
274 for (const auto &iter : dataEntries) {
275 MultiVerKvEntry *item = CreateKvEntry(iter);
276 entries.push_back(item);
277 }
278 context->ReleaseEntries();
279 context->SetEntries(entries);
280 context->SetEntriesIndex(0);
281 context->SetEntriesSize(static_cast<int>(entries.size()));
282 LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, entries num = %zu",
283 context->GetDeviceId().c_str(), entries.size());
284
285 if (entries.size() > 0) {
286 entry = entries[0];
287 errCode = entry->GetValueHash(valueHashes);
288 if (errCode != E_OK) {
289 return errCode;
290 }
291 }
292 context->SetValueSliceHashNodes(valueHashes);
293 context->SetValueSlicesIndex(0);
294 context->SetValueSlicesSize(valueHashes.size());
295 LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, ValueSlicesSize num = %zu",
296 context->GetDeviceId().c_str(), valueHashes.size());
297 return errCode;
298 }
299
PutCommitData(const MultiVerCommitNode & commit,const std::vector<MultiVerKvEntry * > & entries,const std::string & deviceName)300 int MultiVerDataSync::PutCommitData(const MultiVerCommitNode &commit, const std::vector<MultiVerKvEntry *> &entries,
301 const std::string &deviceName)
302 {
303 return storagePtr_->PutCommitData(commit, entries, deviceName);
304 }
305
MergeSyncCommit(const MultiVerCommitNode & commit,const std::vector<MultiVerCommitNode> & commits)306 int MultiVerDataSync::MergeSyncCommit(const MultiVerCommitNode &commit, const std::vector<MultiVerCommitNode> &commits)
307 {
308 return storagePtr_->MergeSyncCommit(commit, commits);
309 }
310
ReleaseKvEntry(const MultiVerKvEntry * entry)311 void MultiVerDataSync::ReleaseKvEntry(const MultiVerKvEntry *entry)
312 {
313 return storagePtr_->ReleaseKvEntry(entry);
314 }
315
SendFinishedRequest(const MultiVerSyncTaskContext * context)316 void MultiVerDataSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
317 {
318 if (context == nullptr) {
319 return;
320 }
321 MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
322 if (packet == nullptr) {
323 LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
324 return;
325 }
326 packet->SetErrCode(-E_LAST_SYNC_FRAME);
327 Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
328 if (message == nullptr) {
329 delete packet;
330 packet = nullptr;
331 LOGE("MultiVerDataSync::SendRequestPacket : new message error");
332 return;
333 }
334 message->SetMessageType(TYPE_REQUEST);
335 message->SetTarget(context->GetDeviceId());
336 int errCode = message->SetExternalObject(packet);
337 if (errCode != E_OK) {
338 delete packet;
339 packet = nullptr;
340 delete message;
341 message = nullptr;
342 LOGE("[MultiVerDataSync][SendFinishedRequest] : SetExternalObject failed errCode:%d", errCode);
343 return;
344 }
345 message->SetSessionId(context->GetRequestSessionId());
346 message->SetSequenceId(context->GetSequenceId());
347
348 errCode = Send(message->GetTarget(), message);
349 if (errCode != E_OK) {
350 delete message;
351 message = nullptr;
352 LOGE("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest failed, err %d", errCode);
353 }
354 LOGI("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest dst=%s{private}", context->GetDeviceId().c_str());
355 }
356
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)357 int MultiVerDataSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
358 {
359 if ((inMsg == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
360 return -E_INVALID_ARGS;
361 }
362 const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
363 if (packet == nullptr) {
364 return -E_INVALID_ARGS;
365 }
366
367 len = packet->CalculateLen();
368 return E_OK;
369 }
370
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)371 int MultiVerDataSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
372 {
373 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
374 return -E_INVALID_ARGS;
375 }
376 const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
377 if ((packet == nullptr) || (length != packet->CalculateLen())) {
378 return -E_INVALID_ARGS;
379 }
380
381 MultiVerCommitNode commit;
382 packet->GetCommit(commit);
383 int32_t ackCode = packet->GetErrCode();
384
385 Parcel parcel(buffer, length);
386 int errCode = parcel.WriteInt(ackCode);
387 if (errCode != E_OK) {
388 return -E_SECUREC_ERROR;
389 }
390 parcel.EightByteAlign();
391
392 // commitMap Serialization
393 errCode = parcel.WriteMultiVerCommit(commit);
394 if (errCode != E_OK) {
395 return -E_SECUREC_ERROR;
396 }
397
398 return errCode;
399 }
400
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)401 int MultiVerDataSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
402 {
403 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
404 return -E_INVALID_ARGS;
405 }
406
407 MultiVerCommitNode commit;
408 Parcel parcel(const_cast<uint8_t *>(buffer), length);
409 int32_t pktErrCode;
410 uint64_t packLen = parcel.ReadInt(pktErrCode);
411 if (parcel.IsError()) {
412 return -E_INVALID_ARGS;
413 }
414 parcel.EightByteAlign();
415 packLen = Parcel::GetEightByteAlign(packLen);
416 // commit DeSerialization
417 packLen += parcel.ReadMultiVerCommit(commit);
418 if (packLen != length || parcel.IsError()) {
419 return -E_INVALID_ARGS;
420 }
421 MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
422 if (packet == nullptr) {
423 LOGE("MultiVerDataSync::RequestPacketDeSerialization : new packet error");
424 return -E_OUT_OF_MEMORY;
425 }
426 packet->SetCommit(commit);
427 packet->SetErrCode(pktErrCode);
428 int errCode = inMsg->SetExternalObject<>(packet);
429 if (errCode != E_OK) {
430 delete packet;
431 packet = nullptr;
432 }
433 return errCode;
434 }
435
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)436 int MultiVerDataSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
437 {
438 if (!IsPacketValid(inMsg, TYPE_RESPONSE)) {
439 return -E_INVALID_ARGS;
440 }
441
442 const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
443 if (packet == nullptr) {
444 return -E_INVALID_ARGS;
445 }
446 len = packet->CalculateLen();
447 return E_OK;
448 }
449
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)450 int MultiVerDataSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
451 {
452 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
453 return -E_INVALID_ARGS;
454 }
455 const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
456 if ((packet == nullptr) || (length != packet->CalculateLen())) {
457 return -E_INVALID_ARGS;
458 }
459
460 Parcel parcel(buffer, length);
461 std::vector<std::vector<uint8_t>> entries;
462
463 packet->GetData(entries);
464 int32_t errCode = E_OK;
465 packet->GetErrorCode(errCode);
466 // errCode Serialization
467 errCode = parcel.WriteInt(errCode);
468 if (errCode != E_OK) {
469 return -E_SECUREC_ERROR;
470 }
471 parcel.EightByteAlign();
472
473 // commits vector Serialization
474 for (const auto &iter : entries) {
475 errCode = parcel.WriteVectorChar(iter);
476 if (errCode != E_OK) {
477 return -E_SECUREC_ERROR;
478 }
479 }
480
481 return errCode;
482 }
483
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)484 int MultiVerDataSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
485 {
486 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
487 return -E_INVALID_ARGS;
488 }
489
490 Parcel parcel(const_cast<uint8_t *>(buffer), length);
491 int32_t pktErrCode;
492
493 // errCode DeSerialization
494 uint32_t packLen = parcel.ReadInt(pktErrCode);
495 if (parcel.IsError()) {
496 return -E_INVALID_ARGS;
497 }
498 parcel.EightByteAlign();
499 packLen = Parcel::GetEightByteAlign(packLen);
500
501 // commits vector DeSerialization
502 std::vector<std::vector<uint8_t>> entries;
503 while (packLen < length) {
504 std::vector<uint8_t> data;
505 packLen += parcel.ReadVectorChar(data);
506 // A valid dataItem got, Save to storage
507 entries.push_back(data);
508 if (parcel.IsError()) {
509 return -E_INVALID_ARGS;
510 }
511 }
512 MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
513 if (packet == nullptr) {
514 LOGE("MultiVerDataSync::AckPacketDeSerialization : new packet error");
515 return -E_OUT_OF_MEMORY;
516 }
517 packet->SetData(entries);
518 packet->SetErrorCode(pktErrCode);
519 int errCode = inMsg->SetExternalObject<>(packet);
520 if (errCode != E_OK) {
521 delete packet;
522 packet = nullptr;
523 }
524 return errCode;
525 }
526
IsPacketValid(const Message * inMsg,uint16_t messageType)527 bool MultiVerDataSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
528 {
529 if ((inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
530 return false;
531 }
532 if (messageType != inMsg->GetMessageType()) {
533 return false;
534 }
535 return true;
536 }
537
GetValidCommit(MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)538 int MultiVerDataSync::GetValidCommit(MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
539 {
540 int commitsSize = context->GetCommitsSize();
541 if (commitsSize > DBConstant::MAX_COMMIT_SIZE) {
542 LOGE("MultiVerDataSync::GetValidCommit failed, to large!");
543 return -E_LENGTH_ERROR;
544 }
545 int index = context->GetCommitIndex();
546 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
547 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
548 index--;
549 }
550 index = (index < 0) ? 0 : index;
551 LOGD("MultiVerDataSync::GetValidCommit begin, dst=%s{private}, index = %d", context->GetDeviceId().c_str(), index);
552 while (index < commitsSize) {
553 MultiVerCommitNode commitItem;
554 context->GetCommit(index, commitItem);
555 LOGD("MultiVerDataSync::GetValidCommit , dst=%s{private}, index = %d, commitsSize = %d",
556 context->GetDeviceId().c_str(), index, commitsSize);
557
558 index++;
559 context->SetCommitIndex(index);
560 if (IsCommitExisted(commitItem)) {
561 continue;
562 }
563 commit = commitItem;
564 LOGD("MultiVerDataSync::GetValidCommit ok, dst=%s{private}, commit index = %d",
565 context->GetDeviceId().c_str(), index);
566 return E_OK;
567 }
568 LOGD("MultiVerDataSync::GetValidCommit not found, dst=%s{private}", context->GetDeviceId().c_str());
569 return -E_NOT_FOUND;
570 }
571
IsCommitExisted(const MultiVerCommitNode & commit)572 bool MultiVerDataSync::IsCommitExisted(const MultiVerCommitNode &commit)
573 {
574 return storagePtr_->IsCommitExisted(commit);
575 }
576
Send(const DeviceID & deviceId,const Message * inMsg)577 int MultiVerDataSync::Send(const DeviceID &deviceId, const Message *inMsg)
578 {
579 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
580 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
581 if (errCode != E_OK) {
582 LOGE("MultiVerDataSync::Send ERR! ERR = %d", errCode);
583 }
584 return errCode;
585 }
586
SendRequestPacket(const MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)587 int MultiVerDataSync::SendRequestPacket(const MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
588 {
589 MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
590 if (packet == nullptr) {
591 LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
592 return -E_OUT_OF_MEMORY;
593 }
594 packet->SetCommit(commit);
595 Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
596 if (message == nullptr) {
597 delete packet;
598 packet = nullptr;
599 LOGE("MultiVerDataSync::SendRequestPacket : new message error");
600 return -E_OUT_OF_MEMORY;
601 }
602 message->SetMessageType(TYPE_REQUEST);
603 message->SetTarget(context->GetDeviceId());
604 int errCode = message->SetExternalObject(packet);
605 if (errCode != E_OK) {
606 delete packet;
607 packet = nullptr;
608 delete message;
609 message = nullptr;
610 LOGE("[MultiVerDataSync][SendRequestPacket] : SetExternalObject failed errCode:%d", errCode);
611 return errCode;
612 }
613 message->SetSessionId(context->GetRequestSessionId());
614 message->SetSequenceId(context->GetSequenceId());
615
616 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
617 if (performance != nullptr) {
618 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
619 }
620 errCode = Send(message->GetTarget(), message);
621 if (errCode != E_OK) {
622 delete message;
623 message = nullptr;
624 }
625 LOGD("MultiVerDataSync::SendRequestPacket end");
626 return errCode;
627 }
628
SendAckPacket(const MultiVerSyncTaskContext * context,const std::vector<MultiVerKvEntry * > & dataItems,int retCode,const Message * message)629 int MultiVerDataSync::SendAckPacket(const MultiVerSyncTaskContext *context,
630 const std::vector<MultiVerKvEntry *> &dataItems, int retCode, const Message *message)
631 {
632 if (message == nullptr) {
633 LOGE("MultiVerDataSync::SendAckPacket : message is nullptr");
634 return -E_INVALID_ARGS;
635 }
636
637 MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
638 if (packet == nullptr) {
639 LOGE("MultiVerDataSync::SendAckPack et : packet is nullptr");
640 return -E_OUT_OF_MEMORY;
641 }
642 Message *ackMessage = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
643 if (ackMessage == nullptr) {
644 delete packet;
645 packet = nullptr;
646 LOGE("MultiVerDataSync::SendAckPacket : new message error");
647 return -E_OUT_OF_MEMORY;
648 }
649
650 std::vector<std::vector<uint8_t>> entries;
651 for (const auto &iter : dataItems) {
652 std::vector<uint8_t> item;
653 iter->GetSerialData(item);
654 entries.push_back(item);
655 }
656 packet->SetData(entries);
657 packet->SetErrorCode(static_cast<int32_t>(retCode));
658
659 ackMessage->SetMessageType(TYPE_RESPONSE);
660 ackMessage->SetTarget(context->GetDeviceId());
661 int errCode = ackMessage->SetExternalObject(packet);
662 if (errCode != E_OK) {
663 delete packet;
664 packet = nullptr;
665 delete ackMessage;
666 ackMessage = nullptr;
667 LOGE("[MultiVerDataSync][SendAckPacket] : SetExternalObject failed errCode:%d", errCode);
668 return errCode;
669 }
670 ackMessage->SetSequenceId(message->GetSequenceId());
671 ackMessage->SetSessionId(message->GetSessionId());
672 errCode = Send(ackMessage->GetTarget(), ackMessage);
673 if (errCode != E_OK) {
674 delete ackMessage;
675 ackMessage = nullptr;
676 }
677 LOGD("MultiVerDataSync::SendAckPacket end, dst=%s{private}, errCode = %d", context->GetDeviceId().c_str(), errCode);
678 return errCode;
679 }
680
GetCommitData(const MultiVerCommitNode & commit,std::vector<MultiVerKvEntry * > & entries)681 int MultiVerDataSync::GetCommitData(const MultiVerCommitNode &commit, std::vector<MultiVerKvEntry *> &entries)
682 {
683 return storagePtr_->GetCommitData(commit, entries);
684 }
685
CreateKvEntry(const std::vector<uint8_t> & entry)686 MultiVerKvEntry *MultiVerDataSync::CreateKvEntry(const std::vector<uint8_t> &entry)
687 {
688 return storagePtr_->CreateKvEntry(entry);
689 }
690 }
691 #endif