1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "value_slice_sync.h"
18
19 #include "db_constant.h"
20 #include "log_print.h"
21 #include "message_transform.h"
22 #include "parcel.h"
23 #include "performance_analysis.h"
24 #include "sync_types.h"
25
26 namespace DistributedDB {
// Upper bound on the value-slice count reported by a sync context;
// GetValidValueSliceHashNode() returns -E_LENGTH_ERROR when the count exceeds it.
const int ValueSliceSync::MAX_VALUE_NODE_SIZE = 100000;
28
29 // Class ValueSliceHashPacket
CalculateLen() const30 uint32_t ValueSliceHashPacket::CalculateLen() const
31 {
32 uint64_t len = Parcel::GetIntLen();
33 len = Parcel::GetEightByteAlign(len);
34 len += Parcel::GetVectorCharLen(valueSliceHash_);
35 if (len > INT32_MAX) {
36 return 0;
37 }
38 return len;
39 }
40
SetValueSliceHash(ValueSliceHash & hash)41 void ValueSliceHashPacket::SetValueSliceHash(ValueSliceHash &hash)
42 {
43 valueSliceHash_ = std::move(hash);
44 }
45
GetValueSliceHash(ValueSliceHash & hash) const46 void ValueSliceHashPacket::GetValueSliceHash(ValueSliceHash &hash) const
47 {
48 hash = valueSliceHash_;
49 }
50
SetErrCode(int32_t errCode)51 void ValueSliceHashPacket::SetErrCode(int32_t errCode)
52 {
53 errCode_ = errCode;
54 }
55
GetErrCode() const56 int32_t ValueSliceHashPacket::GetErrCode() const
57 {
58 return errCode_;
59 }
60
61 // Class ValueSlicePacket
CalculateLen() const62 uint32_t ValueSlicePacket::CalculateLen() const
63 {
64 uint64_t len = Parcel::GetIntLen();
65 len = Parcel::GetEightByteAlign(len);
66 len += Parcel::GetVectorCharLen(valueSlice_);
67 if (len > INT32_MAX) {
68 return 0;
69 }
70 return len;
71 }
72
SetData(const ValueSlice & data)73 void ValueSlicePacket::SetData(const ValueSlice &data)
74 {
75 valueSlice_ = std::move(data);
76 }
77
GetData(ValueSlice & data) const78 void ValueSlicePacket::GetData(ValueSlice &data) const
79 {
80 data = valueSlice_;
81 }
82
SetErrorCode(int32_t errCode)83 void ValueSlicePacket::SetErrorCode(int32_t errCode)
84 {
85 errorCode_ = errCode;
86 }
87
GetErrorCode(int32_t & errCode) const88 void ValueSlicePacket::GetErrorCode(int32_t &errCode) const
89 {
90 errCode = errorCode_;
91 }
92
93 // Class ValueSliceSync
~ValueSliceSync()94 ValueSliceSync::~ValueSliceSync()
95 {
96 storagePtr_ = nullptr;
97 communicateHandle_ = nullptr;
98 }
99
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int ValueSliceSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103 return -E_INVALID_ARGS;
104 }
105
106 switch (inMsg->GetMessageType()) {
107 case TYPE_REQUEST:
108 return RequestPacketSerialization(buffer, length, inMsg);
109 case TYPE_RESPONSE:
110 return AckPacketSerialization(buffer, length, inMsg);
111 default:
112 return -E_MESSAGE_TYPE_ERROR;
113 }
114 }
115
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int ValueSliceSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119 return -E_INVALID_ARGS;
120 }
121
122 switch (inMsg->GetMessageType()) {
123 case TYPE_REQUEST:
124 return RequestPacketDeSerialization(buffer, length, inMsg);
125 case TYPE_RESPONSE:
126 return AckPacketDeSerialization(buffer, length, inMsg);
127 default:
128 return -E_MESSAGE_TYPE_ERROR;
129 }
130 }
131
CalculateLen(const Message * inMsg)132 uint32_t ValueSliceSync::CalculateLen(const Message *inMsg)
133 {
134 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135 return 0;
136 }
137
138 uint32_t len = 0;
139 int errCode = E_OK;
140 switch (inMsg->GetMessageType()) {
141 case TYPE_REQUEST:
142 errCode = RequestPacketCalculateLen(inMsg, len);
143 break;
144 case TYPE_RESPONSE:
145 errCode = AckPacketCalculateLen(inMsg, len);
146 break;
147 default:
148 return 0;
149 }
150 if (errCode != E_OK) {
151 return 0;
152 }
153 return len;
154 }
155
RegisterTransformFunc()156 int ValueSliceSync::RegisterTransformFunc()
157 {
158 TransformFunc func;
159 func.computeFunc = std::bind(&ValueSliceSync::CalculateLen, std::placeholders::_1);
160 func.serializeFunc = std::bind(&ValueSliceSync::Serialization, std::placeholders::_1,
161 std::placeholders::_2, std::placeholders::_3);
162 func.deserializeFunc = std::bind(&ValueSliceSync::DeSerialization, std::placeholders::_1,
163 std::placeholders::_2, std::placeholders::_3);
164 return MessageTransform::RegTransformFunction(VALUE_SLICE_SYNC_MESSAGE, func);
165 }
166
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)167 int ValueSliceSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
168 {
169 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
170 return -E_INVALID_ARGS;
171 }
172 storagePtr_ = storagePtr;
173 communicateHandle_ = communicateHandle;
174 return E_OK;
175 }
176
SyncStart(MultiVerSyncTaskContext * context)177 int ValueSliceSync::SyncStart(MultiVerSyncTaskContext *context)
178 {
179 if (context == nullptr) {
180 return -E_INVALID_ARGS;
181 }
182 int entriesIndex = context->GetEntriesIndex();
183 int entriesSize = context->GetEntriesSize();
184 if (entriesSize > DBConstant::MAX_ENTRIES_SIZE) {
185 LOGE("ValueSliceSync::entriesSize too large %d", entriesSize);
186 return -E_INVALID_ARGS;
187 }
188 while (entriesIndex < entriesSize) {
189 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
190 if (performance != nullptr) {
191 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
192 }
193 ValueSliceHash valueSliceHashNode;
194 int errCode = GetValidValueSliceHashNode(context, valueSliceHashNode);
195 if (performance != nullptr) {
196 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
197 }
198 LOGD("ValueSliceSync::SyncStart begin errCode = %d", errCode);
199 if (errCode == E_OK) {
200 errCode = SendRequestPacket(context, valueSliceHashNode);
201 LOGD("ValueSliceSync::SyncStart send request packet dst=%s{private}, errCode = %d",
202 context->GetDeviceId().c_str(), errCode);
203 return errCode;
204 }
205 // move to next entry
206 MultiVerKvEntry *entry = nullptr;
207 std::vector<ValueSliceHash> valueHashes;
208 entriesIndex++;
209 if (entriesIndex < entriesSize) {
210 LOGD("ValueSliceSync::SyncStart begin entriesIndex = %d, entriesSize = %d", entriesIndex, entriesSize);
211 context->SetEntriesIndex(entriesIndex);
212 context->GetEntry(entriesIndex, entry);
213 errCode = entry->GetValueHash(valueHashes);
214 if (errCode != E_OK) {
215 LOGE("ValueSliceSync::entry->GetValueHash %d", errCode);
216 return errCode;
217 }
218 context->SetValueSliceHashNodes(valueHashes);
219 context->SetValueSlicesIndex(0);
220 context->SetValueSlicesSize(static_cast<int>(valueHashes.size()));
221 } else {
222 // all entries are received, move to next commit
223 return -E_NOT_FOUND;
224 }
225 }
226 return -E_NOT_FOUND;
227 }
228
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)229 int ValueSliceSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
230 {
231 if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
232 return -E_INVALID_ARGS;
233 }
234
235 const ValueSliceHashPacket *packet = message->GetObject<ValueSliceHashPacket>();
236 if (packet == nullptr) {
237 return -E_INVALID_ARGS;
238 }
239 ValueSliceHash valueSliceHashNode;
240 packet->GetValueSliceHash(valueSliceHashNode);
241 if ((packet->GetErrCode() == -E_LAST_SYNC_FRAME) && valueSliceHashNode.empty()) {
242 return -E_LAST_SYNC_FRAME;
243 }
244 ValueSlice valueSlice;
245 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
246 if (performance != nullptr) {
247 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
248 }
249 int errCode = GetValueSlice(valueSliceHashNode, valueSlice);
250 if (performance != nullptr) {
251 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
252 }
253 if (errCode != E_OK) {
254 LOGE("ValueSliceSync::RequestRecvCallback : GetValueSlice ERR, errno = %d", errCode);
255 }
256 errCode = SendAckPacket(context, valueSlice, errCode, message);
257 LOGD("ValueSliceSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}", errCode,
258 context->GetDeviceId().c_str());
259 if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
260 return -E_LAST_SYNC_FRAME;
261 }
262 return errCode;
263 }
264
AckRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)265 int ValueSliceSync::AckRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
266 {
267 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
268 return -E_INVALID_ARGS;
269 }
270
271 const ValueSlicePacket *packet = message->GetObject<ValueSlicePacket>();
272 if (packet == nullptr) {
273 return -E_INVALID_ARGS;
274 }
275 int errCode = E_OK;
276 packet->GetErrorCode(errCode);
277 ValueSlice valueSlice;
278 packet->GetData(valueSlice);
279 if (errCode != E_OK) {
280 return errCode;
281 }
282 int index = context->GetValueSlicesIndex();
283 ValueSliceHash hashValue;
284 context->GetValueSliceHashNode(index, hashValue);
285 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
286 if (performance != nullptr) {
287 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
288 }
289 errCode = PutValueSlice(hashValue, valueSlice);
290 if (performance != nullptr) {
291 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
292 }
293 LOGD("ValueSliceSync::AckRecvCallback PutValueSlice finished, src=%s{private}, errCode = %d",
294 context->GetDeviceId().c_str(), errCode);
295 return errCode;
296 }
297
SendFinishedRequest(const MultiVerSyncTaskContext * context)298 void ValueSliceSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
299 {
300 if (context == nullptr) {
301 return;
302 }
303
304 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
305 if (packet == nullptr) {
306 return;
307 }
308
309 packet->SetErrCode(-E_LAST_SYNC_FRAME);
310 Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
311 if (message == nullptr) {
312 delete packet;
313 packet = nullptr;
314 return;
315 }
316
317 int errCode = message->SetExternalObject(packet);
318 if (errCode != E_OK) {
319 delete packet;
320 packet = nullptr;
321 delete message;
322 message = nullptr;
323 return;
324 }
325
326 message->SetMessageType(TYPE_REQUEST);
327 message->SetTarget(context->GetDeviceId());
328 message->SetSessionId(context->GetRequestSessionId());
329 message->SetSequenceId(context->GetSequenceId());
330 errCode = Send(message->GetTarget(), message);
331 if (errCode != E_OK) {
332 delete message;
333 message = nullptr;
334 LOGE("[ValueSliceSync][SendRequestPacket] SendRequestPacket failed, err %d", errCode);
335 }
336 LOGI("[ValueSliceSync][SendRequestPacket] SendRequestPacket dst=%s{private}", context->GetDeviceId().c_str());
337 }
338
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)339 int ValueSliceSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
340 {
341 const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
342 if (packet == nullptr) {
343 return -E_INVALID_ARGS;
344 }
345
346 len = packet->CalculateLen();
347 return E_OK;
348 }
349
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)350 int ValueSliceSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
351 {
352 const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
353 if ((packet == nullptr) || (length != packet->CalculateLen())) {
354 return -E_INVALID_ARGS;
355 }
356
357 Parcel parcel(buffer, length);
358 ValueSliceHash valueSliceHash;
359 packet->GetValueSliceHash(valueSliceHash);
360 int32_t ackCode = packet->GetErrCode();
361 // errCode Serialization
362 int32_t errCode = parcel.WriteInt(ackCode);
363 if (errCode != E_OK) {
364 return -E_SECUREC_ERROR;
365 }
366 parcel.EightByteAlign();
367 // commitMap Serialization
368 errCode = parcel.WriteVectorChar(valueSliceHash);
369 if (errCode != E_OK) {
370 return -E_SECUREC_ERROR;
371 }
372
373 return errCode;
374 }
375
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)376 int ValueSliceSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
377 {
378 Parcel parcel(const_cast<uint8_t *>(buffer), length);
379
380 int ackCode = 0;
381 // errCode DeSerialization
382 uint32_t packLen = parcel.ReadInt(ackCode);
383 parcel.EightByteAlign();
384 if (parcel.IsError()) {
385 return -E_PARSE_FAIL;
386 }
387 packLen = Parcel::GetEightByteAlign(packLen);
388
389 ValueSliceHash valueSliceHash;
390 // commit DeSerialization
391 packLen += parcel.ReadVectorChar(valueSliceHash);
392 if (packLen != length || parcel.IsError()) {
393 return -E_INVALID_ARGS;
394 }
395 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
396 if (packet == nullptr) {
397 LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
398 return -E_OUT_OF_MEMORY;
399 }
400
401 packet->SetValueSliceHash(valueSliceHash);
402 int errCode = inMsg->SetExternalObject<>(packet);
403 if (errCode != E_OK) {
404 delete packet;
405 packet = nullptr;
406 }
407 return errCode;
408 }
409
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)410 int ValueSliceSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
411 {
412 const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
413 if (packet == nullptr) {
414 return -E_INVALID_ARGS;
415 }
416 len = packet->CalculateLen();
417 return E_OK;
418 }
419
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)420 int ValueSliceSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
421 {
422 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
423 return -E_INVALID_ARGS;
424 }
425 const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
426 if ((packet == nullptr) || (length != packet->CalculateLen())) {
427 return -E_INVALID_ARGS;
428 }
429
430 Parcel parcel(buffer, length);
431 ValueSlice valueSlice;
432 packet->GetData(valueSlice);
433 int32_t ackCode = 0;
434 packet->GetErrorCode(ackCode);
435 // errCode Serialization
436 int32_t errCode = parcel.WriteInt(ackCode);
437 if (errCode != E_OK) {
438 return -E_SECUREC_ERROR;
439 }
440 parcel.EightByteAlign();
441
442 // commits vector Serialization
443 errCode = parcel.WriteVectorChar(valueSlice);
444 if (errCode != E_OK) {
445 return -E_SECUREC_ERROR;
446 }
447
448 return errCode;
449 }
450
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)451 int ValueSliceSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
452 {
453 Parcel parcel(const_cast<uint8_t *>(buffer), length);
454 int32_t ackCode = 0;
455 uint32_t packLen = 0;
456 ValueSlice valueSlice;
457
458 // errCode DeSerialization
459 packLen += parcel.ReadInt(ackCode);
460 parcel.EightByteAlign();
461 if (parcel.IsError()) {
462 return -E_PARSE_FAIL;
463 }
464 packLen = Parcel::GetEightByteAlign(packLen);
465 // valueSlice DeSerialization
466 packLen += parcel.ReadVectorChar(valueSlice);
467 if (packLen != length || parcel.IsError()) {
468 LOGE("ValueSliceSync::AckPacketSerialization data error, packLen = %" PRIu32 ", length = %" PRIu32,
469 packLen, length);
470 return -E_INVALID_ARGS;
471 }
472 ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
473 if (packet == nullptr) {
474 LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
475 return -E_OUT_OF_MEMORY;
476 }
477 packet->SetData(valueSlice);
478 packet->SetErrorCode(ackCode);
479 int errCode = inMsg->SetExternalObject<>(packet);
480 if (errCode != E_OK) {
481 delete packet;
482 packet = nullptr;
483 }
484 return errCode;
485 }
486
IsPacketValid(const Message * inMsg,uint16_t messageType)487 bool ValueSliceSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
488 {
489 if ((inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
490 return false;
491 }
492 if (messageType != inMsg->GetMessageType()) {
493 return false;
494 }
495 return true;
496 }
497
GetValidValueSliceHashNode(MultiVerSyncTaskContext * context,ValueSliceHash & valueHashNode)498 int ValueSliceSync::GetValidValueSliceHashNode(MultiVerSyncTaskContext *context, ValueSliceHash &valueHashNode)
499 {
500 int index = context->GetValueSlicesIndex();
501 int valueNodesSize = context->GetValueSlicesSize();
502 if (valueNodesSize > MAX_VALUE_NODE_SIZE) {
503 LOGD("ValueSliceSync::GetValidValueSliceHashNode failed, too large!");
504 return -E_LENGTH_ERROR;
505 }
506 LOGD("ValueSliceSync::GetValidValueSliceHashNode ValueSlicesSize = %d", valueNodesSize);
507 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
508 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
509 index--;
510 }
511 std::vector<ValueSliceHash> valueSliceHashNodes;
512 context->GetValueSliceHashNodes(valueSliceHashNodes);
513 index = (index < 0) ? 0 : index;
514 while (index < valueNodesSize) {
515 if (IsValueSliceExisted(valueSliceHashNodes[index])) {
516 index++;
517 context->SetValueSlicesIndex(index);
518 continue;
519 }
520 valueHashNode = valueSliceHashNodes[index];
521 return E_OK;
522 }
523 return -E_NOT_FOUND;
524 }
525
Send(const DeviceID & deviceId,const Message * inMsg)526 int ValueSliceSync::Send(const DeviceID &deviceId, const Message *inMsg)
527 {
528 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
529 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
530 if (errCode != E_OK) {
531 LOGE("ValueSliceSync::Send ERR! err = %d", errCode);
532 }
533 return errCode;
534 }
535
SendRequestPacket(const MultiVerSyncTaskContext * context,ValueSliceHash & valueSliceHash)536 int ValueSliceSync::SendRequestPacket(const MultiVerSyncTaskContext *context, ValueSliceHash &valueSliceHash)
537 {
538 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
539 if (packet == nullptr) {
540 LOGE("ValueSliceSync::SendRequestPacket : new packet error");
541 return -E_OUT_OF_MEMORY;
542 }
543
544 packet->SetValueSliceHash(valueSliceHash);
545 Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
546 if (message == nullptr) {
547 delete packet;
548 packet = nullptr;
549 LOGE("ValueSliceSync::SendRequestPacket : new message error");
550 return -E_OUT_OF_MEMORY;
551 }
552
553 int errCode = message->SetExternalObject<>(packet);
554 if (errCode != E_OK) {
555 delete packet;
556 packet = nullptr;
557 delete message;
558 message = nullptr;
559 return errCode;
560 }
561
562 message->SetMessageType(TYPE_REQUEST);
563 message->SetTarget(context->GetDeviceId());
564 message->SetSessionId(context->GetRequestSessionId());
565 message->SetSequenceId(context->GetSequenceId());
566 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
567 if (performance != nullptr) {
568 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
569 }
570 errCode = Send(message->GetTarget(), message);
571 if (errCode != E_OK) {
572 delete message;
573 message = nullptr;
574 }
575 return errCode;
576 }
577
SendAckPacket(const MultiVerSyncTaskContext * context,const ValueSlice & value,int ackCode,const Message * message)578 int ValueSliceSync::SendAckPacket(const MultiVerSyncTaskContext *context, const ValueSlice &value,
579 int ackCode, const Message *message)
580 {
581 ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
582 if (packet == nullptr) {
583 LOGE("ValueSliceSync::SendAckPacket : packet is nullptr");
584 return -E_OUT_OF_MEMORY;
585 }
586
587 Message *ackMessage = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
588 if (ackMessage == nullptr) {
589 delete packet;
590 packet = nullptr;
591 LOGE("ValueSliceSync::SendAckPacket : new message error");
592 return -E_OUT_OF_MEMORY;
593 }
594
595 packet->SetData(value);
596 packet->SetErrorCode(static_cast<int32_t>(ackCode));
597 int errCode = ackMessage->SetExternalObject<>(packet);
598 if (errCode != E_OK) {
599 delete packet;
600 packet = nullptr;
601 delete ackMessage;
602 ackMessage = nullptr;
603 return errCode;
604 }
605
606 ackMessage->SetMessageType(TYPE_RESPONSE);
607 ackMessage->SetTarget(context->GetDeviceId());
608 ackMessage->SetSequenceId(message->GetSequenceId());
609 ackMessage->SetSessionId(message->GetSessionId());
610 errCode = Send(ackMessage->GetTarget(), ackMessage);
611 if (errCode != E_OK) {
612 delete ackMessage;
613 ackMessage = nullptr;
614 }
615
616 return errCode;
617 }
618
IsValueSliceExisted(const ValueSliceHash & value)619 bool ValueSliceSync::IsValueSliceExisted(const ValueSliceHash &value)
620 {
621 return storagePtr_->IsValueSliceExisted(value);
622 }
623
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue)624 int ValueSliceSync::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue)
625 {
626 return storagePtr_->GetValueSlice(hashValue, sliceValue);
627 }
628
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue)629 int ValueSliceSync::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue)
630 {
631 return storagePtr_->PutValueSlice(hashValue, sliceValue);
632 }
633 } // namespace DistributedDB
634 #endif
635