1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "value_slice_sync.h"
18
19 #include "db_constant.h"
20 #include "log_print.h"
21 #include "message_transform.h"
22 #include "parcel.h"
23 #include "performance_analysis.h"
24 #include "sync_types.h"
25
26 namespace DistributedDB {
27 const int ValueSliceSync::MAX_VALUE_NODE_SIZE = 100000;
28
29 // Class ValueSliceHashPacket
CalculateLen() const30 uint32_t ValueSliceHashPacket::CalculateLen() const
31 {
32 uint64_t len = Parcel::GetIntLen();
33 len = Parcel::GetEightByteAlign(len);
34 len += Parcel::GetVectorCharLen(valueSliceHash_);
35 if (len > INT32_MAX) {
36 return 0;
37 }
38 return len;
39 }
40
SetValueSliceHash(ValueSliceHash & hash)41 void ValueSliceHashPacket::SetValueSliceHash(ValueSliceHash &hash)
42 {
43 valueSliceHash_ = std::move(hash);
44 }
45
GetValueSliceHash(ValueSliceHash & hash) const46 void ValueSliceHashPacket::GetValueSliceHash(ValueSliceHash &hash) const
47 {
48 hash = valueSliceHash_;
49 }
50
SetErrCode(int32_t errCode)51 void ValueSliceHashPacket::SetErrCode(int32_t errCode)
52 {
53 errCode_ = errCode;
54 }
55
GetErrCode() const56 int32_t ValueSliceHashPacket::GetErrCode() const
57 {
58 return errCode_;
59 }
60
61 // Class ValueSlicePacket
CalculateLen() const62 uint32_t ValueSlicePacket::CalculateLen() const
63 {
64 uint64_t len = Parcel::GetIntLen();
65 len = Parcel::GetEightByteAlign(len);
66 len += Parcel::GetVectorCharLen(valueSlice_);
67 if (len > INT32_MAX) {
68 return 0;
69 }
70 return len;
71 }
72
SetData(const ValueSlice & data)73 void ValueSlicePacket::SetData(const ValueSlice &data)
74 {
75 valueSlice_ = std::move(data);
76 }
77
GetData(ValueSlice & data) const78 void ValueSlicePacket::GetData(ValueSlice &data) const
79 {
80 data = valueSlice_;
81 }
82
SetErrorCode(int32_t errCode)83 void ValueSlicePacket::SetErrorCode(int32_t errCode)
84 {
85 errorCode_ = errCode;
86 }
87
GetErrorCode(int32_t & errCode) const88 void ValueSlicePacket::GetErrorCode(int32_t &errCode) const
89 {
90 errCode = errorCode_;
91 }
92
93 // Class ValueSliceSync
~ValueSliceSync()94 ValueSliceSync::~ValueSliceSync()
95 {
96 storagePtr_ = nullptr;
97 communicateHandle_ = nullptr;
98 }
99
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int ValueSliceSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103 return -E_INVALID_ARGS;
104 }
105
106 switch (inMsg->GetMessageType()) {
107 case TYPE_REQUEST:
108 return RequestPacketSerialization(buffer, length, inMsg);
109 case TYPE_RESPONSE:
110 return AckPacketSerialization(buffer, length, inMsg);
111 default:
112 return -E_MESSAGE_TYPE_ERROR;
113 }
114 }
115
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int ValueSliceSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119 return -E_INVALID_ARGS;
120 }
121
122 switch (inMsg->GetMessageType()) {
123 case TYPE_REQUEST:
124 return RequestPacketDeSerialization(buffer, length, inMsg);
125 case TYPE_RESPONSE:
126 return AckPacketDeSerialization(buffer, length, inMsg);
127 default:
128 return -E_MESSAGE_TYPE_ERROR;
129 }
130 }
131
CalculateLen(const Message * inMsg)132 uint32_t ValueSliceSync::CalculateLen(const Message *inMsg)
133 {
134 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135 return 0;
136 }
137
138 uint32_t len = 0;
139 int errCode = E_OK;
140 switch (inMsg->GetMessageType()) {
141 case TYPE_REQUEST:
142 errCode = RequestPacketCalculateLen(inMsg, len);
143 break;
144 case TYPE_RESPONSE:
145 errCode = AckPacketCalculateLen(inMsg, len);
146 break;
147 default:
148 return 0;
149 }
150 if (errCode != E_OK) {
151 return 0;
152 }
153 return len;
154 }
155
RegisterTransformFunc()156 int ValueSliceSync::RegisterTransformFunc()
157 {
158 TransformFunc func;
159 func.computeFunc = std::bind(&ValueSliceSync::CalculateLen, std::placeholders::_1);
160 func.serializeFunc = std::bind(&ValueSliceSync::Serialization, std::placeholders::_1,
161 std::placeholders::_2, std::placeholders::_3);
162 func.deserializeFunc = std::bind(&ValueSliceSync::DeSerialization, std::placeholders::_1,
163 std::placeholders::_2, std::placeholders::_3);
164 return MessageTransform::RegTransformFunction(VALUE_SLICE_SYNC_MESSAGE, func);
165 }
166
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)167 int ValueSliceSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
168 {
169 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
170 return -E_INVALID_ARGS;
171 }
172 storagePtr_ = storagePtr;
173 communicateHandle_ = communicateHandle;
174 return E_OK;
175 }
176
SyncStart(MultiVerSyncTaskContext * context)177 int ValueSliceSync::SyncStart(MultiVerSyncTaskContext *context)
178 {
179 if (context == nullptr) {
180 return -E_INVALID_ARGS;
181 }
182 int entriesIndex = context->GetEntriesIndex();
183 int entriesSize = context->GetEntriesSize();
184 if (entriesSize > DBConstant::MAX_ENTRIES_SIZE) {
185 LOGE("ValueSliceSync::entriesSize too large %d", entriesSize);
186 return -E_INVALID_ARGS;
187 }
188 while (entriesIndex < entriesSize) {
189 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
190 if (performance != nullptr) {
191 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
192 }
193 ValueSliceHash valueSliceHashNode;
194 int errCode = GetValidValueSliceHashNode(context, valueSliceHashNode);
195 if (performance != nullptr) {
196 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
197 }
198 LOGD("ValueSliceSync::SyncStart begin errCode = %d", errCode);
199 if (errCode == E_OK) {
200 errCode = SendRequestPacket(context, valueSliceHashNode);
201 LOGD("ValueSliceSync::SyncStart send request packet dst=%s{private}, errCode = %d",
202 context->GetDeviceId().c_str(), errCode);
203 return errCode;
204 }
205 // move to next entry
206 MultiVerKvEntry *entry = nullptr;
207 std::vector<ValueSliceHash> valueHashes;
208 entriesIndex++;
209 if (entriesIndex < entriesSize) {
210 LOGD("ValueSliceSync::SyncStart begin entriesIndex = %d, entriesSize = %d", entriesIndex, entriesSize);
211 context->SetEntriesIndex(entriesIndex);
212 context->GetEntry(entriesIndex, entry);
213 errCode = entry->GetValueHash(valueHashes);
214 if (errCode != E_OK) {
215 LOGE("ValueSliceSync::entry->GetValueHash %d", errCode);
216 return errCode;
217 }
218 context->SetValueSliceHashNodes(valueHashes);
219 context->SetValueSlicesIndex(0);
220 context->SetValueSlicesSize(static_cast<int>(valueHashes.size()));
221 } else {
222 // all entries are received, move to next commit
223 return -E_NOT_FOUND;
224 }
225 }
226 return -E_NOT_FOUND;
227 }
228
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)229 int ValueSliceSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
230 {
231 if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
232 return -E_INVALID_ARGS;
233 }
234
235 const ValueSliceHashPacket *packet = message->GetObject<ValueSliceHashPacket>();
236 if (packet == nullptr) {
237 return -E_INVALID_ARGS;
238 }
239 ValueSliceHash valueSliceHashNode;
240 packet->GetValueSliceHash(valueSliceHashNode);
241 if ((packet->GetErrCode() == -E_LAST_SYNC_FRAME) && valueSliceHashNode.empty()) {
242 return -E_LAST_SYNC_FRAME;
243 }
244 ValueSlice valueSlice;
245 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
246 if (performance != nullptr) {
247 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
248 }
249 int errCode = GetValueSlice(valueSliceHashNode, valueSlice);
250 if (performance != nullptr) {
251 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
252 }
253 if (errCode != E_OK) {
254 LOGE("ValueSliceSync::RequestRecvCallback : GetValueSlice ERR, errno = %d", errCode);
255 }
256 errCode = SendAckPacket(context, valueSlice, errCode, message);
257 LOGD("ValueSliceSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}", errCode,
258 context->GetDeviceId().c_str());
259 if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
260 return -E_LAST_SYNC_FRAME;
261 }
262 return errCode;
263 }
264
AckRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)265 int ValueSliceSync::AckRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
266 {
267 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
268 return -E_INVALID_ARGS;
269 }
270
271 const ValueSlicePacket *packet = message->GetObject<ValueSlicePacket>();
272 if (packet == nullptr) {
273 return -E_INVALID_ARGS;
274 }
275 int errCode = E_OK;
276 packet->GetErrorCode(errCode);
277 ValueSlice valueSlice;
278 packet->GetData(valueSlice);
279 if (errCode != E_OK) {
280 return errCode;
281 }
282 int index = context->GetValueSlicesIndex();
283 ValueSliceHash hashValue;
284 context->GetValueSliceHashNode(index, hashValue);
285 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
286 if (performance != nullptr) {
287 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
288 }
289 errCode = PutValueSlice(hashValue, valueSlice);
290 if (performance != nullptr) {
291 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
292 }
293 LOGD("ValueSliceSync::AckRecvCallback PutValueSlice finished, src=%s{private}, errCode = %d",
294 context->GetDeviceId().c_str(), errCode);
295 return errCode;
296 }
297
SendFinishedRequest(const MultiVerSyncTaskContext * context)298 void ValueSliceSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
299 {
300 if (context == nullptr) {
301 return;
302 }
303
304 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
305 if (packet == nullptr) {
306 return;
307 }
308
309 packet->SetErrCode(-E_LAST_SYNC_FRAME);
310 Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
311 if (message == nullptr) {
312 delete packet;
313 packet = nullptr;
314 return;
315 }
316
317 int errCode = message->SetExternalObject(packet);
318 if (errCode != E_OK) {
319 delete packet;
320 packet = nullptr;
321 delete message;
322 message = nullptr;
323 return;
324 }
325
326 message->SetMessageType(TYPE_REQUEST);
327 message->SetTarget(context->GetDeviceId());
328 message->SetSessionId(context->GetRequestSessionId());
329 message->SetSequenceId(context->GetSequenceId());
330 errCode = Send(message->GetTarget(), message);
331 if (errCode != E_OK) {
332 delete message;
333 message = nullptr;
334 LOGE("[ValueSliceSync][SendRequestPacket] SendRequestPacket failed, err %d", errCode);
335 }
336 LOGI("[ValueSliceSync][SendRequestPacket] SendRequestPacket dst=%s{private}", context->GetDeviceId().c_str());
337 }
338
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)339 int ValueSliceSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
340 {
341 const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
342 if (packet == nullptr) {
343 return -E_INVALID_ARGS;
344 }
345
346 len = packet->CalculateLen();
347 return E_OK;
348 }
349
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)350 int ValueSliceSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
351 {
352 const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
353 if ((packet == nullptr) || (length != packet->CalculateLen())) {
354 return -E_INVALID_ARGS;
355 }
356
357 Parcel parcel(buffer, length);
358 ValueSliceHash valueSliceHash;
359 packet->GetValueSliceHash(valueSliceHash);
360 int32_t ackCode = packet->GetErrCode();
361 // errCode Serialization
362 int32_t errCode = parcel.WriteInt(ackCode);
363 if (errCode != E_OK) {
364 return -E_SECUREC_ERROR;
365 }
366 parcel.EightByteAlign();
367 // commitMap Serialization
368 errCode = parcel.WriteVectorChar(valueSliceHash);
369 if (errCode != E_OK) {
370 return -E_SECUREC_ERROR;
371 }
372
373 return errCode;
374 }
375
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)376 int ValueSliceSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
377 {
378 Parcel parcel(const_cast<uint8_t *>(buffer), length);
379
380 int ackCode = 0;
381 // errCode DeSerialization
382 uint32_t packLen = parcel.ReadInt(ackCode);
383 parcel.EightByteAlign();
384 packLen = Parcel::GetEightByteAlign(packLen);
385
386 ValueSliceHash valueSliceHash;
387 // commit DeSerialization
388 packLen += parcel.ReadVectorChar(valueSliceHash);
389 if (packLen != length || parcel.IsError()) {
390 return -E_INVALID_ARGS;
391 }
392 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
393 if (packet == nullptr) {
394 LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
395 return -E_OUT_OF_MEMORY;
396 }
397
398 packet->SetValueSliceHash(valueSliceHash);
399 int errCode = inMsg->SetExternalObject<>(packet);
400 if (errCode != E_OK) {
401 delete packet;
402 packet = nullptr;
403 }
404 return errCode;
405 }
406
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)407 int ValueSliceSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
408 {
409 const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
410 if (packet == nullptr) {
411 return -E_INVALID_ARGS;
412 }
413 len = packet->CalculateLen();
414 return E_OK;
415 }
416
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)417 int ValueSliceSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
418 {
419 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
420 return -E_INVALID_ARGS;
421 }
422 const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
423 if ((packet == nullptr) || (length != packet->CalculateLen())) {
424 return -E_INVALID_ARGS;
425 }
426
427 Parcel parcel(buffer, length);
428 ValueSlice valueSlice;
429 packet->GetData(valueSlice);
430 int32_t ackCode = 0;
431 packet->GetErrorCode(ackCode);
432 // errCode Serialization
433 int32_t errCode = parcel.WriteInt(ackCode);
434 if (errCode != E_OK) {
435 return -E_SECUREC_ERROR;
436 }
437 parcel.EightByteAlign();
438
439 // commits vector Serialization
440 errCode = parcel.WriteVectorChar(valueSlice);
441 if (errCode != E_OK) {
442 return -E_SECUREC_ERROR;
443 }
444
445 return errCode;
446 }
447
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)448 int ValueSliceSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
449 {
450 Parcel parcel(const_cast<uint8_t *>(buffer), length);
451 int32_t ackCode = 0;
452 uint32_t packLen = 0;
453 ValueSlice valueSlice;
454
455 // errCode DeSerialization
456 packLen += parcel.ReadInt(ackCode);
457 parcel.EightByteAlign();
458 packLen = Parcel::GetEightByteAlign(packLen);
459 // valueSlice DeSerialization
460 packLen += parcel.ReadVectorChar(valueSlice);
461 if (packLen != length || parcel.IsError()) {
462 LOGE("ValueSliceSync::AckPacketSerialization data error, packLen = %" PRIu32 ", length = %" PRIu32,
463 packLen, length);
464 return -E_INVALID_ARGS;
465 }
466 ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
467 if (packet == nullptr) {
468 LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
469 return -E_OUT_OF_MEMORY;
470 }
471 packet->SetData(valueSlice);
472 packet->SetErrorCode(ackCode);
473 int errCode = inMsg->SetExternalObject<>(packet);
474 if (errCode != E_OK) {
475 delete packet;
476 packet = nullptr;
477 }
478 return errCode;
479 }
480
IsPacketValid(const Message * inMsg,uint16_t messageType)481 bool ValueSliceSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
482 {
483 if ((inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
484 return false;
485 }
486 if (messageType != inMsg->GetMessageType()) {
487 return false;
488 }
489 return true;
490 }
491
GetValidValueSliceHashNode(MultiVerSyncTaskContext * context,ValueSliceHash & valueHashNode)492 int ValueSliceSync::GetValidValueSliceHashNode(MultiVerSyncTaskContext *context, ValueSliceHash &valueHashNode)
493 {
494 int index = context->GetValueSlicesIndex();
495 int valueNodesSize = context->GetValueSlicesSize();
496 if (valueNodesSize > MAX_VALUE_NODE_SIZE) {
497 LOGD("ValueSliceSync::GetValidValueSliceHashNode failed, too large!");
498 return -E_LENGTH_ERROR;
499 }
500 LOGD("ValueSliceSync::GetValidValueSliceHashNode ValueSlicesSize = %d", valueNodesSize);
501 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
502 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
503 index--;
504 }
505 std::vector<ValueSliceHash> valueSliceHashNodes;
506 context->GetValueSliceHashNodes(valueSliceHashNodes);
507 index = (index < 0) ? 0 : index;
508 while (index < valueNodesSize) {
509 if (IsValueSliceExisted(valueSliceHashNodes[index])) {
510 index++;
511 context->SetValueSlicesIndex(index);
512 continue;
513 }
514 valueHashNode = valueSliceHashNodes[index];
515 return E_OK;
516 }
517 return -E_NOT_FOUND;
518 }
519
Send(const DeviceID & deviceId,const Message * inMsg)520 int ValueSliceSync::Send(const DeviceID &deviceId, const Message *inMsg)
521 {
522 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
523 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
524 if (errCode != E_OK) {
525 LOGE("ValueSliceSync::Send ERR! err = %d", errCode);
526 }
527 return errCode;
528 }
529
SendRequestPacket(const MultiVerSyncTaskContext * context,ValueSliceHash & valueSliceHash)530 int ValueSliceSync::SendRequestPacket(const MultiVerSyncTaskContext *context, ValueSliceHash &valueSliceHash)
531 {
532 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
533 if (packet == nullptr) {
534 LOGE("ValueSliceSync::SendRequestPacket : new packet error");
535 return -E_OUT_OF_MEMORY;
536 }
537
538 packet->SetValueSliceHash(valueSliceHash);
539 Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
540 if (message == nullptr) {
541 delete packet;
542 packet = nullptr;
543 LOGE("ValueSliceSync::SendRequestPacket : new message error");
544 return -E_OUT_OF_MEMORY;
545 }
546
547 int errCode = message->SetExternalObject<>(packet);
548 if (errCode != E_OK) {
549 delete packet;
550 packet = nullptr;
551 delete message;
552 message = nullptr;
553 return errCode;
554 }
555
556 message->SetMessageType(TYPE_REQUEST);
557 message->SetTarget(context->GetDeviceId());
558 message->SetSessionId(context->GetRequestSessionId());
559 message->SetSequenceId(context->GetSequenceId());
560 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
561 if (performance != nullptr) {
562 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
563 }
564 errCode = Send(message->GetTarget(), message);
565 if (errCode != E_OK) {
566 delete message;
567 message = nullptr;
568 }
569 return errCode;
570 }
571
SendAckPacket(const MultiVerSyncTaskContext * context,const ValueSlice & value,int ackCode,const Message * message)572 int ValueSliceSync::SendAckPacket(const MultiVerSyncTaskContext *context, const ValueSlice &value,
573 int ackCode, const Message *message)
574 {
575 ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
576 if (packet == nullptr) {
577 LOGE("ValueSliceSync::SendAckPacket : packet is nullptr");
578 return -E_OUT_OF_MEMORY;
579 }
580
581 Message *ackMessage = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
582 if (ackMessage == nullptr) {
583 delete packet;
584 packet = nullptr;
585 LOGE("ValueSliceSync::SendAckPacket : new message error");
586 return -E_OUT_OF_MEMORY;
587 }
588
589 packet->SetData(value);
590 packet->SetErrorCode(static_cast<int32_t>(ackCode));
591 int errCode = ackMessage->SetExternalObject<>(packet);
592 if (errCode != E_OK) {
593 delete packet;
594 packet = nullptr;
595 delete ackMessage;
596 ackMessage = nullptr;
597 return errCode;
598 }
599
600 ackMessage->SetMessageType(TYPE_RESPONSE);
601 ackMessage->SetTarget(context->GetDeviceId());
602 ackMessage->SetSequenceId(message->GetSequenceId());
603 ackMessage->SetSessionId(message->GetSessionId());
604 errCode = Send(ackMessage->GetTarget(), ackMessage);
605 if (errCode != E_OK) {
606 delete ackMessage;
607 ackMessage = nullptr;
608 }
609
610 return errCode;
611 }
612
IsValueSliceExisted(const ValueSliceHash & value)613 bool ValueSliceSync::IsValueSliceExisted(const ValueSliceHash &value)
614 {
615 return storagePtr_->IsValueSliceExisted(value);
616 }
617
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue)618 int ValueSliceSync::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue)
619 {
620 return storagePtr_->GetValueSlice(hashValue, sliceValue);
621 }
622
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue)623 int ValueSliceSync::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue)
624 {
625 return storagePtr_->PutValueSlice(hashValue, sliceValue);
626 }
627 } // namespace DistributedDB
628 #endif
629