• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_data_sync.h"
18 
19 #include "parcel.h"
20 #include "log_print.h"
21 #include "sync_types.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25 
26 namespace DistributedDB {
27 // Class MultiVerRequestPacket
CalculateLen() const28 uint32_t MultiVerRequestPacket::CalculateLen() const
29 {
30     uint64_t len = Parcel::GetIntLen();
31     len = Parcel::GetEightByteAlign(len);
32     len += Parcel::GetMultiVerCommitLen(commit_);
33     if (len > INT32_MAX) {
34         return 0;
35     }
36     return len;
37 }
38 
SetCommit(MultiVerCommitNode & commit)39 void MultiVerRequestPacket::SetCommit(MultiVerCommitNode &commit)
40 {
41     commit_ = std::move(commit);
42 }
43 
GetCommit(MultiVerCommitNode & commit) const44 void MultiVerRequestPacket::GetCommit(MultiVerCommitNode &commit) const
45 {
46     commit = commit_;
47 }
48 
SetErrCode(int32_t errCode)49 void MultiVerRequestPacket::SetErrCode(int32_t errCode)
50 {
51     errCode_ = errCode;
52 }
53 
GetErrCode() const54 int32_t MultiVerRequestPacket::GetErrCode() const
55 {
56     return errCode_;
57 }
58 
59 // Class MultiVerAckPacket
CalculateLen() const60 uint32_t MultiVerAckPacket::CalculateLen() const
61 {
62     uint64_t len = Parcel::GetIntLen();
63     len = Parcel::GetEightByteAlign(len);
64     for (const auto &iter : entries_) {
65         len += Parcel::GetVectorCharLen(iter);
66         if (len > INT32_MAX) {
67             return 0;
68         }
69     }
70     return len;
71 }
72 
SetData(std::vector<std::vector<uint8_t>> & data)73 void MultiVerAckPacket::SetData(std::vector<std::vector<uint8_t>> &data)
74 {
75     entries_ = std::move(data);
76 }
77 
GetData(std::vector<std::vector<uint8_t>> & data) const78 void MultiVerAckPacket::GetData(std::vector<std::vector<uint8_t>> &data) const
79 {
80     data = entries_;
81 }
82 
SetErrorCode(int32_t errCode)83 void MultiVerAckPacket::SetErrorCode(int32_t errCode)
84 {
85     errorCode_ = errCode;
86 }
87 
GetErrorCode(int32_t & errCode) const88 void MultiVerAckPacket::GetErrorCode(int32_t &errCode) const
89 {
90     errCode = errorCode_;
91 }
92 
93 // Class MultiVerDataSync
~MultiVerDataSync()94 MultiVerDataSync::~MultiVerDataSync()
95 {
96     storagePtr_ = nullptr;
97     communicateHandle_ = nullptr;
98 }
99 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int MultiVerDataSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103         return -E_INVALID_ARGS;
104     }
105 
106     switch (inMsg->GetMessageType()) {
107         case TYPE_REQUEST:
108             return RequestPacketSerialization(buffer, length, inMsg);
109         case TYPE_RESPONSE:
110             return AckPacketSerialization(buffer, length, inMsg);
111         default:
112             return -E_MESSAGE_TYPE_ERROR;
113     }
114 }
115 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int MultiVerDataSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119         return -E_MESSAGE_ID_ERROR;
120     }
121 
122     switch (inMsg->GetMessageType()) {
123         case TYPE_REQUEST:
124             return RequestPacketDeSerialization(buffer, length, inMsg);
125         case TYPE_RESPONSE:
126             return AckPacketDeSerialization(buffer, length, inMsg);
127         default:
128             return -E_MESSAGE_TYPE_ERROR;
129     }
130 }
131 
CalculateLen(const Message * inMsg)132 uint32_t MultiVerDataSync::CalculateLen(const Message *inMsg)
133 {
134     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135         return 0;
136     }
137 
138     uint32_t len = 0;
139     int errCode = E_OK;
140     switch (inMsg->GetMessageType()) {
141         case TYPE_REQUEST:
142             errCode = RequestPacketCalculateLen(inMsg, len);
143             break;
144         case TYPE_RESPONSE:
145             errCode = AckPacketCalculateLen(inMsg, len);
146             break;
147         default:
148             return 0;
149     }
150     if (errCode != E_OK) {
151         return 0;
152     }
153     return len;
154 }
155 
RegisterTransformFunc()156 int MultiVerDataSync::RegisterTransformFunc()
157 {
158     TransformFunc func;
159     func.computeFunc = std::bind(&MultiVerDataSync::CalculateLen, std::placeholders::_1);
160     func.serializeFunc = std::bind(&MultiVerDataSync::Serialization, std::placeholders::_1,
161                                    std::placeholders::_2, std::placeholders::_3);
162     func.deserializeFunc = std::bind(&MultiVerDataSync::DeSerialization, std::placeholders::_1,
163                                      std::placeholders::_2, std::placeholders::_3);
164     return MessageTransform::RegTransformFunction(MULTI_VER_DATA_SYNC_MESSAGE, func);
165 }
166 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)167 int MultiVerDataSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
168 {
169     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
170         return -E_INVALID_ARGS;
171     }
172     storagePtr_ = storagePtr;
173     communicateHandle_ = communicateHandle;
174     return E_OK;
175 }
176 
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const177 void MultiVerDataSync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
178 {
179     return;
180 }
181 
SyncStart(MultiVerSyncTaskContext * context)182 int MultiVerDataSync::SyncStart(MultiVerSyncTaskContext *context)
183 {
184     if (context == nullptr) {
185         return -E_INVALID_ARGS;
186     }
187     LOGD("MultiVerDataSync::SyncStart dst=%s{private}, begin", context->GetDeviceId().c_str());
188     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
189     if (performance != nullptr) {
190         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
191     }
192     MultiVerCommitNode commit;
193     int errCode = GetValidCommit(context, commit);
194     if (performance != nullptr) {
195         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
196     }
197     if (errCode != E_OK) {
198         // sync don't need start
199         SendFinishedRequest(context);
200         return errCode;
201     }
202 
203     errCode = SendRequestPacket(context, commit);
204     LOGD("MultiVerDataSync::SyncStart dst=%s{private}, end", context->GetDeviceId().c_str());
205     return errCode;
206 }
207 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)208 int MultiVerDataSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
209 {
210     if (message == nullptr || context == nullptr) {
211         return -E_INVALID_ARGS;
212     }
213 
214     if (!IsPacketValid(message, TYPE_REQUEST)) {
215         return -E_INVALID_ARGS;
216     }
217 
218     const MultiVerRequestPacket *packet = message->GetObject<MultiVerRequestPacket>();
219     if (packet == nullptr) {
220         return -E_INVALID_ARGS;
221     }
222     if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
223         return -E_LAST_SYNC_FRAME;
224     }
225     MultiVerCommitNode commit;
226     packet->GetCommit(commit);
227     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
228     if (performance != nullptr) {
229         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
230     }
231     std::vector<MultiVerKvEntry *> dataEntries;
232     int errCode = GetCommitData(commit, dataEntries);
233     if (performance != nullptr) {
234         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
235     }
236     if (errCode != E_OK) {
237         LOGE("MultiVerDataSync::RequestRecvCallback : GetCommitData ERR, errno = %d", errCode);
238     }
239 
240     errCode = SendAckPacket(context, dataEntries, errCode, message);
241     for (auto &iter : dataEntries) {
242         ReleaseKvEntry(iter);
243         iter = nullptr;
244     }
245     LOGD("MultiVerDataSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}",
246          errCode, context->GetDeviceId().c_str());
247     return errCode;
248 }
249 
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)250 int MultiVerDataSync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
251 {
252     if (message == nullptr) {
253         return -E_INVALID_ARGS;
254     }
255     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
256         return -E_INVALID_ARGS;
257     }
258 
259     const MultiVerAckPacket *packet = message->GetObject<MultiVerAckPacket>();
260     if (packet == nullptr) {
261         return -E_INVALID_ARGS;
262     }
263     int32_t errCode = E_OK;
264     packet->GetErrorCode(errCode);
265     if (errCode != E_OK) {
266         return errCode;
267     }
268     std::vector<std::vector<uint8_t>> dataEntries;
269     std::vector<MultiVerKvEntry *> entries;
270     std::vector<ValueSliceHash> valueHashes;
271     MultiVerKvEntry *entry = nullptr;
272 
273     packet->GetData(dataEntries);
274     for (const auto &iter : dataEntries) {
275         MultiVerKvEntry *item = CreateKvEntry(iter);
276         entries.push_back(item);
277     }
278     context->ReleaseEntries();
279     context->SetEntries(entries);
280     context->SetEntriesIndex(0);
281     context->SetEntriesSize(static_cast<int>(entries.size()));
282     LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, entries num = %zu",
283         context->GetDeviceId().c_str(), entries.size());
284 
285     if (entries.size() > 0) {
286         entry = entries[0];
287         errCode = entry->GetValueHash(valueHashes);
288         if (errCode != E_OK) {
289             return errCode;
290         }
291     }
292     context->SetValueSliceHashNodes(valueHashes);
293     context->SetValueSlicesIndex(0);
294     context->SetValueSlicesSize(valueHashes.size());
295     LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, ValueSlicesSize num = %zu",
296         context->GetDeviceId().c_str(), valueHashes.size());
297     return errCode;
298 }
299 
PutCommitData(const MultiVerCommitNode & commit,const std::vector<MultiVerKvEntry * > & entries,const std::string & deviceName)300 int MultiVerDataSync::PutCommitData(const MultiVerCommitNode &commit, const std::vector<MultiVerKvEntry *> &entries,
301     const std::string &deviceName)
302 {
303     return storagePtr_->PutCommitData(commit, entries, deviceName);
304 }
305 
MergeSyncCommit(const MultiVerCommitNode & commit,const std::vector<MultiVerCommitNode> & commits)306 int MultiVerDataSync::MergeSyncCommit(const MultiVerCommitNode &commit, const std::vector<MultiVerCommitNode> &commits)
307 {
308     return storagePtr_->MergeSyncCommit(commit, commits);
309 }
310 
ReleaseKvEntry(const MultiVerKvEntry * entry)311 void MultiVerDataSync::ReleaseKvEntry(const MultiVerKvEntry *entry)
312 {
313     return storagePtr_->ReleaseKvEntry(entry);
314 }
315 
SendFinishedRequest(const MultiVerSyncTaskContext * context)316 void MultiVerDataSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
317 {
318     if (context == nullptr) {
319         return;
320     }
321     MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
322     if (packet == nullptr) {
323         LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
324         return;
325     }
326     packet->SetErrCode(-E_LAST_SYNC_FRAME);
327     Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
328     if (message == nullptr) {
329         delete packet;
330         packet = nullptr;
331         LOGE("MultiVerDataSync::SendRequestPacket : new message error");
332         return;
333     }
334     message->SetMessageType(TYPE_REQUEST);
335     message->SetTarget(context->GetDeviceId());
336     int errCode = message->SetExternalObject(packet);
337     if (errCode != E_OK) {
338         delete packet;
339         packet = nullptr;
340         delete message;
341         message = nullptr;
342         LOGE("[MultiVerDataSync][SendFinishedRequest] : SetExternalObject failed errCode:%d", errCode);
343         return;
344     }
345     message->SetSessionId(context->GetRequestSessionId());
346     message->SetSequenceId(context->GetSequenceId());
347 
348     errCode = Send(message->GetTarget(), message);
349     if (errCode != E_OK) {
350         delete message;
351         message = nullptr;
352         LOGE("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest failed, err %d", errCode);
353     }
354     LOGI("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest dst=%s{private}", context->GetDeviceId().c_str());
355 }
356 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)357 int MultiVerDataSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
358 {
359     if ((inMsg == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
360         return -E_INVALID_ARGS;
361     }
362     const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
363     if (packet == nullptr) {
364         return -E_INVALID_ARGS;
365     }
366 
367     len = packet->CalculateLen();
368     return E_OK;
369 }
370 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)371 int MultiVerDataSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
372 {
373     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
374         return -E_INVALID_ARGS;
375     }
376     const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
377     if ((packet == nullptr) || (length != packet->CalculateLen())) {
378         return -E_INVALID_ARGS;
379     }
380 
381     MultiVerCommitNode commit;
382     packet->GetCommit(commit);
383     int32_t ackCode = packet->GetErrCode();
384 
385     Parcel parcel(buffer, length);
386     int errCode = parcel.WriteInt(ackCode);
387     if (errCode != E_OK) {
388         return -E_SECUREC_ERROR;
389     }
390     parcel.EightByteAlign();
391 
392     // commitMap Serialization
393     errCode = parcel.WriteMultiVerCommit(commit);
394     if (errCode != E_OK) {
395         return -E_SECUREC_ERROR;
396     }
397 
398     return errCode;
399 }
400 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)401 int MultiVerDataSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
402 {
403     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
404         return -E_INVALID_ARGS;
405     }
406 
407     MultiVerCommitNode commit;
408     Parcel parcel(const_cast<uint8_t *>(buffer), length);
409     int32_t pktErrCode;
410     uint64_t packLen = parcel.ReadInt(pktErrCode);
411     if (parcel.IsError()) {
412         return -E_INVALID_ARGS;
413     }
414     parcel.EightByteAlign();
415     packLen = Parcel::GetEightByteAlign(packLen);
416     // commit DeSerialization
417     packLen += parcel.ReadMultiVerCommit(commit);
418     if (packLen != length || parcel.IsError()) {
419         return -E_INVALID_ARGS;
420     }
421     MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
422     if (packet == nullptr) {
423         LOGE("MultiVerDataSync::RequestPacketDeSerialization : new packet error");
424         return -E_OUT_OF_MEMORY;
425     }
426     packet->SetCommit(commit);
427     packet->SetErrCode(pktErrCode);
428     int errCode = inMsg->SetExternalObject<>(packet);
429     if (errCode != E_OK) {
430         delete packet;
431         packet = nullptr;
432     }
433     return errCode;
434 }
435 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)436 int MultiVerDataSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
437 {
438     if (!IsPacketValid(inMsg, TYPE_RESPONSE)) {
439         return -E_INVALID_ARGS;
440     }
441 
442     const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
443     if (packet == nullptr) {
444         return -E_INVALID_ARGS;
445     }
446     len = packet->CalculateLen();
447     return E_OK;
448 }
449 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)450 int MultiVerDataSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
451 {
452     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
453         return -E_INVALID_ARGS;
454     }
455     const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
456     if ((packet == nullptr) || (length != packet->CalculateLen())) {
457         return -E_INVALID_ARGS;
458     }
459 
460     Parcel parcel(buffer, length);
461     std::vector<std::vector<uint8_t>> entries;
462 
463     packet->GetData(entries);
464     int32_t errCode = E_OK;
465     packet->GetErrorCode(errCode);
466     // errCode Serialization
467     errCode = parcel.WriteInt(errCode);
468     if (errCode != E_OK) {
469         return -E_SECUREC_ERROR;
470     }
471     parcel.EightByteAlign();
472 
473     // commits vector Serialization
474     for (const auto &iter : entries) {
475         errCode = parcel.WriteVectorChar(iter);
476         if (errCode != E_OK) {
477             return -E_SECUREC_ERROR;
478         }
479     }
480 
481     return errCode;
482 }
483 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)484 int MultiVerDataSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
485 {
486     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
487         return -E_INVALID_ARGS;
488     }
489 
490     Parcel parcel(const_cast<uint8_t *>(buffer), length);
491     int32_t pktErrCode;
492 
493     // errCode DeSerialization
494     uint32_t packLen = parcel.ReadInt(pktErrCode);
495     if (parcel.IsError()) {
496         return -E_INVALID_ARGS;
497     }
498     parcel.EightByteAlign();
499     packLen = Parcel::GetEightByteAlign(packLen);
500 
501     // commits vector DeSerialization
502     std::vector<std::vector<uint8_t>> entries;
503     while (packLen < length) {
504         std::vector<uint8_t> data;
505         packLen += parcel.ReadVectorChar(data);
506         // A valid dataItem got, Save to storage
507         entries.push_back(data);
508         if (parcel.IsError()) {
509             return -E_INVALID_ARGS;
510         }
511     }
512     MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
513     if (packet == nullptr) {
514         LOGE("MultiVerDataSync::AckPacketDeSerialization : new packet error");
515         return -E_OUT_OF_MEMORY;
516     }
517     packet->SetData(entries);
518     packet->SetErrorCode(pktErrCode);
519     int errCode = inMsg->SetExternalObject<>(packet);
520     if (errCode != E_OK) {
521         delete packet;
522         packet = nullptr;
523     }
524     return errCode;
525 }
526 
IsPacketValid(const Message * inMsg,uint16_t messageType)527 bool MultiVerDataSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
528 {
529     if ((inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
530         return false;
531     }
532     if (messageType != inMsg->GetMessageType()) {
533         return false;
534     }
535     return true;
536 }
537 
GetValidCommit(MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)538 int MultiVerDataSync::GetValidCommit(MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
539 {
540     int commitsSize = context->GetCommitsSize();
541     if (commitsSize > DBConstant::MAX_COMMIT_SIZE) {
542         LOGE("MultiVerDataSync::GetValidCommit failed, to large!");
543         return -E_LENGTH_ERROR;
544     }
545     int index = context->GetCommitIndex();
546     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
547         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
548         index--;
549     }
550     index = (index < 0) ? 0 : index;
551     LOGD("MultiVerDataSync::GetValidCommit begin, dst=%s{private}, index = %d", context->GetDeviceId().c_str(), index);
552     while (index < commitsSize) {
553         MultiVerCommitNode commitItem;
554         context->GetCommit(index, commitItem);
555         LOGD("MultiVerDataSync::GetValidCommit , dst=%s{private}, index = %d, commitsSize = %d",
556             context->GetDeviceId().c_str(), index, commitsSize);
557 
558         index++;
559         context->SetCommitIndex(index);
560         if (IsCommitExisted(commitItem)) {
561             continue;
562         }
563         commit = commitItem;
564         LOGD("MultiVerDataSync::GetValidCommit ok, dst=%s{private}, commit index = %d",
565              context->GetDeviceId().c_str(), index);
566         return E_OK;
567     }
568     LOGD("MultiVerDataSync::GetValidCommit not found, dst=%s{private}", context->GetDeviceId().c_str());
569     return -E_NOT_FOUND;
570 }
571 
IsCommitExisted(const MultiVerCommitNode & commit)572 bool MultiVerDataSync::IsCommitExisted(const MultiVerCommitNode &commit)
573 {
574     return storagePtr_->IsCommitExisted(commit);
575 }
576 
Send(const DeviceID & deviceId,const Message * inMsg)577 int MultiVerDataSync::Send(const DeviceID &deviceId, const Message *inMsg)
578 {
579     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
580     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
581     if (errCode != E_OK) {
582         LOGE("MultiVerDataSync::Send ERR! ERR = %d", errCode);
583     }
584     return errCode;
585 }
586 
SendRequestPacket(const MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)587 int MultiVerDataSync::SendRequestPacket(const MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
588 {
589     MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
590     if (packet == nullptr) {
591         LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
592         return -E_OUT_OF_MEMORY;
593     }
594     packet->SetCommit(commit);
595     Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
596     if (message == nullptr) {
597         delete packet;
598         packet = nullptr;
599         LOGE("MultiVerDataSync::SendRequestPacket : new message error");
600         return -E_OUT_OF_MEMORY;
601     }
602     message->SetMessageType(TYPE_REQUEST);
603     message->SetTarget(context->GetDeviceId());
604     int errCode = message->SetExternalObject(packet);
605     if (errCode != E_OK) {
606         delete packet;
607         packet = nullptr;
608         delete message;
609         message = nullptr;
610         LOGE("[MultiVerDataSync][SendRequestPacket] : SetExternalObject failed errCode:%d", errCode);
611         return errCode;
612     }
613     message->SetSessionId(context->GetRequestSessionId());
614     message->SetSequenceId(context->GetSequenceId());
615 
616     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
617     if (performance != nullptr) {
618         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
619     }
620     errCode = Send(message->GetTarget(), message);
621     if (errCode != E_OK) {
622         delete message;
623         message = nullptr;
624     }
625     LOGD("MultiVerDataSync::SendRequestPacket end");
626     return errCode;
627 }
628 
SendAckPacket(const MultiVerSyncTaskContext * context,const std::vector<MultiVerKvEntry * > & dataItems,int retCode,const Message * message)629 int MultiVerDataSync::SendAckPacket(const MultiVerSyncTaskContext *context,
630     const std::vector<MultiVerKvEntry *> &dataItems, int retCode, const Message *message)
631 {
632     if (message == nullptr) {
633         LOGE("MultiVerDataSync::SendAckPacket : message is nullptr");
634         return -E_INVALID_ARGS;
635     }
636 
637     MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
638     if (packet == nullptr) {
639         LOGE("MultiVerDataSync::SendAckPack et : packet is nullptr");
640         return -E_OUT_OF_MEMORY;
641     }
642     Message *ackMessage = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
643     if (ackMessage == nullptr) {
644         delete packet;
645         packet = nullptr;
646         LOGE("MultiVerDataSync::SendAckPacket : new message error");
647         return -E_OUT_OF_MEMORY;
648     }
649 
650     std::vector<std::vector<uint8_t>> entries;
651     for (const auto &iter : dataItems) {
652         std::vector<uint8_t> item;
653         iter->GetSerialData(item);
654         entries.push_back(item);
655     }
656     packet->SetData(entries);
657     packet->SetErrorCode(static_cast<int32_t>(retCode));
658 
659     ackMessage->SetMessageType(TYPE_RESPONSE);
660     ackMessage->SetTarget(context->GetDeviceId());
661     int errCode = ackMessage->SetExternalObject(packet);
662     if (errCode != E_OK) {
663         delete packet;
664         packet = nullptr;
665         delete ackMessage;
666         ackMessage = nullptr;
667         LOGE("[MultiVerDataSync][SendAckPacket] : SetExternalObject failed errCode:%d", errCode);
668         return errCode;
669     }
670     ackMessage->SetSequenceId(message->GetSequenceId());
671     ackMessage->SetSessionId(message->GetSessionId());
672     errCode = Send(ackMessage->GetTarget(), ackMessage);
673     if (errCode != E_OK) {
674         delete ackMessage;
675         ackMessage = nullptr;
676     }
677     LOGD("MultiVerDataSync::SendAckPacket end, dst=%s{private}, errCode = %d", context->GetDeviceId().c_str(), errCode);
678     return errCode;
679 }
680 
GetCommitData(const MultiVerCommitNode & commit,std::vector<MultiVerKvEntry * > & entries)681 int MultiVerDataSync::GetCommitData(const MultiVerCommitNode &commit, std::vector<MultiVerKvEntry *> &entries)
682 {
683     return storagePtr_->GetCommitData(commit, entries);
684 }
685 
CreateKvEntry(const std::vector<uint8_t> & entry)686 MultiVerKvEntry *MultiVerDataSync::CreateKvEntry(const std::vector<uint8_t> &entry)
687 {
688     return storagePtr_->CreateKvEntry(entry);
689 }
690 }
691 #endif