• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "value_slice_sync.h"
18 
19 #include "db_constant.h"
20 #include "log_print.h"
21 #include "message_transform.h"
22 #include "parcel.h"
23 #include "performance_analysis.h"
24 #include "sync_types.h"
25 
26 namespace DistributedDB {
// Upper limit on the value-slice node count reported by the sync context;
// GetValidValueSliceHashNode rejects larger counts with -E_LENGTH_ERROR.
const int ValueSliceSync::MAX_VALUE_NODE_SIZE = 100000;
28 
29 // Class ValueSliceHashPacket
CalculateLen() const30 uint32_t ValueSliceHashPacket::CalculateLen() const
31 {
32     uint64_t len = Parcel::GetIntLen();
33     len = Parcel::GetEightByteAlign(len);
34     len += Parcel::GetVectorCharLen(valueSliceHash_);
35     if (len > INT32_MAX) {
36         return 0;
37     }
38     return len;
39 }
40 
SetValueSliceHash(ValueSliceHash & hash)41 void ValueSliceHashPacket::SetValueSliceHash(ValueSliceHash &hash)
42 {
43     valueSliceHash_ = std::move(hash);
44 }
45 
GetValueSliceHash(ValueSliceHash & hash) const46 void ValueSliceHashPacket::GetValueSliceHash(ValueSliceHash &hash) const
47 {
48     hash = valueSliceHash_;
49 }
50 
SetErrCode(int32_t errCode)51 void ValueSliceHashPacket::SetErrCode(int32_t errCode)
52 {
53     errCode_ = errCode;
54 }
55 
GetErrCode() const56 int32_t ValueSliceHashPacket::GetErrCode() const
57 {
58     return errCode_;
59 }
60 
61 // Class ValueSlicePacket
CalculateLen() const62 uint32_t ValueSlicePacket::CalculateLen() const
63 {
64     uint64_t len = Parcel::GetIntLen();
65     len = Parcel::GetEightByteAlign(len);
66     len += Parcel::GetVectorCharLen(valueSlice_);
67     if (len > INT32_MAX) {
68         return 0;
69     }
70     return len;
71 }
72 
SetData(const ValueSlice & data)73 void ValueSlicePacket::SetData(const ValueSlice &data)
74 {
75     valueSlice_ = std::move(data);
76 }
77 
GetData(ValueSlice & data) const78 void ValueSlicePacket::GetData(ValueSlice &data) const
79 {
80     data = valueSlice_;
81 }
82 
SetErrorCode(int32_t errCode)83 void ValueSlicePacket::SetErrorCode(int32_t errCode)
84 {
85     errorCode_ = errCode;
86 }
87 
GetErrorCode(int32_t & errCode) const88 void ValueSlicePacket::GetErrorCode(int32_t &errCode) const
89 {
90     errCode = errorCode_;
91 }
92 
93 // Class ValueSliceSync
~ValueSliceSync()94 ValueSliceSync::~ValueSliceSync()
95 {
96     storagePtr_ = nullptr;
97     communicateHandle_ = nullptr;
98 }
99 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int ValueSliceSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103         return -E_INVALID_ARGS;
104     }
105 
106     switch (inMsg->GetMessageType()) {
107         case TYPE_REQUEST:
108             return RequestPacketSerialization(buffer, length, inMsg);
109         case TYPE_RESPONSE:
110             return AckPacketSerialization(buffer, length, inMsg);
111         default:
112             return -E_MESSAGE_TYPE_ERROR;
113     }
114 }
115 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int ValueSliceSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119         return -E_INVALID_ARGS;
120     }
121 
122     switch (inMsg->GetMessageType()) {
123         case TYPE_REQUEST:
124             return RequestPacketDeSerialization(buffer, length, inMsg);
125         case TYPE_RESPONSE:
126             return AckPacketDeSerialization(buffer, length, inMsg);
127         default:
128             return -E_MESSAGE_TYPE_ERROR;
129     }
130 }
131 
CalculateLen(const Message * inMsg)132 uint32_t ValueSliceSync::CalculateLen(const Message *inMsg)
133 {
134     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135         return 0;
136     }
137 
138     uint32_t len = 0;
139     int errCode = E_OK;
140     switch (inMsg->GetMessageType()) {
141         case TYPE_REQUEST:
142             errCode = RequestPacketCalculateLen(inMsg, len);
143             break;
144         case TYPE_RESPONSE:
145             errCode = AckPacketCalculateLen(inMsg, len);
146             break;
147         default:
148             return 0;
149     }
150     if (errCode != E_OK) {
151         return 0;
152     }
153     return len;
154 }
155 
RegisterTransformFunc()156 int ValueSliceSync::RegisterTransformFunc()
157 {
158     TransformFunc func;
159     func.computeFunc = std::bind(&ValueSliceSync::CalculateLen, std::placeholders::_1);
160     func.serializeFunc = std::bind(&ValueSliceSync::Serialization, std::placeholders::_1,
161                                    std::placeholders::_2, std::placeholders::_3);
162     func.deserializeFunc = std::bind(&ValueSliceSync::DeSerialization, std::placeholders::_1,
163                                      std::placeholders::_2, std::placeholders::_3);
164     return MessageTransform::RegTransformFunction(VALUE_SLICE_SYNC_MESSAGE, func);
165 }
166 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)167 int ValueSliceSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
168 {
169     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
170         return -E_INVALID_ARGS;
171     }
172     storagePtr_ = storagePtr;
173     communicateHandle_ = communicateHandle;
174     return E_OK;
175 }
176 
SyncStart(MultiVerSyncTaskContext * context)177 int ValueSliceSync::SyncStart(MultiVerSyncTaskContext *context)
178 {
179     if (context == nullptr) {
180         return -E_INVALID_ARGS;
181     }
182     int entriesIndex = context->GetEntriesIndex();
183     int entriesSize = context->GetEntriesSize();
184     if (entriesSize > DBConstant::MAX_ENTRIES_SIZE) {
185         LOGE("ValueSliceSync::entriesSize too large %d", entriesSize);
186         return -E_INVALID_ARGS;
187     }
188     while (entriesIndex < entriesSize) {
189         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
190         if (performance != nullptr) {
191             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
192         }
193         ValueSliceHash valueSliceHashNode;
194         int errCode = GetValidValueSliceHashNode(context, valueSliceHashNode);
195         if (performance != nullptr) {
196             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
197         }
198         LOGD("ValueSliceSync::SyncStart begin errCode = %d", errCode);
199         if (errCode == E_OK) {
200             errCode = SendRequestPacket(context, valueSliceHashNode);
201             LOGD("ValueSliceSync::SyncStart send request packet dst=%s{private}, errCode = %d",
202                 context->GetDeviceId().c_str(), errCode);
203             return errCode;
204         }
205         // move to next entry
206         MultiVerKvEntry *entry = nullptr;
207         std::vector<ValueSliceHash> valueHashes;
208         entriesIndex++;
209         if (entriesIndex < entriesSize) {
210             LOGD("ValueSliceSync::SyncStart begin entriesIndex = %d, entriesSize = %d", entriesIndex, entriesSize);
211             context->SetEntriesIndex(entriesIndex);
212             context->GetEntry(entriesIndex, entry);
213             errCode = entry->GetValueHash(valueHashes);
214             if (errCode != E_OK) {
215                 LOGE("ValueSliceSync::entry->GetValueHash %d", errCode);
216                 return errCode;
217             }
218             context->SetValueSliceHashNodes(valueHashes);
219             context->SetValueSlicesIndex(0);
220             context->SetValueSlicesSize(static_cast<int>(valueHashes.size()));
221         } else {
222             // all entries are received, move to next commit
223             return -E_NOT_FOUND;
224         }
225     }
226     return -E_NOT_FOUND;
227 }
228 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)229 int ValueSliceSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
230 {
231     if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
232         return -E_INVALID_ARGS;
233     }
234 
235     const ValueSliceHashPacket *packet = message->GetObject<ValueSliceHashPacket>();
236     if (packet == nullptr) {
237         return -E_INVALID_ARGS;
238     }
239     ValueSliceHash valueSliceHashNode;
240     packet->GetValueSliceHash(valueSliceHashNode);
241     if ((packet->GetErrCode() == -E_LAST_SYNC_FRAME) && valueSliceHashNode.empty()) {
242         return -E_LAST_SYNC_FRAME;
243     }
244     ValueSlice valueSlice;
245     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
246     if (performance != nullptr) {
247         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
248     }
249     int errCode = GetValueSlice(valueSliceHashNode, valueSlice);
250     if (performance != nullptr) {
251         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
252     }
253     if (errCode != E_OK) {
254         LOGE("ValueSliceSync::RequestRecvCallback : GetValueSlice ERR, errno = %d", errCode);
255     }
256     errCode = SendAckPacket(context, valueSlice, errCode, message);
257     LOGD("ValueSliceSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}", errCode,
258          context->GetDeviceId().c_str());
259     if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
260         return -E_LAST_SYNC_FRAME;
261     }
262     return errCode;
263 }
264 
AckRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)265 int ValueSliceSync::AckRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
266 {
267     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
268         return -E_INVALID_ARGS;
269     }
270 
271     const ValueSlicePacket *packet = message->GetObject<ValueSlicePacket>();
272     if (packet == nullptr) {
273         return -E_INVALID_ARGS;
274     }
275     int errCode = E_OK;
276     packet->GetErrorCode(errCode);
277     ValueSlice valueSlice;
278     packet->GetData(valueSlice);
279     if (errCode != E_OK) {
280         return errCode;
281     }
282     int index = context->GetValueSlicesIndex();
283     ValueSliceHash hashValue;
284     context->GetValueSliceHashNode(index, hashValue);
285     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
286     if (performance != nullptr) {
287         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
288     }
289     errCode = PutValueSlice(hashValue, valueSlice);
290     if (performance != nullptr) {
291         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
292     }
293     LOGD("ValueSliceSync::AckRecvCallback PutValueSlice finished, src=%s{private}, errCode = %d",
294         context->GetDeviceId().c_str(), errCode);
295     return errCode;
296 }
297 
SendFinishedRequest(const MultiVerSyncTaskContext * context)298 void ValueSliceSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
299 {
300     if (context == nullptr) {
301         return;
302     }
303 
304     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
305     if (packet == nullptr) {
306         return;
307     }
308 
309     packet->SetErrCode(-E_LAST_SYNC_FRAME);
310     Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
311     if (message == nullptr) {
312         delete packet;
313         packet = nullptr;
314         return;
315     }
316 
317     int errCode = message->SetExternalObject(packet);
318     if (errCode != E_OK) {
319         delete packet;
320         packet = nullptr;
321         delete message;
322         message = nullptr;
323         return;
324     }
325 
326     message->SetMessageType(TYPE_REQUEST);
327     message->SetTarget(context->GetDeviceId());
328     message->SetSessionId(context->GetRequestSessionId());
329     message->SetSequenceId(context->GetSequenceId());
330     errCode = Send(message->GetTarget(), message);
331     if (errCode != E_OK) {
332         delete message;
333         message = nullptr;
334         LOGE("[ValueSliceSync][SendRequestPacket] SendRequestPacket failed, err %d", errCode);
335     }
336     LOGI("[ValueSliceSync][SendRequestPacket] SendRequestPacket dst=%s{private}", context->GetDeviceId().c_str());
337 }
338 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)339 int ValueSliceSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
340 {
341     const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
342     if (packet == nullptr) {
343         return -E_INVALID_ARGS;
344     }
345 
346     len = packet->CalculateLen();
347     return E_OK;
348 }
349 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)350 int ValueSliceSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
351 {
352     const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
353     if ((packet == nullptr) || (length != packet->CalculateLen())) {
354         return -E_INVALID_ARGS;
355     }
356 
357     Parcel parcel(buffer, length);
358     ValueSliceHash valueSliceHash;
359     packet->GetValueSliceHash(valueSliceHash);
360     int32_t ackCode = packet->GetErrCode();
361     // errCode Serialization
362     int32_t errCode = parcel.WriteInt(ackCode);
363     if (errCode != E_OK) {
364         return -E_SECUREC_ERROR;
365     }
366     parcel.EightByteAlign();
367     // commitMap Serialization
368     errCode = parcel.WriteVectorChar(valueSliceHash);
369     if (errCode != E_OK) {
370         return -E_SECUREC_ERROR;
371     }
372 
373     return errCode;
374 }
375 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)376 int ValueSliceSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
377 {
378     Parcel parcel(const_cast<uint8_t *>(buffer), length);
379 
380     int ackCode = 0;
381     // errCode DeSerialization
382     uint32_t packLen = parcel.ReadInt(ackCode);
383     parcel.EightByteAlign();
384     if (parcel.IsError()) {
385         return -E_PARSE_FAIL;
386     }
387     packLen = Parcel::GetEightByteAlign(packLen);
388 
389     ValueSliceHash valueSliceHash;
390     // commit DeSerialization
391     packLen += parcel.ReadVectorChar(valueSliceHash);
392     if (packLen != length || parcel.IsError()) {
393         return -E_INVALID_ARGS;
394     }
395     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
396     if (packet == nullptr) {
397         LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
398         return -E_OUT_OF_MEMORY;
399     }
400 
401     packet->SetValueSliceHash(valueSliceHash);
402     int errCode = inMsg->SetExternalObject<>(packet);
403     if (errCode != E_OK) {
404         delete packet;
405         packet = nullptr;
406     }
407     return errCode;
408 }
409 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)410 int ValueSliceSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
411 {
412     const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
413     if (packet == nullptr) {
414         return -E_INVALID_ARGS;
415     }
416     len = packet->CalculateLen();
417     return E_OK;
418 }
419 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)420 int ValueSliceSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
421 {
422     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
423         return -E_INVALID_ARGS;
424     }
425     const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
426     if ((packet == nullptr) || (length != packet->CalculateLen())) {
427         return -E_INVALID_ARGS;
428     }
429 
430     Parcel parcel(buffer, length);
431     ValueSlice valueSlice;
432     packet->GetData(valueSlice);
433     int32_t ackCode = 0;
434     packet->GetErrorCode(ackCode);
435     // errCode Serialization
436     int32_t errCode = parcel.WriteInt(ackCode);
437     if (errCode != E_OK) {
438         return -E_SECUREC_ERROR;
439     }
440     parcel.EightByteAlign();
441 
442     // commits vector Serialization
443     errCode = parcel.WriteVectorChar(valueSlice);
444     if (errCode != E_OK) {
445         return -E_SECUREC_ERROR;
446     }
447 
448     return errCode;
449 }
450 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)451 int ValueSliceSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
452 {
453     Parcel parcel(const_cast<uint8_t *>(buffer), length);
454     int32_t ackCode = 0;
455     uint32_t packLen = 0;
456     ValueSlice valueSlice;
457 
458     // errCode DeSerialization
459     packLen += parcel.ReadInt(ackCode);
460     parcel.EightByteAlign();
461     if (parcel.IsError()) {
462         return -E_PARSE_FAIL;
463     }
464     packLen = Parcel::GetEightByteAlign(packLen);
465     // valueSlice DeSerialization
466     packLen += parcel.ReadVectorChar(valueSlice);
467     if (packLen != length || parcel.IsError()) {
468         LOGE("ValueSliceSync::AckPacketSerialization data error, packLen = %" PRIu32 ", length = %" PRIu32,
469             packLen, length);
470         return -E_INVALID_ARGS;
471     }
472     ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
473     if (packet == nullptr) {
474         LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
475         return -E_OUT_OF_MEMORY;
476     }
477     packet->SetData(valueSlice);
478     packet->SetErrorCode(ackCode);
479     int errCode = inMsg->SetExternalObject<>(packet);
480     if (errCode != E_OK) {
481         delete packet;
482         packet = nullptr;
483     }
484     return errCode;
485 }
486 
IsPacketValid(const Message * inMsg,uint16_t messageType)487 bool ValueSliceSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
488 {
489     if ((inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
490         return false;
491     }
492     if (messageType != inMsg->GetMessageType()) {
493         return false;
494     }
495     return true;
496 }
497 
GetValidValueSliceHashNode(MultiVerSyncTaskContext * context,ValueSliceHash & valueHashNode)498 int ValueSliceSync::GetValidValueSliceHashNode(MultiVerSyncTaskContext *context, ValueSliceHash &valueHashNode)
499 {
500     int index = context->GetValueSlicesIndex();
501     int valueNodesSize = context->GetValueSlicesSize();
502     if (valueNodesSize > MAX_VALUE_NODE_SIZE) {
503         LOGD("ValueSliceSync::GetValidValueSliceHashNode failed, too large!");
504         return -E_LENGTH_ERROR;
505     }
506     LOGD("ValueSliceSync::GetValidValueSliceHashNode ValueSlicesSize = %d", valueNodesSize);
507     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
508         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
509         index--;
510     }
511     std::vector<ValueSliceHash> valueSliceHashNodes;
512     context->GetValueSliceHashNodes(valueSliceHashNodes);
513     index = (index < 0) ? 0 : index;
514     while (index < valueNodesSize) {
515         if (IsValueSliceExisted(valueSliceHashNodes[index])) {
516             index++;
517             context->SetValueSlicesIndex(index);
518             continue;
519         }
520         valueHashNode = valueSliceHashNodes[index];
521         return E_OK;
522     }
523     return -E_NOT_FOUND;
524 }
525 
Send(const DeviceID & deviceId,const Message * inMsg)526 int ValueSliceSync::Send(const DeviceID &deviceId, const Message *inMsg)
527 {
528     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
529     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
530     if (errCode != E_OK) {
531         LOGE("ValueSliceSync::Send ERR! err = %d", errCode);
532     }
533     return errCode;
534 }
535 
SendRequestPacket(const MultiVerSyncTaskContext * context,ValueSliceHash & valueSliceHash)536 int ValueSliceSync::SendRequestPacket(const MultiVerSyncTaskContext *context, ValueSliceHash &valueSliceHash)
537 {
538     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
539     if (packet == nullptr) {
540         LOGE("ValueSliceSync::SendRequestPacket : new packet error");
541         return -E_OUT_OF_MEMORY;
542     }
543 
544     packet->SetValueSliceHash(valueSliceHash);
545     Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
546     if (message == nullptr) {
547         delete packet;
548         packet = nullptr;
549         LOGE("ValueSliceSync::SendRequestPacket : new message error");
550         return -E_OUT_OF_MEMORY;
551     }
552 
553     int errCode = message->SetExternalObject<>(packet);
554     if (errCode != E_OK) {
555         delete packet;
556         packet = nullptr;
557         delete message;
558         message = nullptr;
559         return errCode;
560     }
561 
562     message->SetMessageType(TYPE_REQUEST);
563     message->SetTarget(context->GetDeviceId());
564     message->SetSessionId(context->GetRequestSessionId());
565     message->SetSequenceId(context->GetSequenceId());
566     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
567     if (performance != nullptr) {
568         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
569     }
570     errCode = Send(message->GetTarget(), message);
571     if (errCode != E_OK) {
572         delete message;
573         message = nullptr;
574     }
575     return errCode;
576 }
577 
SendAckPacket(const MultiVerSyncTaskContext * context,const ValueSlice & value,int ackCode,const Message * message)578 int ValueSliceSync::SendAckPacket(const MultiVerSyncTaskContext *context, const ValueSlice &value,
579     int ackCode, const Message *message)
580 {
581     ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
582     if (packet == nullptr) {
583         LOGE("ValueSliceSync::SendAckPacket : packet is nullptr");
584         return -E_OUT_OF_MEMORY;
585     }
586 
587     Message *ackMessage = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
588     if (ackMessage == nullptr) {
589         delete packet;
590         packet = nullptr;
591         LOGE("ValueSliceSync::SendAckPacket : new message error");
592         return -E_OUT_OF_MEMORY;
593     }
594 
595     packet->SetData(value);
596     packet->SetErrorCode(static_cast<int32_t>(ackCode));
597     int errCode = ackMessage->SetExternalObject<>(packet);
598     if (errCode != E_OK) {
599         delete packet;
600         packet = nullptr;
601         delete ackMessage;
602         ackMessage = nullptr;
603         return errCode;
604     }
605 
606     ackMessage->SetMessageType(TYPE_RESPONSE);
607     ackMessage->SetTarget(context->GetDeviceId());
608     ackMessage->SetSequenceId(message->GetSequenceId());
609     ackMessage->SetSessionId(message->GetSessionId());
610     errCode = Send(ackMessage->GetTarget(), ackMessage);
611     if (errCode != E_OK) {
612         delete ackMessage;
613         ackMessage = nullptr;
614     }
615 
616     return errCode;
617 }
618 
IsValueSliceExisted(const ValueSliceHash & value)619 bool ValueSliceSync::IsValueSliceExisted(const ValueSliceHash &value)
620 {
621     return storagePtr_->IsValueSliceExisted(value);
622 }
623 
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue)624 int ValueSliceSync::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue)
625 {
626     return storagePtr_->GetValueSlice(hashValue, sliceValue);
627 }
628 
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue)629 int ValueSliceSync::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue)
630 {
631     return storagePtr_->PutValueSlice(hashValue, sliceValue);
632 }
633 } // namespace DistributedDB
634 #endif
635