• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "value_slice_sync.h"
18 
19 #include "db_constant.h"
20 #include "log_print.h"
21 #include "message_transform.h"
22 #include "parcel.h"
23 #include "performance_analysis.h"
24 #include "sync_types.h"
25 
26 namespace DistributedDB {
27 const int ValueSliceSync::MAX_VALUE_NODE_SIZE = 100000;
28 
29 // Class ValueSliceHashPacket
CalculateLen() const30 uint32_t ValueSliceHashPacket::CalculateLen() const
31 {
32     uint64_t len = Parcel::GetIntLen();
33     len = Parcel::GetEightByteAlign(len);
34     len += Parcel::GetVectorCharLen(valueSliceHash_);
35     if (len > INT32_MAX) {
36         return 0;
37     }
38     return len;
39 }
40 
SetValueSliceHash(ValueSliceHash & hash)41 void ValueSliceHashPacket::SetValueSliceHash(ValueSliceHash &hash)
42 {
43     valueSliceHash_ = std::move(hash);
44 }
45 
GetValueSliceHash(ValueSliceHash & hash) const46 void ValueSliceHashPacket::GetValueSliceHash(ValueSliceHash &hash) const
47 {
48     hash = valueSliceHash_;
49 }
50 
SetErrCode(int32_t errCode)51 void ValueSliceHashPacket::SetErrCode(int32_t errCode)
52 {
53     errCode_ = errCode;
54 }
55 
GetErrCode() const56 int32_t ValueSliceHashPacket::GetErrCode() const
57 {
58     return errCode_;
59 }
60 
61 // Class ValueSlicePacket
CalculateLen() const62 uint32_t ValueSlicePacket::CalculateLen() const
63 {
64     uint64_t len = Parcel::GetIntLen();
65     len = Parcel::GetEightByteAlign(len);
66     len += Parcel::GetVectorCharLen(valueSlice_);
67     if (len > INT32_MAX) {
68         return 0;
69     }
70     return len;
71 }
72 
SetData(const ValueSlice & data)73 void ValueSlicePacket::SetData(const ValueSlice &data)
74 {
75     valueSlice_ = std::move(data);
76 }
77 
GetData(ValueSlice & data) const78 void ValueSlicePacket::GetData(ValueSlice &data) const
79 {
80     data = valueSlice_;
81 }
82 
SetErrorCode(int32_t errCode)83 void ValueSlicePacket::SetErrorCode(int32_t errCode)
84 {
85     errorCode_ = errCode;
86 }
87 
GetErrorCode(int32_t & errCode) const88 void ValueSlicePacket::GetErrorCode(int32_t &errCode) const
89 {
90     errCode = errorCode_;
91 }
92 
93 // Class ValueSliceSync
~ValueSliceSync()94 ValueSliceSync::~ValueSliceSync()
95 {
96     storagePtr_ = nullptr;
97     communicateHandle_ = nullptr;
98 }
99 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int ValueSliceSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103         return -E_INVALID_ARGS;
104     }
105 
106     switch (inMsg->GetMessageType()) {
107         case TYPE_REQUEST:
108             return RequestPacketSerialization(buffer, length, inMsg);
109         case TYPE_RESPONSE:
110             return AckPacketSerialization(buffer, length, inMsg);
111         default:
112             return -E_MESSAGE_TYPE_ERROR;
113     }
114 }
115 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int ValueSliceSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119         return -E_INVALID_ARGS;
120     }
121 
122     switch (inMsg->GetMessageType()) {
123         case TYPE_REQUEST:
124             return RequestPacketDeSerialization(buffer, length, inMsg);
125         case TYPE_RESPONSE:
126             return AckPacketDeSerialization(buffer, length, inMsg);
127         default:
128             return -E_MESSAGE_TYPE_ERROR;
129     }
130 }
131 
CalculateLen(const Message * inMsg)132 uint32_t ValueSliceSync::CalculateLen(const Message *inMsg)
133 {
134     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135         return 0;
136     }
137 
138     uint32_t len = 0;
139     int errCode = E_OK;
140     switch (inMsg->GetMessageType()) {
141         case TYPE_REQUEST:
142             errCode = RequestPacketCalculateLen(inMsg, len);
143             break;
144         case TYPE_RESPONSE:
145             errCode = AckPacketCalculateLen(inMsg, len);
146             break;
147         default:
148             return 0;
149     }
150     if (errCode != E_OK) {
151         return 0;
152     }
153     return len;
154 }
155 
RegisterTransformFunc()156 int ValueSliceSync::RegisterTransformFunc()
157 {
158     TransformFunc func;
159     func.computeFunc = std::bind(&ValueSliceSync::CalculateLen, std::placeholders::_1);
160     func.serializeFunc = std::bind(&ValueSliceSync::Serialization, std::placeholders::_1,
161                                    std::placeholders::_2, std::placeholders::_3);
162     func.deserializeFunc = std::bind(&ValueSliceSync::DeSerialization, std::placeholders::_1,
163                                      std::placeholders::_2, std::placeholders::_3);
164     return MessageTransform::RegTransformFunction(VALUE_SLICE_SYNC_MESSAGE, func);
165 }
166 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)167 int ValueSliceSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
168 {
169     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
170         return -E_INVALID_ARGS;
171     }
172     storagePtr_ = storagePtr;
173     communicateHandle_ = communicateHandle;
174     return E_OK;
175 }
176 
SyncStart(MultiVerSyncTaskContext * context)177 int ValueSliceSync::SyncStart(MultiVerSyncTaskContext *context)
178 {
179     if (context == nullptr) {
180         return -E_INVALID_ARGS;
181     }
182     int entriesIndex = context->GetEntriesIndex();
183     int entriesSize = context->GetEntriesSize();
184     if (entriesSize > DBConstant::MAX_ENTRIES_SIZE) {
185         LOGE("ValueSliceSync::entriesSize too large %d", entriesSize);
186         return -E_INVALID_ARGS;
187     }
188     while (entriesIndex < entriesSize) {
189         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
190         if (performance != nullptr) {
191             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
192         }
193         ValueSliceHash valueSliceHashNode;
194         int errCode = GetValidValueSliceHashNode(context, valueSliceHashNode);
195         if (performance != nullptr) {
196             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
197         }
198         LOGD("ValueSliceSync::SyncStart begin errCode = %d", errCode);
199         if (errCode == E_OK) {
200             errCode = SendRequestPacket(context, valueSliceHashNode);
201             LOGD("ValueSliceSync::SyncStart send request packet dst=%s{private}, errCode = %d",
202                 context->GetDeviceId().c_str(), errCode);
203             return errCode;
204         }
205         // move to next entry
206         MultiVerKvEntry *entry = nullptr;
207         std::vector<ValueSliceHash> valueHashes;
208         entriesIndex++;
209         if (entriesIndex < entriesSize) {
210             LOGD("ValueSliceSync::SyncStart begin entriesIndex = %d, entriesSize = %d", entriesIndex, entriesSize);
211             context->SetEntriesIndex(entriesIndex);
212             context->GetEntry(entriesIndex, entry);
213             errCode = entry->GetValueHash(valueHashes);
214             if (errCode != E_OK) {
215                 LOGE("ValueSliceSync::entry->GetValueHash %d", errCode);
216                 return errCode;
217             }
218             context->SetValueSliceHashNodes(valueHashes);
219             context->SetValueSlicesIndex(0);
220             context->SetValueSlicesSize(static_cast<int>(valueHashes.size()));
221         } else {
222             // all entries are received, move to next commit
223             return -E_NOT_FOUND;
224         }
225     }
226     return -E_NOT_FOUND;
227 }
228 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)229 int ValueSliceSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
230 {
231     if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
232         return -E_INVALID_ARGS;
233     }
234 
235     const ValueSliceHashPacket *packet = message->GetObject<ValueSliceHashPacket>();
236     if (packet == nullptr) {
237         return -E_INVALID_ARGS;
238     }
239     ValueSliceHash valueSliceHashNode;
240     packet->GetValueSliceHash(valueSliceHashNode);
241     if ((packet->GetErrCode() == -E_LAST_SYNC_FRAME) && valueSliceHashNode.empty()) {
242         return -E_LAST_SYNC_FRAME;
243     }
244     ValueSlice valueSlice;
245     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
246     if (performance != nullptr) {
247         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
248     }
249     int errCode = GetValueSlice(valueSliceHashNode, valueSlice);
250     if (performance != nullptr) {
251         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
252     }
253     if (errCode != E_OK) {
254         LOGE("ValueSliceSync::RequestRecvCallback : GetValueSlice ERR, errno = %d", errCode);
255     }
256     errCode = SendAckPacket(context, valueSlice, errCode, message);
257     LOGD("ValueSliceSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}", errCode,
258          context->GetDeviceId().c_str());
259     if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
260         return -E_LAST_SYNC_FRAME;
261     }
262     return errCode;
263 }
264 
AckRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)265 int ValueSliceSync::AckRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
266 {
267     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
268         return -E_INVALID_ARGS;
269     }
270 
271     const ValueSlicePacket *packet = message->GetObject<ValueSlicePacket>();
272     if (packet == nullptr) {
273         return -E_INVALID_ARGS;
274     }
275     int errCode = E_OK;
276     packet->GetErrorCode(errCode);
277     ValueSlice valueSlice;
278     packet->GetData(valueSlice);
279     if (errCode != E_OK) {
280         return errCode;
281     }
282     int index = context->GetValueSlicesIndex();
283     ValueSliceHash hashValue;
284     context->GetValueSliceHashNode(index, hashValue);
285     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
286     if (performance != nullptr) {
287         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
288     }
289     errCode = PutValueSlice(hashValue, valueSlice);
290     if (performance != nullptr) {
291         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
292     }
293     LOGD("ValueSliceSync::AckRecvCallback PutValueSlice finished, src=%s{private}, errCode = %d",
294         context->GetDeviceId().c_str(), errCode);
295     return errCode;
296 }
297 
SendFinishedRequest(const MultiVerSyncTaskContext * context)298 void ValueSliceSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
299 {
300     if (context == nullptr) {
301         return;
302     }
303 
304     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
305     if (packet == nullptr) {
306         return;
307     }
308 
309     packet->SetErrCode(-E_LAST_SYNC_FRAME);
310     Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
311     if (message == nullptr) {
312         delete packet;
313         packet = nullptr;
314         return;
315     }
316 
317     int errCode = message->SetExternalObject(packet);
318     if (errCode != E_OK) {
319         delete packet;
320         packet = nullptr;
321         delete message;
322         message = nullptr;
323         return;
324     }
325 
326     message->SetMessageType(TYPE_REQUEST);
327     message->SetTarget(context->GetDeviceId());
328     message->SetSessionId(context->GetRequestSessionId());
329     message->SetSequenceId(context->GetSequenceId());
330     errCode = Send(message->GetTarget(), message);
331     if (errCode != E_OK) {
332         delete message;
333         message = nullptr;
334         LOGE("[ValueSliceSync][SendRequestPacket] SendRequestPacket failed, err %d", errCode);
335     }
336     LOGI("[ValueSliceSync][SendRequestPacket] SendRequestPacket dst=%s{private}", context->GetDeviceId().c_str());
337 }
338 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)339 int ValueSliceSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
340 {
341     const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
342     if (packet == nullptr) {
343         return -E_INVALID_ARGS;
344     }
345 
346     len = packet->CalculateLen();
347     return E_OK;
348 }
349 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)350 int ValueSliceSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
351 {
352     const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
353     if ((packet == nullptr) || (length != packet->CalculateLen())) {
354         return -E_INVALID_ARGS;
355     }
356 
357     Parcel parcel(buffer, length);
358     ValueSliceHash valueSliceHash;
359     packet->GetValueSliceHash(valueSliceHash);
360     int32_t ackCode = packet->GetErrCode();
361     // errCode Serialization
362     int32_t errCode = parcel.WriteInt(ackCode);
363     if (errCode != E_OK) {
364         return -E_SECUREC_ERROR;
365     }
366     parcel.EightByteAlign();
367     // commitMap Serialization
368     errCode = parcel.WriteVectorChar(valueSliceHash);
369     if (errCode != E_OK) {
370         return -E_SECUREC_ERROR;
371     }
372 
373     return errCode;
374 }
375 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)376 int ValueSliceSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
377 {
378     Parcel parcel(const_cast<uint8_t *>(buffer), length);
379 
380     int ackCode = 0;
381     // errCode DeSerialization
382     uint32_t packLen = parcel.ReadInt(ackCode);
383     parcel.EightByteAlign();
384     packLen = Parcel::GetEightByteAlign(packLen);
385 
386     ValueSliceHash valueSliceHash;
387     // commit DeSerialization
388     packLen += parcel.ReadVectorChar(valueSliceHash);
389     if (packLen != length || parcel.IsError()) {
390         return -E_INVALID_ARGS;
391     }
392     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
393     if (packet == nullptr) {
394         LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
395         return -E_OUT_OF_MEMORY;
396     }
397 
398     packet->SetValueSliceHash(valueSliceHash);
399     int errCode = inMsg->SetExternalObject<>(packet);
400     if (errCode != E_OK) {
401         delete packet;
402         packet = nullptr;
403     }
404     return errCode;
405 }
406 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)407 int ValueSliceSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
408 {
409     const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
410     if (packet == nullptr) {
411         return -E_INVALID_ARGS;
412     }
413     len = packet->CalculateLen();
414     return E_OK;
415 }
416 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)417 int ValueSliceSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
418 {
419     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
420         return -E_INVALID_ARGS;
421     }
422     const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
423     if ((packet == nullptr) || (length != packet->CalculateLen())) {
424         return -E_INVALID_ARGS;
425     }
426 
427     Parcel parcel(buffer, length);
428     ValueSlice valueSlice;
429     packet->GetData(valueSlice);
430     int32_t ackCode = 0;
431     packet->GetErrorCode(ackCode);
432     // errCode Serialization
433     int32_t errCode = parcel.WriteInt(ackCode);
434     if (errCode != E_OK) {
435         return -E_SECUREC_ERROR;
436     }
437     parcel.EightByteAlign();
438 
439     // commits vector Serialization
440     errCode = parcel.WriteVectorChar(valueSlice);
441     if (errCode != E_OK) {
442         return -E_SECUREC_ERROR;
443     }
444 
445     return errCode;
446 }
447 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)448 int ValueSliceSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
449 {
450     Parcel parcel(const_cast<uint8_t *>(buffer), length);
451     int32_t ackCode = 0;
452     uint32_t packLen = 0;
453     ValueSlice valueSlice;
454 
455     // errCode DeSerialization
456     packLen += parcel.ReadInt(ackCode);
457     parcel.EightByteAlign();
458     packLen = Parcel::GetEightByteAlign(packLen);
459     // valueSlice DeSerialization
460     packLen += parcel.ReadVectorChar(valueSlice);
461     if (packLen != length || parcel.IsError()) {
462         LOGE("ValueSliceSync::AckPacketSerialization data error, packLen = %" PRIu32 ", length = %" PRIu32,
463             packLen, length);
464         return -E_INVALID_ARGS;
465     }
466     ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
467     if (packet == nullptr) {
468         LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
469         return -E_OUT_OF_MEMORY;
470     }
471     packet->SetData(valueSlice);
472     packet->SetErrorCode(ackCode);
473     int errCode = inMsg->SetExternalObject<>(packet);
474     if (errCode != E_OK) {
475         delete packet;
476         packet = nullptr;
477     }
478     return errCode;
479 }
480 
IsPacketValid(const Message * inMsg,uint16_t messageType)481 bool ValueSliceSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
482 {
483     if ((inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
484         return false;
485     }
486     if (messageType != inMsg->GetMessageType()) {
487         return false;
488     }
489     return true;
490 }
491 
GetValidValueSliceHashNode(MultiVerSyncTaskContext * context,ValueSliceHash & valueHashNode)492 int ValueSliceSync::GetValidValueSliceHashNode(MultiVerSyncTaskContext *context, ValueSliceHash &valueHashNode)
493 {
494     int index = context->GetValueSlicesIndex();
495     int valueNodesSize = context->GetValueSlicesSize();
496     if (valueNodesSize > MAX_VALUE_NODE_SIZE) {
497         LOGD("ValueSliceSync::GetValidValueSliceHashNode failed, too large!");
498         return -E_LENGTH_ERROR;
499     }
500     LOGD("ValueSliceSync::GetValidValueSliceHashNode ValueSlicesSize = %d", valueNodesSize);
501     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
502         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
503         index--;
504     }
505     std::vector<ValueSliceHash> valueSliceHashNodes;
506     context->GetValueSliceHashNodes(valueSliceHashNodes);
507     index = (index < 0) ? 0 : index;
508     while (index < valueNodesSize) {
509         if (IsValueSliceExisted(valueSliceHashNodes[index])) {
510             index++;
511             context->SetValueSlicesIndex(index);
512             continue;
513         }
514         valueHashNode = valueSliceHashNodes[index];
515         return E_OK;
516     }
517     return -E_NOT_FOUND;
518 }
519 
Send(const DeviceID & deviceId,const Message * inMsg)520 int ValueSliceSync::Send(const DeviceID &deviceId, const Message *inMsg)
521 {
522     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
523     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
524     if (errCode != E_OK) {
525         LOGE("ValueSliceSync::Send ERR! err = %d", errCode);
526     }
527     return errCode;
528 }
529 
SendRequestPacket(const MultiVerSyncTaskContext * context,ValueSliceHash & valueSliceHash)530 int ValueSliceSync::SendRequestPacket(const MultiVerSyncTaskContext *context, ValueSliceHash &valueSliceHash)
531 {
532     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
533     if (packet == nullptr) {
534         LOGE("ValueSliceSync::SendRequestPacket : new packet error");
535         return -E_OUT_OF_MEMORY;
536     }
537 
538     packet->SetValueSliceHash(valueSliceHash);
539     Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
540     if (message == nullptr) {
541         delete packet;
542         packet = nullptr;
543         LOGE("ValueSliceSync::SendRequestPacket : new message error");
544         return -E_OUT_OF_MEMORY;
545     }
546 
547     int errCode = message->SetExternalObject<>(packet);
548     if (errCode != E_OK) {
549         delete packet;
550         packet = nullptr;
551         delete message;
552         message = nullptr;
553         return errCode;
554     }
555 
556     message->SetMessageType(TYPE_REQUEST);
557     message->SetTarget(context->GetDeviceId());
558     message->SetSessionId(context->GetRequestSessionId());
559     message->SetSequenceId(context->GetSequenceId());
560     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
561     if (performance != nullptr) {
562         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
563     }
564     errCode = Send(message->GetTarget(), message);
565     if (errCode != E_OK) {
566         delete message;
567         message = nullptr;
568     }
569     return errCode;
570 }
571 
SendAckPacket(const MultiVerSyncTaskContext * context,const ValueSlice & value,int ackCode,const Message * message)572 int ValueSliceSync::SendAckPacket(const MultiVerSyncTaskContext *context, const ValueSlice &value,
573     int ackCode, const Message *message)
574 {
575     ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
576     if (packet == nullptr) {
577         LOGE("ValueSliceSync::SendAckPacket : packet is nullptr");
578         return -E_OUT_OF_MEMORY;
579     }
580 
581     Message *ackMessage = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
582     if (ackMessage == nullptr) {
583         delete packet;
584         packet = nullptr;
585         LOGE("ValueSliceSync::SendAckPacket : new message error");
586         return -E_OUT_OF_MEMORY;
587     }
588 
589     packet->SetData(value);
590     packet->SetErrorCode(static_cast<int32_t>(ackCode));
591     int errCode = ackMessage->SetExternalObject<>(packet);
592     if (errCode != E_OK) {
593         delete packet;
594         packet = nullptr;
595         delete ackMessage;
596         ackMessage = nullptr;
597         return errCode;
598     }
599 
600     ackMessage->SetMessageType(TYPE_RESPONSE);
601     ackMessage->SetTarget(context->GetDeviceId());
602     ackMessage->SetSequenceId(message->GetSequenceId());
603     ackMessage->SetSessionId(message->GetSessionId());
604     errCode = Send(ackMessage->GetTarget(), ackMessage);
605     if (errCode != E_OK) {
606         delete ackMessage;
607         ackMessage = nullptr;
608     }
609 
610     return errCode;
611 }
612 
IsValueSliceExisted(const ValueSliceHash & value)613 bool ValueSliceSync::IsValueSliceExisted(const ValueSliceHash &value)
614 {
615     return storagePtr_->IsValueSliceExisted(value);
616 }
617 
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue)618 int ValueSliceSync::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue)
619 {
620     return storagePtr_->GetValueSlice(hashValue, sliceValue);
621 }
622 
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue)623 int ValueSliceSync::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue)
624 {
625     return storagePtr_->PutValueSlice(hashValue, sliceValue);
626 }
627 } // namespace DistributedDB
628 #endif
629