1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "commit_history_sync.h"
18
19 #include "sync_engine.h"
20 #include "parcel.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25
26 namespace DistributedDB {
27 // Class CommitHistorySyncRequestPacket
CalculateLen() const28 uint32_t CommitHistorySyncRequestPacket::CalculateLen() const
29 {
30 uint64_t len = Parcel::GetUInt64Len();
31 // commitMap len
32 for (const auto &iter : commitMap_) {
33 len += Parcel::GetStringLen(iter.first);
34 len += Parcel::GetMultiVerCommitLen(iter.second);
35 if (len > INT32_MAX) {
36 return 0;
37 }
38 }
39 len += Parcel::GetUInt32Len(); // version
40 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
41 len = Parcel::GetEightByteAlign(len);
42 if (len > INT32_MAX) {
43 return 0;
44 }
45 return len;
46 }
47
SetCommitMap(std::map<std::string,MultiVerCommitNode> & inMap)48 void CommitHistorySyncRequestPacket::SetCommitMap(std::map<std::string, MultiVerCommitNode> &inMap)
49 {
50 commitMap_ = std::move(inMap);
51 }
52
GetCommitMap(std::map<std::string,MultiVerCommitNode> & outMap) const53 void CommitHistorySyncRequestPacket::GetCommitMap(std::map<std::string, MultiVerCommitNode> &outMap) const
54 {
55 outMap = commitMap_;
56 }
57
SetVersion(uint32_t version)58 void CommitHistorySyncRequestPacket::SetVersion(uint32_t version)
59 {
60 version_ = version;
61 }
62
GetVersion() const63 uint32_t CommitHistorySyncRequestPacket::GetVersion() const
64 {
65 return version_;
66 }
67
SetReserved(std::vector<uint64_t> & reserved)68 void CommitHistorySyncRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
69 {
70 reserved_ = std::move(reserved);
71 }
72
GetReserved() const73 std::vector<uint64_t> CommitHistorySyncRequestPacket::GetReserved() const
74 {
75 return reserved_;
76 }
77
CalculateLen() const78 uint32_t CommitHistorySyncAckPacket::CalculateLen() const
79 {
80 uint64_t len = Parcel::GetIntLen(); // errCode
81 len += Parcel::GetUInt32Len(); // version
82 len = Parcel::GetEightByteAlign(len);
83
84 // commits vector len
85 len += Parcel::GetMultiVerCommitsLen(commits_);
86 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
87 len = Parcel::GetEightByteAlign(len);
88 if (len > INT32_MAX) {
89 return 0;
90 }
91 return len;
92 }
93
SetData(std::vector<MultiVerCommitNode> & inData)94 void CommitHistorySyncAckPacket::SetData(std::vector<MultiVerCommitNode> &inData)
95 {
96 commits_ = std::move(inData);
97 }
98
GetData(std::vector<MultiVerCommitNode> & outData) const99 void CommitHistorySyncAckPacket::GetData(std::vector<MultiVerCommitNode> &outData) const
100 {
101 outData = commits_;
102 }
103
SetErrorCode(int32_t errCode)104 void CommitHistorySyncAckPacket::SetErrorCode(int32_t errCode)
105 {
106 errorCode_ = errCode;
107 }
108
GetErrorCode(int32_t & errCode) const109 void CommitHistorySyncAckPacket::GetErrorCode(int32_t &errCode) const
110 {
111 errCode = errorCode_;
112 }
113
SetVersion(uint32_t version)114 void CommitHistorySyncAckPacket::SetVersion(uint32_t version)
115 {
116 version_ = version;
117 }
118
GetVersion() const119 uint32_t CommitHistorySyncAckPacket::GetVersion() const
120 {
121 return version_;
122 }
123
SetReserved(std::vector<uint64_t> & reserved)124 void CommitHistorySyncAckPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126 reserved_ = std::move(reserved);
127 }
128
GetReserved() const129 std::vector<uint64_t> CommitHistorySyncAckPacket::GetReserved() const
130 {
131 return reserved_;
132 }
133
134 // Class CommitHistorySync
~CommitHistorySync()135 CommitHistorySync::~CommitHistorySync()
136 {
137 storagePtr_ = nullptr;
138 communicateHandle_ = nullptr;
139 }
140
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)141 int CommitHistorySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
142 {
143 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
144 return -E_MESSAGE_ID_ERROR;
145 }
146
147 switch (inMsg->GetMessageType()) {
148 case TYPE_REQUEST:
149 return RequestPacketSerialization(buffer, length, inMsg);
150 case TYPE_RESPONSE:
151 return AckPacketSerialization(buffer, length, inMsg);
152 default:
153 return -E_MESSAGE_TYPE_ERROR;
154 }
155 }
156
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)157 int CommitHistorySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
158 {
159 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
160 return -E_MESSAGE_ID_ERROR;
161 }
162
163 switch (inMsg->GetMessageType()) {
164 case TYPE_REQUEST:
165 return RequestPacketDeSerialization(buffer, length, inMsg);
166 case TYPE_RESPONSE:
167 return AckPacketDeSerialization(buffer, length, inMsg);
168 default:
169 return -E_MESSAGE_TYPE_ERROR;
170 }
171 }
172
CalculateLen(const Message * inMsg)173 uint32_t CommitHistorySync::CalculateLen(const Message *inMsg)
174 {
175 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
176 return 0;
177 }
178
179 uint32_t len = 0;
180 int errCode = E_OK;
181 switch (inMsg->GetMessageType()) {
182 case TYPE_REQUEST:
183 errCode = RequestPacketCalculateLen(inMsg, len);
184 if (errCode != E_OK) {
185 return 0;
186 }
187 return len;
188 case TYPE_RESPONSE:
189 errCode = AckPacketCalculateLen(inMsg, len);
190 if (errCode != E_OK) {
191 return 0;
192 }
193 return len;
194 default:
195 return 0;
196 }
197 }
198
RegisterTransformFunc()199 int CommitHistorySync::RegisterTransformFunc()
200 {
201 TransformFunc func;
202 func.computeFunc = std::bind(&CommitHistorySync::CalculateLen, std::placeholders::_1);
203 func.serializeFunc = std::bind(&CommitHistorySync::Serialization, std::placeholders::_1,
204 std::placeholders::_2, std::placeholders::_3);
205 func.deserializeFunc = std::bind(&CommitHistorySync::DeSerialization, std::placeholders::_1,
206 std::placeholders::_2, std::placeholders::_3);
207 return MessageTransform::RegTransformFunction(COMMIT_HISTORY_SYNC_MESSAGE, func);
208 }
209
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)210 int CommitHistorySync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
211 {
212 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
213 return -E_INVALID_ARGS;
214 }
215 storagePtr_ = storagePtr;
216 communicateHandle_ = communicateHandle;
217 return E_OK;
218 }
219
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const220 void CommitHistorySync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
221 {
222 (void)context;
223 (void)message;
224 return;
225 }
226
SyncStart(MultiVerSyncTaskContext * context)227 int CommitHistorySync::SyncStart(MultiVerSyncTaskContext *context)
228 {
229 if (context == nullptr) {
230 return -E_INVALID_ARGS;
231 }
232 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
233 if (performance != nullptr) {
234 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
235 }
236 std::map<std::string, MultiVerCommitNode> commitMap;
237 int errCode = GetDeviceLatestCommit(commitMap);
238 if (performance != nullptr) {
239 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
240 }
241 if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
242 return errCode;
243 }
244
245 LOGD("CommitHistorySync::commitMap size = %zu, dst=%s{private}", commitMap.size(), context->GetDeviceId().c_str());
246 return SendRequestPacket(context, commitMap);
247 }
248
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)249 int CommitHistorySync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
250 {
251 if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
252 return -E_INVALID_ARGS;
253 }
254 const CommitHistorySyncRequestPacket *packet = message->GetObject<CommitHistorySyncRequestPacket>();
255 if (packet == nullptr) {
256 return -E_INVALID_ARGS;
257 }
258 std::vector<MultiVerCommitNode> commits;
259 int errCode = RunPermissionCheck(context->GetDeviceId());
260 if (errCode == -E_NOT_PERMIT) {
261 LOGE("CommitHistorySync::RequestRecvCallback RunPermissionCheck not pass");
262 SendAckPacket(context, commits, errCode, message);
263 return errCode;
264 }
265 std::map<std::string, MultiVerCommitNode> commitMap;
266 packet->GetCommitMap(commitMap);
267 uint32_t ver = packet->GetVersion();
268 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
269 if (performance != nullptr) {
270 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
271 }
272 errCode = GetCommitTree(commitMap, commits);
273 if (performance != nullptr) {
274 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
275 }
276 if (errCode != E_OK) {
277 LOGE("CommitHistorySync::RequestRecvCallback : GetCommitTree ERR, errno = %d", errCode);
278 }
279
280 errCode = SendAckPacket(context, commits, errCode, message);
281 LOGD("CommitHistorySync::RequestRecvCallback:SendAckPacket, errno = %d, dst=%s{private}, ver = %" PRIu32
282 ", myversion = %" PRIu32, errCode, context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
283 if (errCode == E_OK) {
284 if (commitMap.empty()) {
285 LOGD("[CommitHistorySync][RequestRecvCallback] no need to start SyncResponse");
286 return -E_NOT_FOUND;
287 }
288 }
289 return errCode;
290 }
291
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)292 int CommitHistorySync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
293 {
294 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
295 return -E_INVALID_ARGS;
296 }
297
298 std::vector<MultiVerCommitNode> commits;
299 int32_t errCode;
300
301 const CommitHistorySyncAckPacket *packet = message->GetObject<CommitHistorySyncAckPacket>();
302 if (packet == nullptr) {
303 return -E_INVALID_ARGS;
304 }
305 packet->GetErrorCode(errCode);
306 if (errCode == -E_NOT_PERMIT) {
307 LOGE("CommitHistorySync::AckRecvCallback RunPermissionCheck not pass");
308 return errCode;
309 }
310 packet->GetData(commits);
311 uint32_t ver = packet->GetVersion();
312 context->SetCommits(commits);
313 context->SetCommitIndex(0);
314 context->SetCommitsSize(static_cast<int>(commits.size()));
315 LOGD("CommitHistorySync::AckRecvCallback end, CommitsSize = %zu, dst = %s{private}, ver = %d, myversion = %u",
316 commits.size(), context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
317 return E_OK;
318 }
319
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)320 int CommitHistorySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
321 {
322 if (inMsg == nullptr) {
323 return -E_INVALID_ARGS;
324 }
325 const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
326 if (packet == nullptr) {
327 return -E_INVALID_ARGS;
328 }
329
330 if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_REQUEST)) {
331 return -E_INVALID_ARGS;
332 }
333 len = packet->CalculateLen();
334 return E_OK;
335 }
336
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)337 int CommitHistorySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
338 {
339 if ((buffer == nullptr) || (inMsg == nullptr)) {
340 return -E_INVALID_ARGS;
341 }
342 const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
343 if ((packet == nullptr) || (length != packet->CalculateLen())) {
344 return -E_INVALID_ARGS;
345 }
346
347 Parcel parcel(buffer, length);
348 std::map<std::string, MultiVerCommitNode> commitMap;
349 packet->GetCommitMap(commitMap);
350
351 int errCode = parcel.WriteUInt64(commitMap.size());
352 if (errCode != E_OK) {
353 return -E_SECUREC_ERROR;
354 }
355 // commitMap Serialization
356 for (auto &iter : commitMap) {
357 errCode = parcel.WriteString(iter.first);
358 if (errCode != E_OK) {
359 return -E_SECUREC_ERROR;
360 }
361 errCode = parcel.WriteMultiVerCommit(iter.second);
362 if (errCode != E_OK) {
363 return -E_SECUREC_ERROR;
364 }
365 }
366 errCode = parcel.WriteUInt32(packet->GetVersion());
367 if (errCode != E_OK) {
368 return -E_SECUREC_ERROR;
369 }
370 errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
371 if (errCode != E_OK) {
372 return -E_SECUREC_ERROR;
373 }
374 parcel.EightByteAlign();
375 if (parcel.IsError()) { // almost success
376 return -E_INVALID_ARGS;
377 }
378 return errCode;
379 }
380
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)381 int CommitHistorySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
382 {
383 if ((buffer == nullptr) || (inMsg == nullptr)) {
384 return -E_INVALID_ARGS;
385 }
386
387 uint64_t packLen = 0;
388 uint64_t len = 0;
389 Parcel parcel(const_cast<uint8_t *>(buffer), length);
390 packLen += parcel.ReadUInt64(len);
391 if (len > DBConstant::MAX_DEVICES_SIZE) {
392 LOGE("CommitHistorySync::RequestPacketDeSerialization : commitMap size too large = %" PRIu64, len);
393 return -E_INVALID_ARGS;
394 }
395 // commitMap DeSerialization
396 std::map<std::string, MultiVerCommitNode> commitMap;
397 while (len > 0) {
398 std::string key;
399 MultiVerCommitNode val;
400 packLen += parcel.ReadString(key);
401 packLen += parcel.ReadMultiVerCommit(val);
402 commitMap[key] = val;
403 len--;
404 if (parcel.IsError()) {
405 return -E_INVALID_ARGS;
406 }
407 }
408 uint32_t version;
409 std::vector<uint64_t> reserved;
410 packLen += parcel.ReadUInt32(version);
411 packLen += parcel.ReadVector<uint64_t>(reserved);
412 packLen = Parcel::GetEightByteAlign(packLen);
413 if (packLen != length || parcel.IsError()) {
414 LOGE("CommitHistorySync::RequestPacketDeSerialization : length error, input len = %" PRIu32
415 ", cac len = %" PRIu64, length, packLen);
416 return -E_INVALID_ARGS;
417 }
418 CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
419 if (packet == nullptr) {
420 LOGE("CommitHistorySync::RequestPacketDeSerialization : new packet error");
421 return -E_OUT_OF_MEMORY;
422 }
423 packet->SetCommitMap(commitMap);
424 packet->SetVersion(version);
425 packet->SetReserved(reserved);
426 int errCode = inMsg->SetExternalObject<>(packet);
427 if (errCode != E_OK) {
428 delete packet;
429 packet = nullptr;
430 }
431 return errCode;
432 }
433
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)434 int CommitHistorySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
435 {
436 if (inMsg == nullptr) {
437 return -E_INVALID_ARGS;
438 }
439 const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
440 if (packet == nullptr) {
441 return -E_INVALID_ARGS;
442 }
443
444 if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_RESPONSE)) {
445 return -E_INVALID_ARGS;
446 }
447 len = packet->CalculateLen();
448 return E_OK;
449 }
450
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)451 int CommitHistorySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
452 {
453 if ((buffer == nullptr) || (inMsg == nullptr)) {
454 return -E_INVALID_ARGS;
455 }
456 const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
457 if ((packet == nullptr) || (length != packet->CalculateLen())) {
458 return -E_INVALID_ARGS;
459 }
460
461 Parcel parcel(buffer, length);
462 int32_t ackErrCode;
463 std::vector<MultiVerCommitNode> commits;
464
465 packet->GetData(commits);
466 packet->GetErrorCode(ackErrCode);
467 // errCode Serialization
468 parcel.WriteInt(ackErrCode);
469 parcel.WriteUInt32(packet->GetVersion());
470 parcel.EightByteAlign();
471 if (parcel.IsError()) { // almost success
472 return -E_INVALID_ARGS;
473 }
474 // commits vector Serialization
475 int errCode = parcel.WriteMultiVerCommits(commits);
476 if (errCode != E_OK) {
477 return -E_SECUREC_ERROR;
478 }
479 errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
480 if (errCode != E_OK) {
481 return -E_SECUREC_ERROR;
482 }
483 parcel.EightByteAlign();
484 if (parcel.IsError()) { // almost success
485 return -E_INVALID_ARGS;
486 }
487 return errCode;
488 }
489
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)490 int CommitHistorySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
491 {
492 std::vector<MultiVerCommitNode> commits;
493 uint32_t packLen = 0;
494 Parcel parcel(const_cast<uint8_t *>(buffer), length);
495 int32_t pktErrCode;
496 uint32_t version;
497 std::vector<uint64_t> reserved;
498
499 // errCode DeSerialization
500 packLen += parcel.ReadInt(pktErrCode);
501 packLen += parcel.ReadUInt32(version);
502 parcel.EightByteAlign();
503 if (parcel.IsError()) {
504 return -E_PARSE_FAIL;
505 }
506 packLen = Parcel::GetEightByteAlign(packLen);
507 // commits vector DeSerialization
508 packLen += parcel.ReadMultiVerCommits(commits);
509 packLen += parcel.ReadVector<uint64_t>(reserved);
510 packLen = Parcel::GetEightByteAlign(packLen);
511 if (packLen != length || parcel.IsError()) {
512 LOGE("CommitHistorySync::AckPacketDeSerialization : packet len error, input len = %u, cal len = %u",
513 length, packLen);
514 return -E_INVALID_ARGS;
515 }
516 CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
517 if (packet == nullptr) {
518 LOGE("CommitHistorySync::AckPacketDeSerialization : new packet error");
519 return -E_OUT_OF_MEMORY;
520 }
521 packet->SetData(commits);
522 packet->SetErrorCode(pktErrCode);
523 packet->SetVersion(version);
524 packet->SetReserved(reserved);
525 int errCode = inMsg->SetExternalObject<>(packet);
526 if (errCode != E_OK) {
527 delete packet;
528 packet = nullptr;
529 }
530 return errCode;
531 }
532
IsPacketValid(const Message * inMsg,uint16_t messageType)533 bool CommitHistorySync::IsPacketValid(const Message *inMsg, uint16_t messageType)
534 {
535 if ((inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
536 return false;
537 }
538 if (messageType != inMsg->GetMessageType()) {
539 return false;
540 }
541 return true;
542 }
543
Send(const DeviceID & deviceId,const Message * inMsg)544 int CommitHistorySync::Send(const DeviceID &deviceId, const Message *inMsg)
545 {
546 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
547 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
548 if (errCode != E_OK) {
549 LOGE("CommitHistorySync::Send ERR! err = %d", errCode);
550 }
551 return errCode;
552 }
553
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap)554 int CommitHistorySync::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap)
555 {
556 std::map<std::string, MultiVerCommitNode> readCommitMap;
557 int errCode = storagePtr_->GetDeviceLatestCommit(readCommitMap);
558 if (errCode != E_OK) {
559 return errCode;
560 }
561
562 std::string localDevice;
563 errCode = GetLocalDeviceInfo(localDevice);
564 LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
565 if (errCode != E_OK) {
566 return errCode;
567 }
568
569 for (auto &item : readCommitMap) {
570 errCode = storagePtr_->TransferSyncCommitDevInfo(item.second, localDevice, false);
571 if (errCode != E_OK) {
572 break;
573 }
574 commitMap.insert(std::make_pair(item.second.deviceInfo, item.second));
575 }
576
577 return errCode;
578 }
579
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits)580 int CommitHistorySync::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
581 std::vector<MultiVerCommitNode> &commits)
582 {
583 std::map<std::string, MultiVerCommitNode> newCommitMap;
584
585 std::string localDevice;
586 int errCode = GetLocalDeviceInfo(localDevice);
587 LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
588 if (errCode != E_OK) {
589 return errCode;
590 }
591
592 for (const auto &item : commitMap) {
593 MultiVerCommitNode commitNode = item.second;
594 errCode = storagePtr_->TransferSyncCommitDevInfo(commitNode, localDevice, true);
595 if (errCode != E_OK) {
596 return errCode;
597 }
598 newCommitMap.insert(std::make_pair(commitNode.deviceInfo, commitNode));
599 }
600
601 errCode = storagePtr_->GetCommitTree(newCommitMap, commits);
602 if (errCode != E_OK) {
603 return errCode;
604 }
605 for (auto &commit : commits) {
606 errCode = storagePtr_->TransferSyncCommitDevInfo(commit, localDevice, false);
607 if (errCode != E_OK) {
608 break;
609 }
610 }
611 return errCode;
612 }
613
SendRequestPacket(const MultiVerSyncTaskContext * context,std::map<std::string,MultiVerCommitNode> & commitMap)614 int CommitHistorySync::SendRequestPacket(const MultiVerSyncTaskContext *context,
615 std::map<std::string, MultiVerCommitNode> &commitMap)
616 {
617 CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
618 if (packet == nullptr) {
619 LOGE("CommitHistorySync::SendRequestPacket : new packet error");
620 return -E_OUT_OF_MEMORY;
621 }
622 packet->SetCommitMap(commitMap);
623 packet->SetVersion(SOFTWARE_VERSION_CURRENT);
624 Message *message = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
625 if (message == nullptr) {
626 LOGE("CommitHistorySync::SendRequestPacket : new message error");
627 delete packet;
628 packet = nullptr;
629 return -E_OUT_OF_MEMORY;
630 }
631 message->SetMessageType(TYPE_REQUEST);
632 message->SetTarget(context->GetDeviceId());
633 int errCode = message->SetExternalObject(packet);
634 if (errCode != E_OK) {
635 delete packet;
636 packet = nullptr;
637 delete message;
638 message = nullptr;
639 LOGE("CommitHistorySync::SendRequestPacket : SetExternalObject failed errCode:%d", errCode);
640 return errCode;
641 }
642 message->SetSessionId(context->GetRequestSessionId());
643 message->SetSequenceId(context->GetSequenceId());
644
645 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
646 if (performance != nullptr) {
647 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
648 }
649 errCode = Send(message->GetTarget(), message);
650 if (errCode != E_OK) {
651 LOGE("CommitHistorySync::SendRequestPacket : Send failed errCode:%d", errCode);
652 delete message;
653 message = nullptr;
654 }
655 return errCode;
656 }
657
SendAckPacket(const MultiVerSyncTaskContext * context,std::vector<MultiVerCommitNode> & commits,int ackCode,const Message * message)658 int CommitHistorySync::SendAckPacket(const MultiVerSyncTaskContext *context,
659 std::vector<MultiVerCommitNode> &commits, int ackCode, const Message *message)
660 {
661 if (message == nullptr) {
662 LOGE("CommitHistorySync::SendAckPacket : message is nullptr");
663 return -E_INVALID_ARGS;
664 }
665 CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
666 if (packet == nullptr) {
667 LOGE("CommitHistorySync::SendAckPacket : packet is nullptr");
668 return -E_OUT_OF_MEMORY;
669 }
670 Message *ackMessage = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
671 if (ackMessage == nullptr) {
672 LOGE("CommitHistorySync::SendAckPacket : new message error");
673 delete packet;
674 packet = nullptr;
675 return -E_OUT_OF_MEMORY;
676 }
677
678 packet->SetData(commits);
679 packet->SetErrorCode(static_cast<int32_t>(ackCode));
680 packet->SetVersion(SOFTWARE_VERSION_CURRENT);
681 ackMessage->SetMessageType(TYPE_RESPONSE);
682 ackMessage->SetTarget(context->GetDeviceId());
683 int errCode = ackMessage->SetExternalObject(packet);
684 if (errCode != E_OK) {
685 delete packet;
686 packet = nullptr;
687 delete ackMessage;
688 ackMessage = nullptr;
689 LOGE("CommitHistorySync::SendAckPacket : SetExternalObject failed errCode:%d", errCode);
690 return errCode;
691 }
692 ackMessage->SetSequenceId(message->GetSequenceId());
693 ackMessage->SetSessionId(message->GetSessionId());
694 errCode = Send(ackMessage->GetTarget(), ackMessage);
695 if (errCode != E_OK) {
696 LOGE("CommitHistorySync::SendAckPacket : Send failed errCode:%d", errCode);
697 delete ackMessage;
698 ackMessage = nullptr;
699 }
700 return errCode;
701 }
702
GetLocalDeviceInfo(std::string & deviceInfo)703 int CommitHistorySync::GetLocalDeviceInfo(std::string &deviceInfo)
704 {
705 return communicateHandle_->GetLocalIdentity(deviceInfo);
706 }
707
RunPermissionCheck(const std::string & deviceId) const708 int CommitHistorySync::RunPermissionCheck(const std::string &deviceId) const
709 {
710 std::string appId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
711 std::string userId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
712 std::string storeId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
713 uint8_t flag = CHECK_FLAG_SEND;
714 PermissionCheckParam param = { userId, appId, storeId, deviceId };
715 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
716 if (errCode != E_OK) {
717 LOGE("[CommitHistorySync] RunPermissionCheck not pass errCode:%d, flag:%d", errCode, flag);
718 return -E_NOT_PERMIT;
719 }
720 return errCode;
721 }
722 }
723 #endif