• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "commit_history_sync.h"
18 
19 #include "sync_engine.h"
20 #include "parcel.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25 
26 namespace DistributedDB {
27 // Class CommitHistorySyncRequestPacket
CalculateLen() const28 uint32_t CommitHistorySyncRequestPacket::CalculateLen() const
29 {
30     uint64_t len = Parcel::GetUInt64Len();
31     // commitMap len
32     for (const auto &iter : commitMap_) {
33         len += Parcel::GetStringLen(iter.first);
34         len += Parcel::GetMultiVerCommitLen(iter.second);
35         if (len > INT32_MAX) {
36             return 0;
37         }
38     }
39     len += Parcel::GetUInt32Len(); // version
40     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
41     len = Parcel::GetEightByteAlign(len);
42     if (len > INT32_MAX) {
43         return 0;
44     }
45     return len;
46 }
47 
SetCommitMap(std::map<std::string,MultiVerCommitNode> & inMap)48 void CommitHistorySyncRequestPacket::SetCommitMap(std::map<std::string, MultiVerCommitNode> &inMap)
49 {
50     commitMap_ = std::move(inMap);
51 }
52 
GetCommitMap(std::map<std::string,MultiVerCommitNode> & outMap) const53 void CommitHistorySyncRequestPacket::GetCommitMap(std::map<std::string, MultiVerCommitNode> &outMap) const
54 {
55     outMap = commitMap_;
56 }
57 
SetVersion(uint32_t version)58 void CommitHistorySyncRequestPacket::SetVersion(uint32_t version)
59 {
60     version_ = version;
61 }
62 
GetVersion() const63 uint32_t CommitHistorySyncRequestPacket::GetVersion() const
64 {
65     return version_;
66 }
67 
SetReserved(std::vector<uint64_t> & reserved)68 void CommitHistorySyncRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
69 {
70     reserved_ = std::move(reserved);
71 }
72 
GetReserved() const73 std::vector<uint64_t> CommitHistorySyncRequestPacket::GetReserved() const
74 {
75     return reserved_;
76 }
77 
CalculateLen() const78 uint32_t CommitHistorySyncAckPacket::CalculateLen() const
79 {
80     uint64_t len = Parcel::GetIntLen(); // errCode
81     len += Parcel::GetUInt32Len(); // version
82     len = Parcel::GetEightByteAlign(len);
83 
84     // commits vector len
85     len += Parcel::GetMultiVerCommitsLen(commits_);
86     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
87     len = Parcel::GetEightByteAlign(len);
88     if (len > INT32_MAX) {
89         return 0;
90     }
91     return len;
92 }
93 
SetData(std::vector<MultiVerCommitNode> & inData)94 void CommitHistorySyncAckPacket::SetData(std::vector<MultiVerCommitNode> &inData)
95 {
96     commits_ = std::move(inData);
97 }
98 
GetData(std::vector<MultiVerCommitNode> & outData) const99 void CommitHistorySyncAckPacket::GetData(std::vector<MultiVerCommitNode> &outData) const
100 {
101     outData = commits_;
102 }
103 
SetErrorCode(int32_t errCode)104 void CommitHistorySyncAckPacket::SetErrorCode(int32_t errCode)
105 {
106     errorCode_ = errCode;
107 }
108 
GetErrorCode(int32_t & errCode) const109 void CommitHistorySyncAckPacket::GetErrorCode(int32_t &errCode) const
110 {
111     errCode = errorCode_;
112 }
113 
SetVersion(uint32_t version)114 void CommitHistorySyncAckPacket::SetVersion(uint32_t version)
115 {
116     version_ = version;
117 }
118 
GetVersion() const119 uint32_t CommitHistorySyncAckPacket::GetVersion() const
120 {
121     return version_;
122 }
123 
SetReserved(std::vector<uint64_t> & reserved)124 void CommitHistorySyncAckPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126     reserved_ = std::move(reserved);
127 }
128 
GetReserved() const129 std::vector<uint64_t> CommitHistorySyncAckPacket::GetReserved() const
130 {
131     return reserved_;
132 }
133 
134 // Class CommitHistorySync
~CommitHistorySync()135 CommitHistorySync::~CommitHistorySync()
136 {
137     storagePtr_ = nullptr;
138     communicateHandle_ = nullptr;
139 }
140 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)141 int CommitHistorySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
142 {
143     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
144         return -E_MESSAGE_ID_ERROR;
145     }
146 
147     switch (inMsg->GetMessageType()) {
148         case TYPE_REQUEST:
149             return RequestPacketSerialization(buffer, length, inMsg);
150         case TYPE_RESPONSE:
151             return AckPacketSerialization(buffer, length, inMsg);
152         default:
153             return -E_MESSAGE_TYPE_ERROR;
154     }
155 }
156 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)157 int CommitHistorySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
158 {
159     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
160         return -E_MESSAGE_ID_ERROR;
161     }
162 
163     switch (inMsg->GetMessageType()) {
164         case TYPE_REQUEST:
165             return RequestPacketDeSerialization(buffer, length, inMsg);
166         case TYPE_RESPONSE:
167             return AckPacketDeSerialization(buffer, length, inMsg);
168         default:
169             return -E_MESSAGE_TYPE_ERROR;
170     }
171 }
172 
CalculateLen(const Message * inMsg)173 uint32_t CommitHistorySync::CalculateLen(const Message *inMsg)
174 {
175     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
176         return 0;
177     }
178 
179     uint32_t len = 0;
180     int errCode = E_OK;
181     switch (inMsg->GetMessageType()) {
182         case TYPE_REQUEST:
183             errCode = RequestPacketCalculateLen(inMsg, len);
184             if (errCode != E_OK) {
185                 return 0;
186             }
187             return len;
188         case TYPE_RESPONSE:
189             errCode = AckPacketCalculateLen(inMsg, len);
190             if (errCode != E_OK) {
191                 return 0;
192             }
193             return len;
194         default:
195             return 0;
196     }
197 }
198 
RegisterTransformFunc()199 int CommitHistorySync::RegisterTransformFunc()
200 {
201     TransformFunc func;
202     func.computeFunc = std::bind(&CommitHistorySync::CalculateLen, std::placeholders::_1);
203     func.serializeFunc = std::bind(&CommitHistorySync::Serialization, std::placeholders::_1,
204                                    std::placeholders::_2, std::placeholders::_3);
205     func.deserializeFunc = std::bind(&CommitHistorySync::DeSerialization, std::placeholders::_1,
206                                      std::placeholders::_2, std::placeholders::_3);
207     return MessageTransform::RegTransformFunction(COMMIT_HISTORY_SYNC_MESSAGE, func);
208 }
209 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)210 int CommitHistorySync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
211 {
212     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
213         return -E_INVALID_ARGS;
214     }
215     storagePtr_ = storagePtr;
216     communicateHandle_ = communicateHandle;
217     return E_OK;
218 }
219 
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const220 void CommitHistorySync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
221 {
222     (void)context;
223     (void)message;
224     return;
225 }
226 
SyncStart(MultiVerSyncTaskContext * context)227 int CommitHistorySync::SyncStart(MultiVerSyncTaskContext *context)
228 {
229     if (context == nullptr) {
230         return -E_INVALID_ARGS;
231     }
232     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
233     if (performance != nullptr) {
234         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
235     }
236     std::map<std::string, MultiVerCommitNode> commitMap;
237     int errCode = GetDeviceLatestCommit(commitMap);
238     if (performance != nullptr) {
239         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
240     }
241     if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
242         return errCode;
243     }
244 
245     LOGD("CommitHistorySync::commitMap size = %zu, dst=%s{private}", commitMap.size(), context->GetDeviceId().c_str());
246     return SendRequestPacket(context, commitMap);
247 }
248 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)249 int CommitHistorySync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
250 {
251     if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
252         return -E_INVALID_ARGS;
253     }
254     const CommitHistorySyncRequestPacket *packet = message->GetObject<CommitHistorySyncRequestPacket>();
255     if (packet == nullptr) {
256         return -E_INVALID_ARGS;
257     }
258     std::vector<MultiVerCommitNode> commits;
259     int errCode = RunPermissionCheck(context->GetDeviceId());
260     if (errCode == -E_NOT_PERMIT) {
261         LOGE("CommitHistorySync::RequestRecvCallback RunPermissionCheck not pass");
262         SendAckPacket(context, commits, errCode, message);
263         return errCode;
264     }
265     std::map<std::string, MultiVerCommitNode> commitMap;
266     packet->GetCommitMap(commitMap);
267     uint32_t ver = packet->GetVersion();
268     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
269     if (performance != nullptr) {
270         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
271     }
272     errCode = GetCommitTree(commitMap, commits);
273     if (performance != nullptr) {
274         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
275     }
276     if (errCode != E_OK) {
277         LOGE("CommitHistorySync::RequestRecvCallback : GetCommitTree ERR, errno = %d", errCode);
278     }
279 
280     errCode = SendAckPacket(context, commits, errCode, message);
281     LOGD("CommitHistorySync::RequestRecvCallback:SendAckPacket, errno = %d, dst=%s{private}, ver = %" PRIu32
282         ", myversion = %" PRIu32, errCode, context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
283     if (errCode == E_OK) {
284         if (commitMap.empty()) {
285             LOGD("[CommitHistorySync][RequestRecvCallback] no need to start SyncResponse");
286             return -E_NOT_FOUND;
287         }
288     }
289     return errCode;
290 }
291 
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)292 int CommitHistorySync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
293 {
294     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
295         return -E_INVALID_ARGS;
296     }
297 
298     std::vector<MultiVerCommitNode> commits;
299     int32_t errCode;
300 
301     const CommitHistorySyncAckPacket *packet = message->GetObject<CommitHistorySyncAckPacket>();
302     if (packet == nullptr) {
303         return -E_INVALID_ARGS;
304     }
305     packet->GetErrorCode(errCode);
306     if (errCode == -E_NOT_PERMIT) {
307         LOGE("CommitHistorySync::AckRecvCallback RunPermissionCheck not pass");
308         return errCode;
309     }
310     packet->GetData(commits);
311     uint32_t ver = packet->GetVersion();
312     context->SetCommits(commits);
313     context->SetCommitIndex(0);
314     context->SetCommitsSize(static_cast<int>(commits.size()));
315     LOGD("CommitHistorySync::AckRecvCallback end, CommitsSize = %zu, dst = %s{private}, ver = %d, myversion = %u",
316         commits.size(), context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
317     return E_OK;
318 }
319 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)320 int CommitHistorySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
321 {
322     if (inMsg == nullptr) {
323         return -E_INVALID_ARGS;
324     }
325     const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
326     if (packet == nullptr) {
327         return -E_INVALID_ARGS;
328     }
329 
330     if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_REQUEST)) {
331         return -E_INVALID_ARGS;
332     }
333     len = packet->CalculateLen();
334     return E_OK;
335 }
336 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)337 int CommitHistorySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
338 {
339     if ((buffer == nullptr) || (inMsg == nullptr)) {
340         return -E_INVALID_ARGS;
341     }
342     const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
343     if ((packet == nullptr) || (length != packet->CalculateLen())) {
344         return -E_INVALID_ARGS;
345     }
346 
347     Parcel parcel(buffer, length);
348     std::map<std::string, MultiVerCommitNode> commitMap;
349     packet->GetCommitMap(commitMap);
350 
351     int errCode = parcel.WriteUInt64(commitMap.size());
352     if (errCode != E_OK) {
353         return -E_SECUREC_ERROR;
354     }
355     // commitMap Serialization
356     for (auto &iter : commitMap) {
357         errCode = parcel.WriteString(iter.first);
358         if (errCode != E_OK) {
359             return -E_SECUREC_ERROR;
360         }
361         errCode = parcel.WriteMultiVerCommit(iter.second);
362         if (errCode != E_OK) {
363             return -E_SECUREC_ERROR;
364         }
365     }
366     errCode = parcel.WriteUInt32(packet->GetVersion());
367     if (errCode != E_OK) {
368         return -E_SECUREC_ERROR;
369     }
370     errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
371     if (errCode != E_OK) {
372         return -E_SECUREC_ERROR;
373     }
374     parcel.EightByteAlign();
375     if (parcel.IsError()) { // almost success
376         return -E_INVALID_ARGS;
377     }
378     return errCode;
379 }
380 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)381 int CommitHistorySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
382 {
383     if ((buffer == nullptr) || (inMsg == nullptr)) {
384         return -E_INVALID_ARGS;
385     }
386 
387     uint64_t packLen = 0;
388     uint64_t len = 0;
389     Parcel parcel(const_cast<uint8_t *>(buffer), length);
390     packLen += parcel.ReadUInt64(len);
391     if (len > DBConstant::MAX_DEVICES_SIZE) {
392         LOGE("CommitHistorySync::RequestPacketDeSerialization : commitMap size too large = %" PRIu64, len);
393         return -E_INVALID_ARGS;
394     }
395     // commitMap DeSerialization
396     std::map<std::string, MultiVerCommitNode> commitMap;
397     while (len > 0) {
398         std::string key;
399         MultiVerCommitNode val;
400         packLen += parcel.ReadString(key);
401         packLen += parcel.ReadMultiVerCommit(val);
402         commitMap[key] = val;
403         len--;
404         if (parcel.IsError()) {
405             return -E_INVALID_ARGS;
406         }
407     }
408     uint32_t version;
409     std::vector<uint64_t> reserved;
410     packLen += parcel.ReadUInt32(version);
411     packLen += parcel.ReadVector<uint64_t>(reserved);
412     packLen = Parcel::GetEightByteAlign(packLen);
413     if (packLen != length || parcel.IsError()) {
414         LOGE("CommitHistorySync::RequestPacketDeSerialization : length error, input len = %" PRIu32
415             ", cac len = %" PRIu64, length, packLen);
416         return -E_INVALID_ARGS;
417     }
418     CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
419     if (packet == nullptr) {
420         LOGE("CommitHistorySync::RequestPacketDeSerialization : new packet error");
421         return -E_OUT_OF_MEMORY;
422     }
423     packet->SetCommitMap(commitMap);
424     packet->SetVersion(version);
425     packet->SetReserved(reserved);
426     int errCode = inMsg->SetExternalObject<>(packet);
427     if (errCode != E_OK) {
428         delete packet;
429         packet = nullptr;
430     }
431     return errCode;
432 }
433 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)434 int CommitHistorySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
435 {
436     if (inMsg == nullptr) {
437         return -E_INVALID_ARGS;
438     }
439     const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
440     if (packet == nullptr) {
441         return -E_INVALID_ARGS;
442     }
443 
444     if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_RESPONSE)) {
445         return -E_INVALID_ARGS;
446     }
447     len = packet->CalculateLen();
448     return E_OK;
449 }
450 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)451 int CommitHistorySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
452 {
453     if ((buffer == nullptr) || (inMsg == nullptr)) {
454         return -E_INVALID_ARGS;
455     }
456     const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
457     if ((packet == nullptr) || (length != packet->CalculateLen())) {
458         return -E_INVALID_ARGS;
459     }
460 
461     Parcel parcel(buffer, length);
462     int32_t ackErrCode;
463     std::vector<MultiVerCommitNode> commits;
464 
465     packet->GetData(commits);
466     packet->GetErrorCode(ackErrCode);
467     // errCode Serialization
468     parcel.WriteInt(ackErrCode);
469     parcel.WriteUInt32(packet->GetVersion());
470     parcel.EightByteAlign();
471     if (parcel.IsError()) { // almost success
472         return -E_INVALID_ARGS;
473     }
474     // commits vector Serialization
475     int errCode = parcel.WriteMultiVerCommits(commits);
476     if (errCode != E_OK) {
477         return -E_SECUREC_ERROR;
478     }
479     errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
480     if (errCode != E_OK) {
481         return -E_SECUREC_ERROR;
482     }
483     parcel.EightByteAlign();
484     if (parcel.IsError()) { // almost success
485         return -E_INVALID_ARGS;
486     }
487     return errCode;
488 }
489 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)490 int CommitHistorySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
491 {
492     std::vector<MultiVerCommitNode> commits;
493     uint32_t packLen = 0;
494     Parcel parcel(const_cast<uint8_t *>(buffer), length);
495     int32_t pktErrCode;
496     uint32_t version;
497     std::vector<uint64_t> reserved;
498 
499     // errCode DeSerialization
500     packLen += parcel.ReadInt(pktErrCode);
501     packLen += parcel.ReadUInt32(version);
502     parcel.EightByteAlign();
503     if (parcel.IsError()) {
504         return -E_PARSE_FAIL;
505     }
506     packLen = Parcel::GetEightByteAlign(packLen);
507     // commits vector DeSerialization
508     packLen += parcel.ReadMultiVerCommits(commits);
509     packLen += parcel.ReadVector<uint64_t>(reserved);
510     packLen = Parcel::GetEightByteAlign(packLen);
511     if (packLen != length || parcel.IsError()) {
512         LOGE("CommitHistorySync::AckPacketDeSerialization : packet len error, input len = %u, cal len = %u",
513             length, packLen);
514         return -E_INVALID_ARGS;
515     }
516     CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
517     if (packet == nullptr) {
518         LOGE("CommitHistorySync::AckPacketDeSerialization : new packet error");
519         return -E_OUT_OF_MEMORY;
520     }
521     packet->SetData(commits);
522     packet->SetErrorCode(pktErrCode);
523     packet->SetVersion(version);
524     packet->SetReserved(reserved);
525     int errCode = inMsg->SetExternalObject<>(packet);
526     if (errCode != E_OK) {
527         delete packet;
528         packet = nullptr;
529     }
530     return errCode;
531 }
532 
IsPacketValid(const Message * inMsg,uint16_t messageType)533 bool CommitHistorySync::IsPacketValid(const Message *inMsg, uint16_t messageType)
534 {
535     if ((inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
536         return false;
537     }
538     if (messageType != inMsg->GetMessageType()) {
539         return false;
540     }
541     return true;
542 }
543 
Send(const DeviceID & deviceId,const Message * inMsg)544 int CommitHistorySync::Send(const DeviceID &deviceId, const Message *inMsg)
545 {
546     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
547     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
548     if (errCode != E_OK) {
549         LOGE("CommitHistorySync::Send ERR! err = %d", errCode);
550     }
551     return errCode;
552 }
553 
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap)554 int CommitHistorySync::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap)
555 {
556     std::map<std::string, MultiVerCommitNode> readCommitMap;
557     int errCode = storagePtr_->GetDeviceLatestCommit(readCommitMap);
558     if (errCode != E_OK) {
559         return errCode;
560     }
561 
562     std::string localDevice;
563     errCode = GetLocalDeviceInfo(localDevice);
564     LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
565     if (errCode != E_OK) {
566         return errCode;
567     }
568 
569     for (auto &item : readCommitMap) {
570         errCode = storagePtr_->TransferSyncCommitDevInfo(item.second, localDevice, false);
571         if (errCode != E_OK) {
572             break;
573         }
574         commitMap.insert(std::make_pair(item.second.deviceInfo, item.second));
575     }
576 
577     return errCode;
578 }
579 
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits)580 int CommitHistorySync::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
581     std::vector<MultiVerCommitNode> &commits)
582 {
583     std::map<std::string, MultiVerCommitNode> newCommitMap;
584 
585     std::string localDevice;
586     int errCode = GetLocalDeviceInfo(localDevice);
587     LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
588     if (errCode != E_OK) {
589         return errCode;
590     }
591 
592     for (const auto &item : commitMap) {
593         MultiVerCommitNode commitNode = item.second;
594         errCode = storagePtr_->TransferSyncCommitDevInfo(commitNode, localDevice, true);
595         if (errCode != E_OK) {
596             return errCode;
597         }
598         newCommitMap.insert(std::make_pair(commitNode.deviceInfo, commitNode));
599     }
600 
601     errCode = storagePtr_->GetCommitTree(newCommitMap, commits);
602     if (errCode != E_OK) {
603         return errCode;
604     }
605     for (auto &commit : commits) {
606         errCode = storagePtr_->TransferSyncCommitDevInfo(commit, localDevice, false);
607         if (errCode != E_OK) {
608             break;
609         }
610     }
611     return errCode;
612 }
613 
SendRequestPacket(const MultiVerSyncTaskContext * context,std::map<std::string,MultiVerCommitNode> & commitMap)614 int CommitHistorySync::SendRequestPacket(const MultiVerSyncTaskContext *context,
615     std::map<std::string, MultiVerCommitNode> &commitMap)
616 {
617     CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
618     if (packet == nullptr) {
619         LOGE("CommitHistorySync::SendRequestPacket : new packet error");
620         return -E_OUT_OF_MEMORY;
621     }
622     packet->SetCommitMap(commitMap);
623     packet->SetVersion(SOFTWARE_VERSION_CURRENT);
624     Message *message = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
625     if (message == nullptr) {
626         LOGE("CommitHistorySync::SendRequestPacket : new message error");
627         delete packet;
628         packet = nullptr;
629         return -E_OUT_OF_MEMORY;
630     }
631     message->SetMessageType(TYPE_REQUEST);
632     message->SetTarget(context->GetDeviceId());
633     int errCode = message->SetExternalObject(packet);
634     if (errCode != E_OK) {
635         delete packet;
636         packet = nullptr;
637         delete message;
638         message = nullptr;
639         LOGE("CommitHistorySync::SendRequestPacket : SetExternalObject failed errCode:%d", errCode);
640         return errCode;
641     }
642     message->SetSessionId(context->GetRequestSessionId());
643     message->SetSequenceId(context->GetSequenceId());
644 
645     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
646     if (performance != nullptr) {
647         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
648     }
649     errCode = Send(message->GetTarget(), message);
650     if (errCode != E_OK) {
651         LOGE("CommitHistorySync::SendRequestPacket : Send failed errCode:%d", errCode);
652         delete message;
653         message = nullptr;
654     }
655     return errCode;
656 }
657 
SendAckPacket(const MultiVerSyncTaskContext * context,std::vector<MultiVerCommitNode> & commits,int ackCode,const Message * message)658 int CommitHistorySync::SendAckPacket(const MultiVerSyncTaskContext *context,
659     std::vector<MultiVerCommitNode> &commits, int ackCode, const Message *message)
660 {
661     if (message == nullptr) {
662         LOGE("CommitHistorySync::SendAckPacket : message is nullptr");
663         return -E_INVALID_ARGS;
664     }
665     CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
666     if (packet == nullptr) {
667         LOGE("CommitHistorySync::SendAckPacket : packet is nullptr");
668         return -E_OUT_OF_MEMORY;
669     }
670     Message *ackMessage = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
671     if (ackMessage == nullptr) {
672         LOGE("CommitHistorySync::SendAckPacket : new message error");
673         delete packet;
674         packet = nullptr;
675         return -E_OUT_OF_MEMORY;
676     }
677 
678     packet->SetData(commits);
679     packet->SetErrorCode(static_cast<int32_t>(ackCode));
680     packet->SetVersion(SOFTWARE_VERSION_CURRENT);
681     ackMessage->SetMessageType(TYPE_RESPONSE);
682     ackMessage->SetTarget(context->GetDeviceId());
683     int errCode = ackMessage->SetExternalObject(packet);
684     if (errCode != E_OK) {
685         delete packet;
686         packet = nullptr;
687         delete ackMessage;
688         ackMessage = nullptr;
689         LOGE("CommitHistorySync::SendAckPacket : SetExternalObject failed errCode:%d", errCode);
690         return errCode;
691     }
692     ackMessage->SetSequenceId(message->GetSequenceId());
693     ackMessage->SetSessionId(message->GetSessionId());
694     errCode = Send(ackMessage->GetTarget(), ackMessage);
695     if (errCode != E_OK) {
696         LOGE("CommitHistorySync::SendAckPacket : Send failed errCode:%d", errCode);
697         delete ackMessage;
698         ackMessage = nullptr;
699     }
700     return errCode;
701 }
702 
GetLocalDeviceInfo(std::string & deviceInfo)703 int CommitHistorySync::GetLocalDeviceInfo(std::string &deviceInfo)
704 {
705     return communicateHandle_->GetLocalIdentity(deviceInfo);
706 }
707 
RunPermissionCheck(const std::string & deviceId) const708 int CommitHistorySync::RunPermissionCheck(const std::string &deviceId) const
709 {
710     std::string appId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
711     std::string userId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
712     std::string storeId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
713     uint8_t flag = CHECK_FLAG_SEND;
714     PermissionCheckParam param = { userId, appId, storeId, deviceId };
715     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
716     if (errCode != E_OK) {
717         LOGE("[CommitHistorySync] RunPermissionCheck not pass errCode:%d, flag:%d", errCode, flag);
718         return -E_NOT_PERMIT;
719     }
720     return errCode;
721 }
722 }
723 #endif