• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "commit_history_sync.h"
18 
19 #include "sync_engine.h"
20 #include "parcel.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25 
26 namespace DistributedDB {
27 // Class CommitHistorySyncRequestPacket
CalculateLen() const28 uint32_t CommitHistorySyncRequestPacket::CalculateLen() const
29 {
30     uint64_t len = Parcel::GetUInt64Len();
31     // commitMap len
32     for (const auto &iter : commitMap_) {
33         len += Parcel::GetStringLen(iter.first);
34         len += Parcel::GetMultiVerCommitLen(iter.second);
35         if (len > INT32_MAX) {
36             return 0;
37         }
38     }
39     len += Parcel::GetUInt32Len(); // version
40     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
41     len = Parcel::GetEightByteAlign(len);
42     if (len > INT32_MAX) {
43         return 0;
44     }
45     return len;
46 }
47 
SetCommitMap(std::map<std::string,MultiVerCommitNode> & inMap)48 void CommitHistorySyncRequestPacket::SetCommitMap(std::map<std::string, MultiVerCommitNode> &inMap)
49 {
50     commitMap_ = std::move(inMap);
51 }
52 
GetCommitMap(std::map<std::string,MultiVerCommitNode> & outMap) const53 void CommitHistorySyncRequestPacket::GetCommitMap(std::map<std::string, MultiVerCommitNode> &outMap) const
54 {
55     outMap = commitMap_;
56 }
57 
SetVersion(uint32_t version)58 void CommitHistorySyncRequestPacket::SetVersion(uint32_t version)
59 {
60     version_ = version;
61 }
62 
GetVersion() const63 uint32_t CommitHistorySyncRequestPacket::GetVersion() const
64 {
65     return version_;
66 }
67 
SetReserved(std::vector<uint64_t> & reserved)68 void CommitHistorySyncRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
69 {
70     reserved_ = std::move(reserved);
71 }
72 
GetReserved() const73 std::vector<uint64_t> CommitHistorySyncRequestPacket::GetReserved() const
74 {
75     return reserved_;
76 }
77 
CalculateLen() const78 uint32_t CommitHistorySyncAckPacket::CalculateLen() const
79 {
80     uint64_t len = Parcel::GetIntLen(); // errCode
81     len += Parcel::GetUInt32Len(); // version
82     len = Parcel::GetEightByteAlign(len);
83 
84     // commits vector len
85     len += Parcel::GetMultiVerCommitsLen(commits_);
86     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
87     len = Parcel::GetEightByteAlign(len);
88     if (len > INT32_MAX) {
89         return 0;
90     }
91     return len;
92 }
93 
SetData(std::vector<MultiVerCommitNode> & inData)94 void CommitHistorySyncAckPacket::SetData(std::vector<MultiVerCommitNode> &inData)
95 {
96     commits_ = std::move(inData);
97 }
98 
GetData(std::vector<MultiVerCommitNode> & outData) const99 void CommitHistorySyncAckPacket::GetData(std::vector<MultiVerCommitNode> &outData) const
100 {
101     outData = commits_;
102 }
103 
SetErrorCode(int32_t errCode)104 void CommitHistorySyncAckPacket::SetErrorCode(int32_t errCode)
105 {
106     errorCode_ = errCode;
107 }
108 
GetErrorCode(int32_t & errCode) const109 void CommitHistorySyncAckPacket::GetErrorCode(int32_t &errCode) const
110 {
111     errCode = errorCode_;
112 }
113 
SetVersion(uint32_t version)114 void CommitHistorySyncAckPacket::SetVersion(uint32_t version)
115 {
116     version_ = version;
117 }
118 
GetVersion() const119 uint32_t CommitHistorySyncAckPacket::GetVersion() const
120 {
121     return version_;
122 }
123 
SetReserved(std::vector<uint64_t> & reserved)124 void CommitHistorySyncAckPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126     reserved_ = std::move(reserved);
127 }
128 
GetReserved() const129 std::vector<uint64_t> CommitHistorySyncAckPacket::GetReserved() const
130 {
131     return reserved_;
132 }
133 
134 // Class CommitHistorySync
~CommitHistorySync()135 CommitHistorySync::~CommitHistorySync()
136 {
137     storagePtr_ = nullptr;
138     communicateHandle_ = nullptr;
139 }
140 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)141 int CommitHistorySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
142 {
143     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
144         return -E_MESSAGE_ID_ERROR;
145     }
146 
147     switch (inMsg->GetMessageType()) {
148         case TYPE_REQUEST:
149             return RequestPacketSerialization(buffer, length, inMsg);
150         case TYPE_RESPONSE:
151             return AckPacketSerialization(buffer, length, inMsg);
152         default:
153             return -E_MESSAGE_TYPE_ERROR;
154     }
155 }
156 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)157 int CommitHistorySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
158 {
159     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
160         return -E_MESSAGE_ID_ERROR;
161     }
162 
163     switch (inMsg->GetMessageType()) {
164         case TYPE_REQUEST:
165             return RequestPacketDeSerialization(buffer, length, inMsg);
166         case TYPE_RESPONSE:
167             return AckPacketDeSerialization(buffer, length, inMsg);
168         default:
169             return -E_MESSAGE_TYPE_ERROR;
170     }
171 }
172 
CalculateLen(const Message * inMsg)173 uint32_t CommitHistorySync::CalculateLen(const Message *inMsg)
174 {
175     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
176         return 0;
177     }
178 
179     uint32_t len = 0;
180     int errCode = E_OK;
181     switch (inMsg->GetMessageType()) {
182         case TYPE_REQUEST:
183             errCode = RequestPacketCalculateLen(inMsg, len);
184             if (errCode != E_OK) {
185                 return 0;
186             }
187             return len;
188         case TYPE_RESPONSE:
189             errCode = AckPacketCalculateLen(inMsg, len);
190             if (errCode != E_OK) {
191                 return 0;
192             }
193             return len;
194         default:
195             return 0;
196     }
197 }
198 
RegisterTransformFunc()199 int CommitHistorySync::RegisterTransformFunc()
200 {
201     TransformFunc func;
202     func.computeFunc = std::bind(&CommitHistorySync::CalculateLen, std::placeholders::_1);
203     func.serializeFunc = std::bind(&CommitHistorySync::Serialization, std::placeholders::_1,
204                                    std::placeholders::_2, std::placeholders::_3);
205     func.deserializeFunc = std::bind(&CommitHistorySync::DeSerialization, std::placeholders::_1,
206                                      std::placeholders::_2, std::placeholders::_3);
207     return MessageTransform::RegTransformFunction(COMMIT_HISTORY_SYNC_MESSAGE, func);
208 }
209 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)210 int CommitHistorySync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
211 {
212     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
213         return -E_INVALID_ARGS;
214     }
215     storagePtr_ = storagePtr;
216     communicateHandle_ = communicateHandle;
217     return E_OK;
218 }
219 
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const220 void CommitHistorySync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
221 {
222     (void)context;
223     (void)message;
224     return;
225 }
226 
SyncStart(MultiVerSyncTaskContext * context)227 int CommitHistorySync::SyncStart(MultiVerSyncTaskContext *context)
228 {
229     if (context == nullptr) {
230         return -E_INVALID_ARGS;
231     }
232     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
233     if (performance != nullptr) {
234         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
235     }
236     std::map<std::string, MultiVerCommitNode> commitMap;
237     int errCode = GetDeviceLatestCommit(commitMap);
238     if (performance != nullptr) {
239         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
240     }
241     if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
242         return errCode;
243     }
244 
245     LOGD("CommitHistorySync::commitMap size = %zu, dst=%s{private}", commitMap.size(), context->GetDeviceId().c_str());
246     return SendRequestPacket(context, commitMap);
247 }
248 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)249 int CommitHistorySync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
250 {
251     if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
252         return -E_INVALID_ARGS;
253     }
254     const CommitHistorySyncRequestPacket *packet = message->GetObject<CommitHistorySyncRequestPacket>();
255     if (packet == nullptr) {
256         return -E_INVALID_ARGS;
257     }
258     std::vector<MultiVerCommitNode> commits;
259     int errCode = RunPermissionCheck(context->GetDeviceId());
260     if (errCode == -E_NOT_PERMIT) {
261         LOGE("CommitHistorySync::RequestRecvCallback RunPermissionCheck not pass");
262         SendAckPacket(context, commits, errCode, message);
263         return errCode;
264     }
265     std::map<std::string, MultiVerCommitNode> commitMap;
266     packet->GetCommitMap(commitMap);
267     uint32_t ver = packet->GetVersion();
268     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
269     if (performance != nullptr) {
270         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
271     }
272     errCode = GetCommitTree(commitMap, commits);
273     if (performance != nullptr) {
274         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
275     }
276     if (errCode != E_OK) {
277         LOGE("CommitHistorySync::RequestRecvCallback : GetCommitTree ERR, errno = %d", errCode);
278     }
279 
280     errCode = SendAckPacket(context, commits, errCode, message);
281     LOGD("CommitHistorySync::RequestRecvCallback:SendAckPacket, errno = %d, dst=%s{private}, ver = %" PRIu32
282         ", myversion = %" PRIu32, errCode, context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
283     if (errCode == E_OK) {
284         if (commitMap.empty()) {
285             LOGD("[CommitHistorySync][RequestRecvCallback] no need to start SyncResponse");
286             return -E_NOT_FOUND;
287         }
288     }
289     return errCode;
290 }
291 
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)292 int CommitHistorySync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
293 {
294     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
295         return -E_INVALID_ARGS;
296     }
297 
298     std::vector<MultiVerCommitNode> commits;
299     int32_t errCode;
300 
301     const CommitHistorySyncAckPacket *packet = message->GetObject<CommitHistorySyncAckPacket>();
302     if (packet == nullptr) {
303         return -E_INVALID_ARGS;
304     }
305     packet->GetErrorCode(errCode);
306     if (errCode == -E_NOT_PERMIT) {
307         LOGE("CommitHistorySync::AckRecvCallback RunPermissionCheck not pass");
308         return errCode;
309     }
310     packet->GetData(commits);
311     uint32_t ver = packet->GetVersion();
312     context->SetCommits(commits);
313     context->SetCommitIndex(0);
314     context->SetCommitsSize(static_cast<int>(commits.size()));
315     LOGD("CommitHistorySync::AckRecvCallback end, CommitsSize = %zu, dst = %s{private}, ver = %d, myversion = %u",
316         commits.size(), context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
317     return E_OK;
318 }
319 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)320 int CommitHistorySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
321 {
322     if (inMsg == nullptr) {
323         return -E_INVALID_ARGS;
324     }
325     const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
326     if (packet == nullptr) {
327         return -E_INVALID_ARGS;
328     }
329 
330     if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_REQUEST)) {
331         return -E_INVALID_ARGS;
332     }
333     len = packet->CalculateLen();
334     return E_OK;
335 }
336 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)337 int CommitHistorySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
338 {
339     if ((buffer == nullptr) || (inMsg == nullptr)) {
340         return -E_INVALID_ARGS;
341     }
342     const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
343     if ((packet == nullptr) || (length != packet->CalculateLen())) {
344         return -E_INVALID_ARGS;
345     }
346 
347     Parcel parcel(buffer, length);
348     std::map<std::string, MultiVerCommitNode> commitMap;
349     packet->GetCommitMap(commitMap);
350 
351     int errCode = parcel.WriteUInt64(commitMap.size());
352     if (errCode != E_OK) {
353         return -E_SECUREC_ERROR;
354     }
355     // commitMap Serialization
356     for (auto &iter : commitMap) {
357         errCode = parcel.WriteString(iter.first);
358         if (errCode != E_OK) {
359             return -E_SECUREC_ERROR;
360         }
361         errCode = parcel.WriteMultiVerCommit(iter.second);
362         if (errCode != E_OK) {
363             return -E_SECUREC_ERROR;
364         }
365     }
366     errCode = parcel.WriteUInt32(packet->GetVersion());
367     if (errCode != E_OK) {
368         return -E_SECUREC_ERROR;
369     }
370     errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
371     if (errCode != E_OK) {
372         return -E_SECUREC_ERROR;
373     }
374     parcel.EightByteAlign();
375     return errCode;
376 }
377 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)378 int CommitHistorySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
379 {
380     if ((buffer == nullptr) || (inMsg == nullptr)) {
381         return -E_INVALID_ARGS;
382     }
383 
384     uint64_t packLen = 0;
385     uint64_t len = 0;
386     Parcel parcel(const_cast<uint8_t *>(buffer), length);
387     packLen += parcel.ReadUInt64(len);
388     if (len > DBConstant::MAX_DEVICES_SIZE) {
389         LOGE("CommitHistorySync::RequestPacketDeSerialization : commitMap size too large = %" PRIu64, len);
390         return -E_INVALID_ARGS;
391     }
392     // commitMap DeSerialization
393     std::map<std::string, MultiVerCommitNode> commitMap;
394     while (len > 0) {
395         std::string key;
396         MultiVerCommitNode val;
397         packLen += parcel.ReadString(key);
398         packLen += parcel.ReadMultiVerCommit(val);
399         commitMap[key] = val;
400         len--;
401         if (parcel.IsError()) {
402             return -E_INVALID_ARGS;
403         }
404     }
405     uint32_t version;
406     std::vector<uint64_t> reserved;
407     packLen += parcel.ReadUInt32(version);
408     packLen += parcel.ReadVector<uint64_t>(reserved);
409     packLen = Parcel::GetEightByteAlign(packLen);
410     if (packLen != length || parcel.IsError()) {
411         LOGE("CommitHistorySync::RequestPacketDeSerialization : length error, input len = %" PRIu32
412             ", cac len = %" PRIu64, length, packLen);
413         return -E_INVALID_ARGS;
414     }
415     CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
416     if (packet == nullptr) {
417         LOGE("CommitHistorySync::RequestPacketDeSerialization : new packet error");
418         return -E_OUT_OF_MEMORY;
419     }
420     packet->SetCommitMap(commitMap);
421     packet->SetVersion(version);
422     packet->SetReserved(reserved);
423     int errCode = inMsg->SetExternalObject<>(packet);
424     if (errCode != E_OK) {
425         delete packet;
426         packet = nullptr;
427     }
428     return errCode;
429 }
430 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)431 int CommitHistorySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
432 {
433     if (inMsg == nullptr) {
434         return -E_INVALID_ARGS;
435     }
436     const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
437     if (packet == nullptr) {
438         return -E_INVALID_ARGS;
439     }
440 
441     if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_RESPONSE)) {
442         return -E_INVALID_ARGS;
443     }
444     len = packet->CalculateLen();
445     return E_OK;
446 }
447 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)448 int CommitHistorySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
449 {
450     if ((buffer == nullptr) || (inMsg == nullptr)) {
451         return -E_INVALID_ARGS;
452     }
453     const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
454     if ((packet == nullptr) || (length != packet->CalculateLen())) {
455         return -E_INVALID_ARGS;
456     }
457 
458     Parcel parcel(buffer, length);
459     int32_t ackErrCode;
460     std::vector<MultiVerCommitNode> commits;
461 
462     packet->GetData(commits);
463     packet->GetErrorCode(ackErrCode);
464     // errCode Serialization
465     int errCode = parcel.WriteInt(ackErrCode);
466     if (errCode != E_OK) {
467         return -E_SECUREC_ERROR;
468     }
469     errCode = parcel.WriteUInt32(packet->GetVersion());
470     if (errCode != E_OK) {
471         return -E_SECUREC_ERROR;
472     }
473     parcel.EightByteAlign();
474 
475     // commits vector Serialization
476     errCode = parcel.WriteMultiVerCommits(commits);
477     if (errCode != E_OK) {
478         return -E_SECUREC_ERROR;
479     }
480     errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
481     if (errCode != E_OK) {
482         return -E_SECUREC_ERROR;
483     }
484     parcel.EightByteAlign();
485     return errCode;
486 }
487 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)488 int CommitHistorySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
489 {
490     std::vector<MultiVerCommitNode> commits;
491     uint32_t packLen = 0;
492     Parcel parcel(const_cast<uint8_t *>(buffer), length);
493     int32_t pktErrCode;
494     uint32_t version;
495     std::vector<uint64_t> reserved;
496 
497     // errCode DeSerialization
498     packLen += parcel.ReadInt(pktErrCode);
499     packLen += parcel.ReadUInt32(version);
500     parcel.EightByteAlign();
501     packLen = Parcel::GetEightByteAlign(packLen);
502     // commits vector DeSerialization
503     packLen += parcel.ReadMultiVerCommits(commits);
504     packLen += parcel.ReadVector<uint64_t>(reserved);
505     packLen = Parcel::GetEightByteAlign(packLen);
506     if (packLen != length || parcel.IsError()) {
507         LOGE("CommitHistorySync::AckPacketDeSerialization : packet len error, input len = %u, cal len = %u",
508             length, packLen);
509         return -E_INVALID_ARGS;
510     }
511     CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
512     if (packet == nullptr) {
513         LOGE("CommitHistorySync::AckPacketDeSerialization : new packet error");
514         return -E_OUT_OF_MEMORY;
515     }
516     packet->SetData(commits);
517     packet->SetErrorCode(pktErrCode);
518     packet->SetVersion(version);
519     packet->SetReserved(reserved);
520     int errCode = inMsg->SetExternalObject<>(packet);
521     if (errCode != E_OK) {
522         delete packet;
523         packet = nullptr;
524     }
525     return errCode;
526 }
527 
IsPacketValid(const Message * inMsg,uint16_t messageType)528 bool CommitHistorySync::IsPacketValid(const Message *inMsg, uint16_t messageType)
529 {
530     if ((inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
531         return false;
532     }
533     if (messageType != inMsg->GetMessageType()) {
534         return false;
535     }
536     return true;
537 }
538 
Send(const DeviceID & deviceId,const Message * inMsg)539 int CommitHistorySync::Send(const DeviceID &deviceId, const Message *inMsg)
540 {
541     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
542     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
543     if (errCode != E_OK) {
544         LOGE("CommitHistorySync::Send ERR! err = %d", errCode);
545     }
546     return errCode;
547 }
548 
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap)549 int CommitHistorySync::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap)
550 {
551     std::map<std::string, MultiVerCommitNode> readCommitMap;
552     int errCode = storagePtr_->GetDeviceLatestCommit(readCommitMap);
553     if (errCode != E_OK) {
554         return errCode;
555     }
556 
557     std::string localDevice;
558     errCode = GetLocalDeviceInfo(localDevice);
559     LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
560     if (errCode != E_OK) {
561         return errCode;
562     }
563 
564     for (auto &item : readCommitMap) {
565         errCode = storagePtr_->TransferSyncCommitDevInfo(item.second, localDevice, false);
566         if (errCode != E_OK) {
567             break;
568         }
569         commitMap.insert(std::make_pair(item.second.deviceInfo, item.second));
570     }
571 
572     return errCode;
573 }
574 
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits)575 int CommitHistorySync::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
576     std::vector<MultiVerCommitNode> &commits)
577 {
578     std::map<std::string, MultiVerCommitNode> newCommitMap;
579 
580     std::string localDevice;
581     int errCode = GetLocalDeviceInfo(localDevice);
582     LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
583     if (errCode != E_OK) {
584         return errCode;
585     }
586 
587     for (const auto &item : commitMap) {
588         MultiVerCommitNode commitNode = item.second;
589         errCode = storagePtr_->TransferSyncCommitDevInfo(commitNode, localDevice, true);
590         if (errCode != E_OK) {
591             return errCode;
592         }
593         newCommitMap.insert(std::make_pair(commitNode.deviceInfo, commitNode));
594     }
595 
596     errCode = storagePtr_->GetCommitTree(newCommitMap, commits);
597     if (errCode != E_OK) {
598         return errCode;
599     }
600     for (auto &commit : commits) {
601         errCode = storagePtr_->TransferSyncCommitDevInfo(commit, localDevice, false);
602         if (errCode != E_OK) {
603             break;
604         }
605     }
606     return errCode;
607 }
608 
SendRequestPacket(const MultiVerSyncTaskContext * context,std::map<std::string,MultiVerCommitNode> & commitMap)609 int CommitHistorySync::SendRequestPacket(const MultiVerSyncTaskContext *context,
610     std::map<std::string, MultiVerCommitNode> &commitMap)
611 {
612     CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
613     if (packet == nullptr) {
614         LOGE("CommitHistorySync::SendRequestPacket : new packet error");
615         return -E_OUT_OF_MEMORY;
616     }
617     packet->SetCommitMap(commitMap);
618     packet->SetVersion(SOFTWARE_VERSION_CURRENT);
619     Message *message = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
620     if (message == nullptr) {
621         LOGE("CommitHistorySync::SendRequestPacket : new message error");
622         delete packet;
623         packet = nullptr;
624         return -E_OUT_OF_MEMORY;
625     }
626     message->SetMessageType(TYPE_REQUEST);
627     message->SetTarget(context->GetDeviceId());
628     int errCode = message->SetExternalObject(packet);
629     if (errCode != E_OK) {
630         delete packet;
631         packet = nullptr;
632         delete message;
633         message = nullptr;
634         LOGE("CommitHistorySync::SendRequestPacket : SetExternalObject failed errCode:%d", errCode);
635         return errCode;
636     }
637     message->SetSessionId(context->GetRequestSessionId());
638     message->SetSequenceId(context->GetSequenceId());
639 
640     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
641     if (performance != nullptr) {
642         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
643     }
644     errCode = Send(message->GetTarget(), message);
645     if (errCode != E_OK) {
646         LOGE("CommitHistorySync::SendRequestPacket : Send failed errCode:%d", errCode);
647         delete message;
648         message = nullptr;
649     }
650     return errCode;
651 }
652 
SendAckPacket(const MultiVerSyncTaskContext * context,std::vector<MultiVerCommitNode> & commits,int ackCode,const Message * message)653 int CommitHistorySync::SendAckPacket(const MultiVerSyncTaskContext *context,
654     std::vector<MultiVerCommitNode> &commits, int ackCode, const Message *message)
655 {
656     if (message == nullptr) {
657         LOGE("CommitHistorySync::SendAckPacket : message is nullptr");
658         return -E_INVALID_ARGS;
659     }
660     CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
661     if (packet == nullptr) {
662         LOGE("CommitHistorySync::SendAckPacket : packet is nullptr");
663         return -E_OUT_OF_MEMORY;
664     }
665     Message *ackMessage = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
666     if (ackMessage == nullptr) {
667         LOGE("CommitHistorySync::SendAckPacket : new message error");
668         delete packet;
669         packet = nullptr;
670         return -E_OUT_OF_MEMORY;
671     }
672 
673     packet->SetData(commits);
674     packet->SetErrorCode(static_cast<int32_t>(ackCode));
675     packet->SetVersion(SOFTWARE_VERSION_CURRENT);
676     ackMessage->SetMessageType(TYPE_RESPONSE);
677     ackMessage->SetTarget(context->GetDeviceId());
678     int errCode = ackMessage->SetExternalObject(packet);
679     if (errCode != E_OK) {
680         delete packet;
681         packet = nullptr;
682         delete ackMessage;
683         ackMessage = nullptr;
684         LOGE("CommitHistorySync::SendAckPacket : SetExternalObject failed errCode:%d", errCode);
685         return errCode;
686     }
687     ackMessage->SetSequenceId(message->GetSequenceId());
688     ackMessage->SetSessionId(message->GetSessionId());
689     errCode = Send(ackMessage->GetTarget(), ackMessage);
690     if (errCode != E_OK) {
691         LOGE("CommitHistorySync::SendAckPacket : Send failed errCode:%d", errCode);
692         delete ackMessage;
693         ackMessage = nullptr;
694     }
695     return errCode;
696 }
697 
GetLocalDeviceInfo(std::string & deviceInfo)698 int CommitHistorySync::GetLocalDeviceInfo(std::string &deviceInfo)
699 {
700     return communicateHandle_->GetLocalIdentity(deviceInfo);
701 }
702 
RunPermissionCheck(const std::string & deviceId) const703 int CommitHistorySync::RunPermissionCheck(const std::string &deviceId) const
704 {
705     std::string appId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
706     std::string userId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
707     std::string storeId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
708     uint8_t flag = CHECK_FLAG_SEND;
709     PermissionCheckParam param = { userId, appId, storeId, deviceId };
710     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
711     if (errCode != E_OK) {
712         LOGE("[CommitHistorySync] RunPermissionCheck not pass errCode:%d, flag:%d", errCode, flag);
713         return -E_NOT_PERMIT;
714     }
715     return errCode;
716 }
717 }
718 #endif