1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ability_sync.h"
17
18 #include "message_transform.h"
19 #include "version.h"
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "sync_types.h"
23 #include "db_common.h"
24 #include "single_ver_kvdb_sync_interface.h"
25 #include "single_ver_sync_task_context.h"
26 #include "single_ver_kv_sync_task_context.h"
27 #ifdef RELATIONAL_STORE
28 #include "relational_db_sync_interface.h"
29 #include "single_ver_relational_sync_task_context.h"
30 #endif
31
32 namespace DistributedDB {
AbilitySyncRequestPacket()33 AbilitySyncRequestPacket::AbilitySyncRequestPacket()
34 : protocolVersion_(ABILITY_SYNC_VERSION_V1),
35 sendCode_(E_OK),
36 softwareVersion_(SOFTWARE_VERSION_CURRENT),
37 secLabel_(0),
38 secFlag_(0),
39 schemaType_(0),
40 dbCreateTime_(0)
41 {
42 }
43
~AbilitySyncRequestPacket()44 AbilitySyncRequestPacket::~AbilitySyncRequestPacket()
45 {
46 }
47
SetProtocolVersion(uint32_t protocolVersion)48 void AbilitySyncRequestPacket::SetProtocolVersion(uint32_t protocolVersion)
49 {
50 protocolVersion_ = protocolVersion;
51 }
52
GetProtocolVersion() const53 uint32_t AbilitySyncRequestPacket::GetProtocolVersion() const
54 {
55 return protocolVersion_;
56 }
57
SetSendCode(int32_t sendCode)58 void AbilitySyncRequestPacket::SetSendCode(int32_t sendCode)
59 {
60 sendCode_ = sendCode;
61 }
62
GetSendCode() const63 int32_t AbilitySyncRequestPacket::GetSendCode() const
64 {
65 return sendCode_;
66 }
67
SetSoftwareVersion(uint32_t swVersion)68 void AbilitySyncRequestPacket::SetSoftwareVersion(uint32_t swVersion)
69 {
70 softwareVersion_ = swVersion;
71 }
72
GetSoftwareVersion() const73 uint32_t AbilitySyncRequestPacket::GetSoftwareVersion() const
74 {
75 return softwareVersion_;
76 }
77
SetSchema(const std::string & schema)78 void AbilitySyncRequestPacket::SetSchema(const std::string &schema)
79 {
80 schema_ = schema;
81 }
82
GetSchema() const83 std::string AbilitySyncRequestPacket::GetSchema() const
84 {
85 return schema_;
86 }
87
SetSchemaType(uint32_t schemaType)88 void AbilitySyncRequestPacket::SetSchemaType(uint32_t schemaType)
89 {
90 schemaType_ = schemaType;
91 }
92
GetSchemaType() const93 uint32_t AbilitySyncRequestPacket::GetSchemaType() const
94 {
95 return schemaType_;
96 }
97
SetSecLabel(int32_t secLabel)98 void AbilitySyncRequestPacket::SetSecLabel(int32_t secLabel)
99 {
100 secLabel_ = secLabel;
101 }
102
GetSecLabel() const103 int32_t AbilitySyncRequestPacket::GetSecLabel() const
104 {
105 return secLabel_;
106 }
107
SetSecFlag(int32_t secFlag)108 void AbilitySyncRequestPacket::SetSecFlag(int32_t secFlag)
109 {
110 secFlag_ = secFlag;
111 }
112
GetSecFlag() const113 int32_t AbilitySyncRequestPacket::GetSecFlag() const
114 {
115 return secFlag_;
116 }
117
SetDbCreateTime(uint64_t dbCreateTime)118 void AbilitySyncRequestPacket::SetDbCreateTime(uint64_t dbCreateTime)
119 {
120 dbCreateTime_ = dbCreateTime;
121 }
122
GetDbCreateTime() const123 uint64_t AbilitySyncRequestPacket::GetDbCreateTime() const
124 {
125 return dbCreateTime_;
126 }
127
CalculateLen() const128 uint32_t AbilitySyncRequestPacket::CalculateLen() const
129 {
130 uint64_t len = 0;
131 len += Parcel::GetUInt32Len(); // protocolVersion_
132 len += Parcel::GetIntLen(); // sendCode_
133 len += Parcel::GetUInt32Len(); // softwareVersion_
134 uint32_t schemLen = Parcel::GetStringLen(schema_);
135 if (schemLen == 0) {
136 LOGE("[AbilitySyncRequestPacket][CalculateLen] schemLen err!");
137 return 0;
138 }
139 len += schemLen;
140 len += Parcel::GetIntLen(); // secLabel_
141 len += Parcel::GetIntLen(); // secFlag_
142 len += Parcel::GetUInt32Len(); // schemaType_
143 len += Parcel::GetUInt64Len(); // dbCreateTime_
144 len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
145 // the reason why not 8-byte align is that old version is not 8-byte align
146 // so it is not possible to set 8-byte align for high version.
147 if (len > INT32_MAX) {
148 LOGE("[AbilitySyncRequestPacket][CalculateLen] err len:%" PRIu64, len);
149 return 0;
150 }
151 return len;
152 }
153
GetDbAbility() const154 DbAbility AbilitySyncRequestPacket::GetDbAbility() const
155 {
156 return dbAbility_;
157 }
158
SetDbAbility(const DbAbility & dbAbility)159 void AbilitySyncRequestPacket::SetDbAbility(const DbAbility &dbAbility)
160 {
161 dbAbility_ = dbAbility;
162 }
163
AbilitySyncAckPacket()164 AbilitySyncAckPacket::AbilitySyncAckPacket()
165 : protocolVersion_(ABILITY_SYNC_VERSION_V1),
166 softwareVersion_(SOFTWARE_VERSION_CURRENT),
167 ackCode_(E_OK),
168 secLabel_(0),
169 secFlag_(0),
170 schemaType_(0),
171 permitSync_(0),
172 requirePeerConvert_(0),
173 dbCreateTime_(0)
174 {
175 }
176
~AbilitySyncAckPacket()177 AbilitySyncAckPacket::~AbilitySyncAckPacket()
178 {
179 }
180
SetProtocolVersion(uint32_t protocolVersion)181 void AbilitySyncAckPacket::SetProtocolVersion(uint32_t protocolVersion)
182 {
183 protocolVersion_ = protocolVersion;
184 }
185
SetSoftwareVersion(uint32_t swVersion)186 void AbilitySyncAckPacket::SetSoftwareVersion(uint32_t swVersion)
187 {
188 softwareVersion_ = swVersion;
189 }
190
GetSoftwareVersion() const191 uint32_t AbilitySyncAckPacket::GetSoftwareVersion() const
192 {
193 return softwareVersion_;
194 }
195
GetProtocolVersion() const196 uint32_t AbilitySyncAckPacket::GetProtocolVersion() const
197 {
198 return protocolVersion_;
199 }
200
SetAckCode(int32_t ackCode)201 void AbilitySyncAckPacket::SetAckCode(int32_t ackCode)
202 {
203 ackCode_ = ackCode;
204 }
205
GetAckCode() const206 int32_t AbilitySyncAckPacket::GetAckCode() const
207 {
208 return ackCode_;
209 }
210
SetSchema(const std::string & schema)211 void AbilitySyncAckPacket::SetSchema(const std::string &schema)
212 {
213 schema_ = schema;
214 }
215
GetSchema() const216 std::string AbilitySyncAckPacket::GetSchema() const
217 {
218 return schema_;
219 }
220
SetSchemaType(uint32_t schemaType)221 void AbilitySyncAckPacket::SetSchemaType(uint32_t schemaType)
222 {
223 schemaType_ = schemaType;
224 }
225
GetSchemaType() const226 uint32_t AbilitySyncAckPacket::GetSchemaType() const
227 {
228 return schemaType_;
229 }
230
SetSecLabel(int32_t secLabel)231 void AbilitySyncAckPacket::SetSecLabel(int32_t secLabel)
232 {
233 secLabel_ = secLabel;
234 }
235
GetSecLabel() const236 int32_t AbilitySyncAckPacket::GetSecLabel() const
237 {
238 return secLabel_;
239 }
240
SetSecFlag(int32_t secFlag)241 void AbilitySyncAckPacket::SetSecFlag(int32_t secFlag)
242 {
243 secFlag_ = secFlag;
244 }
245
GetSecFlag() const246 int32_t AbilitySyncAckPacket::GetSecFlag() const
247 {
248 return secFlag_;
249 }
250
SetPermitSync(uint32_t permitSync)251 void AbilitySyncAckPacket::SetPermitSync(uint32_t permitSync)
252 {
253 permitSync_ = permitSync;
254 }
255
GetPermitSync() const256 uint32_t AbilitySyncAckPacket::GetPermitSync() const
257 {
258 return permitSync_;
259 }
260
SetRequirePeerConvert(uint32_t requirePeerConvert)261 void AbilitySyncAckPacket::SetRequirePeerConvert(uint32_t requirePeerConvert)
262 {
263 requirePeerConvert_ = requirePeerConvert;
264 }
265
GetRequirePeerConvert() const266 uint32_t AbilitySyncAckPacket::GetRequirePeerConvert() const
267 {
268 return requirePeerConvert_;
269 }
270
SetDbCreateTime(uint64_t dbCreateTime)271 void AbilitySyncAckPacket::SetDbCreateTime(uint64_t dbCreateTime)
272 {
273 dbCreateTime_ = dbCreateTime;
274 }
275
GetDbCreateTime() const276 uint64_t AbilitySyncAckPacket::GetDbCreateTime() const
277 {
278 return dbCreateTime_;
279 }
280
CalculateLen() const281 uint32_t AbilitySyncAckPacket::CalculateLen() const
282 {
283 uint64_t len = 0;
284 len += Parcel::GetUInt32Len();
285 len += Parcel::GetUInt32Len();
286 len += Parcel::GetIntLen();
287 uint32_t schemLen = Parcel::GetStringLen(schema_);
288 if (schemLen == 0) {
289 LOGE("[AbilitySyncAckPacket][CalculateLen] schemLen err!");
290 return 0;
291 }
292 len += schemLen;
293 len += Parcel::GetIntLen(); // secLabel_
294 len += Parcel::GetIntLen(); // secFlag_
295 len += Parcel::GetUInt32Len(); // schemaType_
296 len += Parcel::GetUInt32Len(); // permitSync_
297 len += Parcel::GetUInt32Len(); // requirePeerConvert_
298 len += Parcel::GetUInt64Len(); // dbCreateTime_
299 len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
300 len += SchemaNegotiate::CalculateParcelLen(relationalSyncOpinion_);
301 if (len > INT32_MAX) {
302 LOGE("[AbilitySyncAckPacket][CalculateLen] err len:%" PRIu64, len);
303 return 0;
304 }
305 return len;
306 }
307
GetDbAbility() const308 DbAbility AbilitySyncAckPacket::GetDbAbility() const
309 {
310 return dbAbility_;
311 }
312
SetDbAbility(const DbAbility & dbAbility)313 void AbilitySyncAckPacket::SetDbAbility(const DbAbility &dbAbility)
314 {
315 dbAbility_ = dbAbility;
316 }
317
SetRelationalSyncOpinion(const RelationalSyncOpinion & relationalSyncOpinion)318 void AbilitySyncAckPacket::SetRelationalSyncOpinion(const RelationalSyncOpinion &relationalSyncOpinion)
319 {
320 relationalSyncOpinion_ = relationalSyncOpinion;
321 }
322
GetRelationalSyncOpinion() const323 RelationalSyncOpinion AbilitySyncAckPacket::GetRelationalSyncOpinion() const
324 {
325 return relationalSyncOpinion_;
326 }
327
AbilitySync()328 AbilitySync::AbilitySync()
329 : communicator_(nullptr),
330 storageInterface_(nullptr),
331 metadata_(nullptr),
332 syncFinished_(false)
333 {
334 }
335
~AbilitySync()336 AbilitySync::~AbilitySync()
337 {
338 communicator_ = nullptr;
339 storageInterface_ = nullptr;
340 metadata_ = nullptr;
341 }
342
Initialize(ICommunicator * inCommunicator,ISyncInterface * inStorage,std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)343 int AbilitySync::Initialize(ICommunicator *inCommunicator, ISyncInterface *inStorage,
344 std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
345 {
346 if (inCommunicator == nullptr || inStorage == nullptr || deviceId.empty() || inMetadata == nullptr) {
347 return -E_INVALID_ARGS;
348 }
349 communicator_ = inCommunicator;
350 storageInterface_ = inStorage;
351 metadata_ = inMetadata;
352 deviceId_ = deviceId;
353 return E_OK;
354 }
355
SyncStart(uint32_t sessionId,uint32_t sequenceId,uint16_t remoteCommunicatorVersion,const CommErrHandler & handler)356 int AbilitySync::SyncStart(uint32_t sessionId, uint32_t sequenceId, uint16_t remoteCommunicatorVersion,
357 const CommErrHandler &handler)
358 {
359 AbilitySyncRequestPacket packet;
360 int errCode = SetAbilityRequestBodyInfo(packet, remoteCommunicatorVersion);
361 if (errCode != E_OK) {
362 return errCode;
363 }
364 Message *message = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
365 if (message == nullptr) {
366 return -E_OUT_OF_MEMORY;
367 }
368 message->SetMessageType(TYPE_REQUEST);
369 errCode = message->SetCopiedObject<>(packet);
370 if (errCode != E_OK) {
371 LOGE("[AbilitySync][SyncStart] SetCopiedObject failed, err %d", errCode);
372 delete message;
373 message = nullptr;
374 return errCode;
375 }
376 message->SetVersion(MSG_VERSION_EXT);
377 message->SetSessionId(sessionId);
378 message->SetSequenceId(sequenceId);
379 SendConfig conf;
380 SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
381 errCode = communicator_->SendMessage(deviceId_, message, conf, handler);
382 if (errCode != E_OK) {
383 LOGE("[AbilitySync][SyncStart] SendPacket failed, err %d", errCode);
384 delete message;
385 message = nullptr;
386 }
387 return errCode;
388 }
389
AckRecv(const Message * message,ISyncTaskContext * context)390 int AbilitySync::AckRecv(const Message *message, ISyncTaskContext *context)
391 {
392 int errCode = AckMsgCheck(message, context);
393 if (errCode != E_OK) {
394 return errCode;
395 }
396 const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
397 if (packet == nullptr) {
398 return -E_INVALID_ARGS;
399 }
400 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
401 context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
402 if (remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_2_0) {
403 errCode = AckRecvWithHighVersion(message, context, packet);
404 } else {
405 std::string schema = packet->GetSchema();
406 uint8_t schemaType = packet->GetSchemaType();
407 bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
408 if (!isCompatible) {
409 (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
410 LOGE("[AbilitySync][AckRecv] scheme check failed");
411 return -E_SCHEMA_MISMATCH;
412 }
413 LOGI("[AbilitySync][AckRecv]remoteSoftwareVersion = %u, isCompatible = %d,", remoteSoftwareVersion,
414 isCompatible);
415 }
416 return errCode;
417 }
418
RequestRecv(const Message * message,ISyncTaskContext * context)419 int AbilitySync::RequestRecv(const Message *message, ISyncTaskContext *context)
420 {
421 if (message == nullptr || context == nullptr) {
422 return -E_INVALID_ARGS;
423 }
424 const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
425 if (packet == nullptr) {
426 return -E_INVALID_ARGS;
427 }
428 if (packet->GetSendCode() == -E_VERSION_NOT_SUPPORT) {
429 AbilitySyncAckPacket ackPacket;
430 (void)SendAck(message, -E_VERSION_NOT_SUPPORT, false, ackPacket);
431 LOGI("[AbilitySync][RequestRecv] version can not support, remote version is %u", packet->GetProtocolVersion());
432 return -E_VERSION_NOT_SUPPORT;
433 }
434
435 std::string schema = packet->GetSchema();
436 uint8_t schemaType = packet->GetSchemaType();
437 bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
438 if (!isCompatible) {
439 (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
440 }
441 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
442 context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
443 return HandleRequestRecv(message, context, isCompatible);
444 }
445
AckNotifyRecv(const Message * message,ISyncTaskContext * context)446 int AbilitySync::AckNotifyRecv(const Message *message, ISyncTaskContext *context)
447 {
448 if (message == nullptr || context == nullptr) {
449 return -E_INVALID_ARGS;
450 }
451 if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
452 LOGE("[AbilitySync][AckNotifyRecv] Remote device dose not support this message id");
453 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
454 return -E_FEEDBACK_UNKNOWN_MESSAGE;
455 }
456 const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
457 if (packet == nullptr) {
458 return -E_INVALID_ARGS;
459 }
460 int errCode = packet->GetAckCode();
461 if (errCode != E_OK) {
462 LOGE("[AbilitySync][AckNotifyRecv] received an errCode %d", errCode);
463 return errCode;
464 }
465 std::string schema = packet->GetSchema();
466 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
467 context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
468 AbilitySyncAckPacket sendPacket;
469 errCode = HandleVersionV3AckSchemaParam(packet, sendPacket, context, false);
470 int ackCode = errCode;
471 LOGI("[AckNotifyRecv] receive dev = %s ack notify, remoteSoftwareVersion = %u, ackCode = %d",
472 STR_MASK(deviceId_), remoteSoftwareVersion, errCode);
473 if (errCode == E_OK) {
474 (static_cast<SingleVerSyncTaskContext *>(context))->SetIsSchemaSync(true);
475 ackCode = AbilitySync::LAST_NOTIFY;
476 }
477 (void)SendAckWithEmptySchema(message, ackCode, true);
478 return errCode;
479 }
480
GetAbilitySyncFinishedStatus() const481 bool AbilitySync::GetAbilitySyncFinishedStatus() const
482 {
483 return syncFinished_;
484 }
485
SetAbilitySyncFinishedStatus(bool syncFinished)486 void AbilitySync::SetAbilitySyncFinishedStatus(bool syncFinished)
487 {
488 syncFinished_ = syncFinished;
489 }
490
SecLabelCheck(const AbilitySyncRequestPacket * packet) const491 bool AbilitySync::SecLabelCheck(const AbilitySyncRequestPacket *packet) const
492 {
493 int32_t remoteSecLabel = packet->GetSecLabel();
494 int32_t remoteSecFlag = packet->GetSecFlag();
495 if (remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION || remoteSecLabel == SecurityLabel::NOT_SET) {
496 return true;
497 }
498 SecurityOption option;
499 int errCode = (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
500 LOGI("[AbilitySync][RequestRecv] local l:%d, f:%d, errCode:%d", option.securityLabel, option.securityFlag, errCode);
501 if (errCode == -E_NOT_SUPPORT || (errCode == E_OK && option.securityLabel == SecurityLabel::NOT_SET)) {
502 return true;
503 }
504 if (remoteSecLabel == FAILED_GET_SEC_CLASSIFICATION || errCode != E_OK) {
505 LOGE("[AbilitySync][RequestRecv] check error remoteL:%d, errCode:%d", remoteSecLabel, errCode);
506 return false;
507 }
508 if (remoteSecLabel == option.securityLabel) {
509 return true;
510 } else {
511 LOGE("[AbilitySync][RequestRecv] check error remote:%d , %d local:%d , %d",
512 remoteSecLabel, remoteSecFlag, option.securityLabel, option.securityFlag);
513 return false;
514 }
515 }
516
HandleVersionV3RequestParam(const AbilitySyncRequestPacket * packet,ISyncTaskContext * context) const517 void AbilitySync::HandleVersionV3RequestParam(const AbilitySyncRequestPacket *packet, ISyncTaskContext *context) const
518 {
519 int32_t remoteSecLabel = packet->GetSecLabel();
520 int32_t remoteSecFlag = packet->GetSecFlag();
521 DbAbility remoteDbAbility = packet->GetDbAbility();
522 (static_cast<SingleVerSyncTaskContext *>(context))->SetDbAbility(remoteDbAbility);
523 (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption({remoteSecLabel, remoteSecFlag});
524 (static_cast<SingleVerSyncTaskContext *>(context))->SetReceivcPermitCheck(false);
525 LOGI("[AbilitySync][HandleVersionV3RequestParam] remoteSecLabel = %d, remoteSecFlag = %d, remoteSchemaType = %u",
526 remoteSecLabel, remoteSecFlag, packet->GetSchemaType());
527 }
528
HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket * packet,ISyncTaskContext * context) const529 void AbilitySync::HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket *packet,
530 ISyncTaskContext *context) const
531 {
532 int32_t remoteSecLabel = packet->GetSecLabel();
533 int32_t remoteSecFlag = packet->GetSecFlag();
534 SecurityOption secOption = {remoteSecLabel, remoteSecFlag};
535 (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption(secOption);
536 (static_cast<SingleVerSyncTaskContext *>(context))->SetSendPermitCheck(false);
537 LOGI("[AbilitySync][AckRecv] remoteSecLabel = %d, remoteSecFlag = %d", remoteSecLabel, remoteSecFlag);
538 }
539
HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion) const540 int AbilitySync::HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket *recvPacket,
541 AbilitySyncAckPacket &sendPacket, ISyncTaskContext *context, bool sendOpinion) const
542 {
543 if (IsSingleRelationalVer()) {
544 return HandleRelationAckSchemaParam(recvPacket, sendPacket, context, sendOpinion);
545 }
546 HandleKvAckSchemaParam(recvPacket, context, sendPacket);
547 return E_OK;
548 }
549
GetPacketSecOption(SecurityOption & option) const550 void AbilitySync::GetPacketSecOption(SecurityOption &option) const
551 {
552 int errCode =
553 (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
554 if (errCode == -E_NOT_SUPPORT) {
555 LOGE("[AbilitySync][SyncStart] GetSecOpt not surpport sec classification");
556 option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
557 } else if (errCode != E_OK) {
558 LOGE("[AbilitySync][SyncStart] GetSecOpt errCode:%d", errCode);
559 option.securityLabel = FAILED_GET_SEC_CLASSIFICATION;
560 }
561 }
562
RegisterTransformFunc()563 int AbilitySync::RegisterTransformFunc()
564 {
565 TransformFunc func;
566 func.computeFunc = std::bind(&AbilitySync::CalculateLen, std::placeholders::_1);
567 func.serializeFunc = std::bind(&AbilitySync::Serialization, std::placeholders::_1,
568 std::placeholders::_2, std::placeholders::_3);
569 func.deserializeFunc = std::bind(&AbilitySync::DeSerialization, std::placeholders::_1,
570 std::placeholders::_2, std::placeholders::_3);
571 return MessageTransform::RegTransformFunction(ABILITY_SYNC_MESSAGE, func);
572 }
573
CalculateLen(const Message * inMsg)574 uint32_t AbilitySync::CalculateLen(const Message *inMsg)
575 {
576 if ((inMsg == nullptr) || (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE)) {
577 return 0;
578 }
579 int errCode;
580 uint32_t len = 0;
581 switch (inMsg->GetMessageType()) {
582 case TYPE_REQUEST:
583 errCode = RequestPacketCalculateLen(inMsg, len);
584 if (errCode != E_OK) {
585 LOGE("[AbilitySync][CalculateLen] request packet calc length err %d", errCode);
586 }
587 break;
588 case TYPE_RESPONSE:
589 errCode = AckPacketCalculateLen(inMsg, len);
590 if (errCode != E_OK) {
591 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
592 }
593 break;
594 case TYPE_NOTIFY:
595 errCode = AckPacketCalculateLen(inMsg, len);
596 if (errCode != E_OK) {
597 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
598 }
599 break;
600 default:
601 LOGE("[AbilitySync][CalculateLen] message type not support, type %d", inMsg->GetMessageType());
602 break;
603 }
604 return len;
605 }
606
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)607 int AbilitySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
608 {
609 if ((buffer == nullptr) || (inMsg == nullptr)) {
610 return -E_INVALID_ARGS;
611 }
612
613 switch (inMsg->GetMessageType()) {
614 case TYPE_REQUEST:
615 return RequestPacketSerialization(buffer, length, inMsg);
616 case TYPE_RESPONSE:
617 case TYPE_NOTIFY:
618 return AckPacketSerialization(buffer, length, inMsg);
619 default:
620 return -E_MESSAGE_TYPE_ERROR;
621 }
622 }
623
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)624 int AbilitySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
625 {
626 if ((buffer == nullptr) || (inMsg == nullptr)) {
627 return -E_INVALID_ARGS;
628 }
629
630 switch (inMsg->GetMessageType()) {
631 case TYPE_REQUEST:
632 return RequestPacketDeSerialization(buffer, length, inMsg);
633 case TYPE_RESPONSE:
634 case TYPE_NOTIFY:
635 return AckPacketDeSerialization(buffer, length, inMsg);
636 default:
637 return -E_MESSAGE_TYPE_ERROR;
638 }
639 }
640
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)641 int AbilitySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
642 {
643 const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
644 if (packet == nullptr) {
645 return -E_INVALID_ARGS;
646 }
647
648 len = packet->CalculateLen();
649 return E_OK;
650 }
651
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)652 int AbilitySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
653 {
654 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
655 if (packet == nullptr) {
656 return -E_INVALID_ARGS;
657 }
658
659 len = packet->CalculateLen();
660 return E_OK;
661 }
662
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)663 int AbilitySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
664 {
665 const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
666 if ((packet == nullptr) || (length != packet->CalculateLen())) {
667 return -E_INVALID_ARGS;
668 }
669
670 Parcel parcel(buffer, length);
671 parcel.WriteUInt32(packet->GetProtocolVersion());
672 parcel.WriteInt(packet->GetSendCode());
673 parcel.WriteUInt32(packet->GetSoftwareVersion());
674 parcel.WriteString(packet->GetSchema());
675 parcel.WriteInt(packet->GetSecLabel());
676 parcel.WriteInt(packet->GetSecFlag());
677 parcel.WriteUInt32(packet->GetSchemaType());
678 parcel.WriteUInt64(packet->GetDbCreateTime());
679 int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
680 if (parcel.IsError() || errCode != E_OK) {
681 return -E_PARSE_FAIL;
682 }
683 return E_OK;
684 }
685
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)686 int AbilitySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
687 {
688 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
689 if ((packet == nullptr) || (length != packet->CalculateLen())) {
690 return -E_INVALID_ARGS;
691 }
692
693 Parcel parcel(buffer, length);
694 parcel.WriteUInt32(ABILITY_SYNC_VERSION_V1);
695 parcel.WriteUInt32(SOFTWARE_VERSION_CURRENT);
696 parcel.WriteInt(packet->GetAckCode());
697 parcel.WriteString(packet->GetSchema());
698 parcel.WriteInt(packet->GetSecLabel());
699 parcel.WriteInt(packet->GetSecFlag());
700 parcel.WriteUInt32(packet->GetSchemaType());
701 parcel.WriteUInt32(packet->GetPermitSync());
702 parcel.WriteUInt32(packet->GetRequirePeerConvert());
703 parcel.WriteUInt64(packet->GetDbCreateTime());
704 int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
705 if (parcel.IsError() || errCode != E_OK) {
706 return -E_PARSE_FAIL;
707 }
708 errCode = SchemaNegotiate::SerializeData(packet->GetRelationalSyncOpinion(), parcel);
709 if (parcel.IsError() || errCode != E_OK) {
710 return -E_PARSE_FAIL;
711 }
712 return E_OK;
713 }
714
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)715 int AbilitySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
716 {
717 auto *packet = new (std::nothrow) AbilitySyncRequestPacket();
718 if (packet == nullptr) {
719 return -E_OUT_OF_MEMORY;
720 }
721
722 Parcel parcel(const_cast<uint8_t *>(buffer), length);
723 uint32_t version = 0;
724 uint32_t softwareVersion = 0;
725 std::string schema;
726 int32_t sendCode = 0;
727 int errCode = -E_PARSE_FAIL;
728
729 parcel.ReadUInt32(version);
730 if (parcel.IsError()) {
731 goto ERROR_OUT;
732 }
733 packet->SetProtocolVersion(version);
734 if (version > ABILITY_SYNC_VERSION_V1) {
735 packet->SetSendCode(-E_VERSION_NOT_SUPPORT);
736 errCode = inMsg->SetExternalObject<>(packet);
737 if (errCode != E_OK) {
738 goto ERROR_OUT;
739 }
740 return errCode;
741 }
742 parcel.ReadInt(sendCode);
743 parcel.ReadUInt32(softwareVersion);
744 parcel.ReadString(schema);
745 errCode = RequestPacketDeSerializationTailPart(parcel, packet, softwareVersion);
746 if (parcel.IsError() || errCode != E_OK) {
747 goto ERROR_OUT;
748 }
749 packet->SetSendCode(sendCode);
750 packet->SetSoftwareVersion(softwareVersion);
751 packet->SetSchema(schema);
752
753 errCode = inMsg->SetExternalObject<>(packet);
754 if (errCode == E_OK) {
755 return E_OK;
756 }
757
758 ERROR_OUT:
759 delete packet;
760 packet = nullptr;
761 return errCode;
762 }
763
RequestPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncRequestPacket * packet,uint32_t version)764 int AbilitySync::RequestPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncRequestPacket *packet,
765 uint32_t version)
766 {
767 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
768 int32_t secLabel = 0;
769 int32_t secFlag = 0;
770 uint32_t schemaType = 0;
771 parcel.ReadInt(secLabel);
772 parcel.ReadInt(secFlag);
773 parcel.ReadUInt32(schemaType);
774 packet->SetSecLabel(secLabel);
775 packet->SetSecFlag(secFlag);
776 packet->SetSchemaType(schemaType);
777 }
778 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
779 uint64_t dbCreateTime = 0;
780 parcel.ReadUInt64(dbCreateTime);
781 packet->SetDbCreateTime(dbCreateTime);
782 }
783 DbAbility remoteDbAbility;
784 int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
785 if (errCode != E_OK) {
786 LOGE("[AbilitySync] request packet DeSerializ failed.");
787 return errCode;
788 }
789 packet->SetDbAbility(remoteDbAbility);
790 return E_OK;
791 }
792
AckPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncAckPacket * packet,uint32_t version)793 int AbilitySync::AckPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncAckPacket *packet, uint32_t version)
794 {
795 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
796 int32_t secLabel = 0;
797 int32_t secFlag = 0;
798 uint32_t schemaType = 0;
799 uint32_t permitSync = 0;
800 uint32_t requirePeerConvert = 0;
801 parcel.ReadInt(secLabel);
802 parcel.ReadInt(secFlag);
803 parcel.ReadUInt32(schemaType);
804 parcel.ReadUInt32(permitSync);
805 parcel.ReadUInt32(requirePeerConvert);
806 packet->SetSecLabel(secLabel);
807 packet->SetSecFlag(secFlag);
808 packet->SetSchemaType(schemaType);
809 packet->SetPermitSync(permitSync);
810 packet->SetRequirePeerConvert(requirePeerConvert);
811 }
812 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
813 uint64_t dbCreateTime = 0;
814 parcel.ReadUInt64(dbCreateTime);
815 packet->SetDbCreateTime(dbCreateTime);
816 }
817 DbAbility remoteDbAbility;
818 int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
819 if (errCode != E_OK) {
820 LOGE("[AbilitySync] ack packet DeSerializ failed.");
821 return errCode;
822 }
823 packet->SetDbAbility(remoteDbAbility);
824 RelationalSyncOpinion relationalSyncOpinion;
825 errCode = SchemaNegotiate::DeserializeData(parcel, relationalSyncOpinion);
826 if (errCode != E_OK) {
827 LOGE("[AbilitySync] ack packet DeSerializ RelationalSyncOpinion failed.");
828 return errCode;
829 }
830 packet->SetRelationalSyncOpinion(relationalSyncOpinion);
831 return E_OK;
832 }
833
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)834 int AbilitySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
835 {
836 auto *packet = new (std::nothrow) AbilitySyncAckPacket();
837 if (packet == nullptr) {
838 return -E_OUT_OF_MEMORY;
839 }
840
841 Parcel parcel(const_cast<uint8_t *>(buffer), length);
842 uint32_t version = 0;
843 uint32_t softwareVersion = 0;
844 int32_t ackCode = E_OK;
845 std::string schema;
846 int errCode;
847 parcel.ReadUInt32(version);
848 if (parcel.IsError()) {
849 LOGE("[AbilitySync][RequestDeSerialization] read version failed!");
850 errCode = -E_PARSE_FAIL;
851 goto ERROR_OUT;
852 }
853 packet->SetProtocolVersion(version);
854 if (version > ABILITY_SYNC_VERSION_V1) {
855 packet->SetAckCode(-E_VERSION_NOT_SUPPORT);
856 errCode = inMsg->SetExternalObject<>(packet);
857 if (errCode != E_OK) {
858 goto ERROR_OUT;
859 }
860 return errCode;
861 }
862 parcel.ReadUInt32(softwareVersion);
863 parcel.ReadInt(ackCode);
864 parcel.ReadString(schema);
865 errCode = AckPacketDeSerializationTailPart(parcel, packet, softwareVersion);
866 if (parcel.IsError() || errCode != E_OK) {
867 LOGE("[AbilitySync][RequestDeSerialization] DeSerialization failed!");
868 errCode = -E_PARSE_FAIL;
869 goto ERROR_OUT;
870 }
871 packet->SetSoftwareVersion(softwareVersion);
872 packet->SetAckCode(ackCode);
873 packet->SetSchema(schema);
874 errCode = inMsg->SetExternalObject<>(packet);
875 if (errCode == E_OK) {
876 return E_OK;
877 }
878
879 ERROR_OUT:
880 delete packet;
881 packet = nullptr;
882 return errCode;
883 }
884
SetAbilityRequestBodyInfo(AbilitySyncRequestPacket & packet,uint16_t remoteCommunicatorVersion) const885 int AbilitySync::SetAbilityRequestBodyInfo(AbilitySyncRequestPacket &packet, uint16_t remoteCommunicatorVersion) const
886 {
887 uint64_t dbCreateTime;
888 int errCode =
889 (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
890 if (errCode != E_OK) {
891 LOGE("[AbilitySync][FillAbilityRequest] GetDatabaseCreateTimestamp failed, err %d", errCode);
892 return errCode;
893 }
894 SecurityOption option;
895 GetPacketSecOption(option);
896 std::string schemaStr;
897 uint32_t schemaType = 0;
898 if (IsSingleKvVer()) {
899 SchemaObject schemaObj = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
900 schemaStr = schemaObj.ToSchemaString();
901 schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
902 } else if (IsSingleRelationalVer()) {
903 auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
904 schemaStr = schemaObj.ToSchemaString();
905 schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
906 }
907 DbAbility dbAbility;
908 errCode = GetDbAbilityInfo(dbAbility);
909 if (errCode != E_OK) {
910 LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
911 return errCode;
912 }
913 // 102 version is forbidden to sync with 103 json-schema or flatbuffer-schema
914 // so schema should put null string while remote is 102 version to avoid this bug.
915 if (remoteCommunicatorVersion == 1) {
916 packet.SetSchema("");
917 packet.SetSchemaType(0);
918 } else {
919 packet.SetSchema(schemaStr);
920 packet.SetSchemaType(schemaType);
921 }
922 packet.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
923 packet.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
924 packet.SetSecLabel(option.securityLabel);
925 packet.SetSecFlag(option.securityFlag);
926 packet.SetDbCreateTime(dbCreateTime);
927 packet.SetDbAbility(dbAbility);
928 LOGI("[AbilitySync][FillRequest] ver=%u,Lab=%d,Flag=%d,dbCreateTime=%" PRId64, SOFTWARE_VERSION_CURRENT,
929 option.securityLabel, option.securityFlag, dbCreateTime);
930 return E_OK;
931 }
932
SetAbilityAckBodyInfo(AbilitySyncAckPacket & ackPacket,int ackCode,bool isAckNotify) const933 int AbilitySync::SetAbilityAckBodyInfo(AbilitySyncAckPacket &ackPacket, int ackCode, bool isAckNotify) const
934 {
935 int errCode = E_OK;
936 ackPacket.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
937 ackPacket.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
938 if (!isAckNotify) {
939 SecurityOption option;
940 GetPacketSecOption(option);
941 ackPacket.SetSecLabel(option.securityLabel);
942 ackPacket.SetSecFlag(option.securityFlag);
943 uint64_t dbCreateTime = 0;
944 errCode =
945 (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
946 if (errCode != E_OK) {
947 LOGE("[AbilitySync][SyncStart] GetDatabaseCreateTimestamp failed, err %d", errCode);
948 ackCode = errCode;
949 }
950 DbAbility dbAbility;
951 errCode = GetDbAbilityInfo(dbAbility);
952 if (errCode != E_OK) {
953 LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
954 return errCode;
955 }
956 ackPacket.SetDbCreateTime(dbCreateTime);
957 ackPacket.SetDbAbility(dbAbility);
958 }
959 ackPacket.SetAckCode(ackCode);
960 return E_OK;
961 }
962
SetAbilityAckSchemaInfo(AbilitySyncAckPacket & ackPacket,const ISchema & schemaObj) const963 void AbilitySync::SetAbilityAckSchemaInfo(AbilitySyncAckPacket &ackPacket, const ISchema &schemaObj) const
964 {
965 ackPacket.SetSchema(schemaObj.ToSchemaString());
966 ackPacket.SetSchemaType(static_cast<uint32_t>(schemaObj.GetSchemaType()));
967 }
968
SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket & ackPacket,SyncOpinion localOpinion) const969 void AbilitySync::SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket &ackPacket, SyncOpinion localOpinion) const
970 {
971 ackPacket.SetPermitSync(localOpinion.permitSync);
972 ackPacket.SetRequirePeerConvert(localOpinion.requirePeerConvert);
973 }
974
GetDbAbilityInfo(DbAbility & dbAbility) const975 int AbilitySync::GetDbAbilityInfo(DbAbility &dbAbility) const
976 {
977 int errCode = E_OK;
978 for (const auto &item : SyncConfig::ABILITYBITS) {
979 errCode = dbAbility.SetAbilityItem(item, SUPPORT_MARK);
980 if (errCode != E_OK) {
981 return errCode;
982 }
983 }
984 return errCode;
985 }
986
AckMsgCheck(const Message * message,ISyncTaskContext * context) const987 int AbilitySync::AckMsgCheck(const Message *message, ISyncTaskContext *context) const
988 {
989 if (message == nullptr || context == nullptr) {
990 return -E_INVALID_ARGS;
991 }
992 if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
993 LOGE("[AbilitySync][AckMsgCheck] Remote device dose not support this message id");
994 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
995 context->SetTaskErrCode(-E_FEEDBACK_UNKNOWN_MESSAGE);
996 return -E_FEEDBACK_UNKNOWN_MESSAGE;
997 }
998 if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) {
999 LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
1000 context->SetTaskErrCode(-E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
1001 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
1002 }
1003 const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
1004 if (packet == nullptr) {
1005 return -E_INVALID_ARGS;
1006 }
1007 int ackCode = packet->GetAckCode();
1008 if (ackCode != E_OK) {
1009 LOGE("[AbilitySync][AckMsgCheck] received an errCode %d", ackCode);
1010 context->SetTaskErrCode(ackCode);
1011 return ackCode;
1012 }
1013 return E_OK;
1014 }
1015
IsSingleKvVer() const1016 bool AbilitySync::IsSingleKvVer() const
1017 {
1018 return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_SVD;
1019 }
IsSingleRelationalVer() const1020 bool AbilitySync::IsSingleRelationalVer() const
1021 {
1022 #ifdef RELATIONAL_STORE
1023 return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_RELATION;
1024 #elif
1025 return false;
1026 #endif
1027 }
1028
HandleRequestRecv(const Message * message,ISyncTaskContext * context,bool isCompatible)1029 int AbilitySync::HandleRequestRecv(const Message *message, ISyncTaskContext *context, bool isCompatible)
1030 {
1031 const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
1032 if (packet == nullptr) {
1033 return -E_INVALID_ARGS;
1034 }
1035 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
1036 int ackCode;
1037 std::string schema = packet->GetSchema();
1038 if (remoteSoftwareVersion <= SOFTWARE_VERSION_RELEASE_2_0) {
1039 LOGI("[AbilitySync][RequestRecv] remote version = %u, CheckSchemaCompatible = %d",
1040 remoteSoftwareVersion, isCompatible);
1041 return SendAckWithEmptySchema(message, E_OK, false);
1042 }
1043 HandleVersionV3RequestParam(packet, context);
1044 if (SecLabelCheck(packet)) {
1045 ackCode = E_OK;
1046 } else {
1047 ackCode = -E_SECURITY_OPTION_CHECK_ERROR;
1048 }
1049 if (ackCode == E_OK && remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_3_0) {
1050 ackCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1051 }
1052 AbilitySyncAckPacket ackPacket;
1053 if (IsSingleRelationalVer()) {
1054 ackPacket.SetRelationalSyncOpinion(MakeRelationSyncOpnion(packet, schema));
1055 } else {
1056 SetAbilityAckSyncOpinionInfo(ackPacket, MakeKvSyncOpnion(packet, schema));
1057 }
1058 LOGI("[AbilitySync][RequestRecv] remote dev=%s,ver=%u,schemaCompatible=%d", STR_MASK(deviceId_),
1059 remoteSoftwareVersion, isCompatible);
1060 return SendAck(message, ackCode, false, ackPacket);
1061 }
1062
SendAck(const Message * message,int ackCode,bool isAckNotify,AbilitySyncAckPacket & ackPacket)1063 int AbilitySync::SendAck(const Message *message, int ackCode, bool isAckNotify, AbilitySyncAckPacket &ackPacket)
1064 {
1065 int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1066 if (errCode != E_OK) {
1067 return errCode;
1068 }
1069 if (IsSingleRelationalVer()) {
1070 auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1071 SetAbilityAckSchemaInfo(ackPacket, schemaObj);
1072 } else if (IsSingleKvVer()) {
1073 SchemaObject schemaObject = static_cast<SingleVerKvDBSyncInterface *>(storageInterface_)->GetSchemaInfo();
1074 SetAbilityAckSchemaInfo(ackPacket, schemaObject);
1075 }
1076 return SendAck(message, ackPacket, isAckNotify);
1077 }
1078
SendAckWithEmptySchema(const Message * message,int ackCode,bool isAckNotify)1079 int AbilitySync::SendAckWithEmptySchema(const Message *message, int ackCode,
1080 bool isAckNotify)
1081 {
1082 AbilitySyncAckPacket ackPacket;
1083 int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1084 if (errCode != E_OK) {
1085 return errCode;
1086 }
1087 SetAbilityAckSchemaInfo(ackPacket, SchemaObject());
1088 return SendAck(message, ackPacket, isAckNotify);
1089 }
1090
SendAck(const Message * inMsg,const AbilitySyncAckPacket & ackPacket,bool isAckNotify)1091 int AbilitySync::SendAck(const Message *inMsg, const AbilitySyncAckPacket &ackPacket, bool isAckNotify)
1092 {
1093 Message *ackMessage = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
1094 if (ackMessage == nullptr) {
1095 LOGE("[AbilitySync][SendAck] message create failed, may be memleak!");
1096 return -E_OUT_OF_MEMORY;
1097 }
1098 int errCode = ackMessage->SetCopiedObject<>(ackPacket);
1099 if (errCode != E_OK) {
1100 LOGE("[AbilitySync][SendAck] SetCopiedObject failed, err %d", errCode);
1101 delete ackMessage;
1102 ackMessage = nullptr;
1103 return errCode;
1104 }
1105 (!isAckNotify) ? ackMessage->SetMessageType(TYPE_RESPONSE) : ackMessage->SetMessageType(TYPE_NOTIFY);
1106 ackMessage->SetTarget(deviceId_);
1107 ackMessage->SetSessionId(inMsg->GetSessionId());
1108 ackMessage->SetSequenceId(inMsg->GetSequenceId());
1109 SendConfig conf;
1110 SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
1111 errCode = communicator_->SendMessage(deviceId_, ackMessage, conf);
1112 if (errCode != E_OK) {
1113 LOGE("[AbilitySync][SendAck] SendPacket failed, err %d", errCode);
1114 delete ackMessage;
1115 ackMessage = nullptr;
1116 }
1117 return errCode;
1118 }
1119
MakeKvSyncOpnion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1120 SyncOpinion AbilitySync::MakeKvSyncOpnion(const AbilitySyncRequestPacket *packet, const std::string &remoteSchema) const
1121 {
1122 uint8_t remoteSchemaType = packet->GetSchemaType();
1123 SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1124 SyncOpinion localSyncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1125 return localSyncOpinion;
1126 }
1127
MakeRelationSyncOpnion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1128 RelationalSyncOpinion AbilitySync::MakeRelationSyncOpnion(const AbilitySyncRequestPacket *packet,
1129 const std::string &remoteSchema) const
1130 {
1131 uint8_t remoteSchemaType = packet->GetSchemaType();
1132 RelationalSchemaObject localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1133 return SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1134 }
1135
HandleKvAckSchemaParam(const AbilitySyncAckPacket * recvPacket,ISyncTaskContext * context,AbilitySyncAckPacket & sendPacket) const1136 void AbilitySync::HandleKvAckSchemaParam(const AbilitySyncAckPacket *recvPacket,
1137 ISyncTaskContext *context, AbilitySyncAckPacket &sendPacket) const
1138 {
1139 std::string remoteSchema = recvPacket->GetSchema();
1140 uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1141 bool permitSync = static_cast<bool>(recvPacket->GetPermitSync());
1142 bool requirePeerConvert = static_cast<bool>(recvPacket->GetRequirePeerConvert());
1143 SyncOpinion remoteOpinion = {permitSync, requirePeerConvert, true};
1144 SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1145 SyncOpinion syncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1146 SyncStrategy localStrategy = SchemaNegotiate::ConcludeSyncStrategy(syncOpinion, remoteOpinion);
1147 SetAbilityAckSyncOpinionInfo(sendPacket, syncOpinion);
1148 (static_cast<SingleVerKvSyncTaskContext *>(context))->SetSyncStrategy(localStrategy);
1149 }
1150
HandleRelationAckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion) const1151 int AbilitySync::HandleRelationAckSchemaParam(const AbilitySyncAckPacket *recvPacket, AbilitySyncAckPacket &sendPacket,
1152 ISyncTaskContext *context, bool sendOpinion) const
1153 {
1154 std::string remoteSchema = recvPacket->GetSchema();
1155 uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1156 auto localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1157 auto localOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1158 auto localStrategy = SchemaNegotiate::ConcludeSyncStrategy(localOpinion,
1159 recvPacket->GetRelationalSyncOpinion());
1160 (static_cast<SingleVerRelationalSyncTaskContext *>(context))->SetRelationalSyncStrategy(localStrategy);
1161 int errCode = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->
1162 CreateDistributedDeviceTable(context->GetDeviceId(), localStrategy);
1163 if (errCode != E_OK) {
1164 LOGE("[AbilitySync][AckRecv] create distributed device table failed,errCode=%d", errCode);
1165 }
1166 if (sendOpinion) {
1167 sendPacket.SetRelationalSyncOpinion(localOpinion);
1168 }
1169 return errCode;
1170 }
1171
AckRecvWithHighVersion(const Message * message,ISyncTaskContext * context,const AbilitySyncAckPacket * packet)1172 int AbilitySync::AckRecvWithHighVersion(const Message *message, ISyncTaskContext *context,
1173 const AbilitySyncAckPacket *packet)
1174 {
1175 HandleVersionV3AckSecOptionParam(packet, context);
1176 AbilitySyncAckPacket ackPacket;
1177 int errCode = HandleVersionV3AckSchemaParam(packet, ackPacket, context, true);
1178 if (errCode != E_OK) {
1179 context->SetTaskErrCode(errCode);
1180 return errCode;
1181 }
1182 auto singleVerContext = static_cast<SingleVerSyncTaskContext *>(context);
1183 auto query = singleVerContext->GetQuery();
1184 bool permitSync = (singleVerContext->GetSyncStrategy(query)).permitSync;
1185 if (!permitSync) {
1186 (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
1187 LOGE("[AbilitySync][AckRecv] scheme check failed");
1188 return -E_SCHEMA_MISMATCH;
1189 }
1190 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
1191 errCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1192 if (errCode != E_OK) {
1193 LOGE("[AbilitySync][AckRecv] set db create time failed,errCode=%d", errCode);
1194 context->SetTaskErrCode(errCode);
1195 return errCode;
1196 }
1197 }
1198 DbAbility remoteDbAbility = packet->GetDbAbility();
1199 (static_cast<SingleVerSyncTaskContext *>(context))->SetDbAbility(remoteDbAbility);
1200 (void)SendAck(message, AbilitySync::CHECK_SUCCESS, true, ackPacket);
1201 (static_cast<SingleVerSyncTaskContext *>(context))->SetIsSchemaSync(true);
1202 return E_OK;
1203 }
1204 } // namespace DistributedDB