1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ability_sync.h"
17
18 #include "message_transform.h"
19 #include "version.h"
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "sync_types.h"
23 #include "db_common.h"
24 #include "single_ver_kvdb_sync_interface.h"
25 #include "single_ver_sync_task_context.h"
26 #include "single_ver_kv_sync_task_context.h"
27 #ifdef RELATIONAL_STORE
28 #include "relational_db_sync_interface.h"
29 #include "single_ver_relational_sync_task_context.h"
30 #endif
31
32 namespace DistributedDB {
AbilitySyncRequestPacket()33 AbilitySyncRequestPacket::AbilitySyncRequestPacket()
34 : protocolVersion_(ABILITY_SYNC_VERSION_V1),
35 sendCode_(E_OK),
36 softwareVersion_(SOFTWARE_VERSION_CURRENT),
37 secLabel_(0),
38 secFlag_(0),
39 schemaType_(0),
40 dbCreateTime_(0)
41 {
42 }
43
~AbilitySyncRequestPacket()44 AbilitySyncRequestPacket::~AbilitySyncRequestPacket()
45 {
46 }
47
SetProtocolVersion(uint32_t protocolVersion)48 void AbilitySyncRequestPacket::SetProtocolVersion(uint32_t protocolVersion)
49 {
50 protocolVersion_ = protocolVersion;
51 }
52
GetProtocolVersion() const53 uint32_t AbilitySyncRequestPacket::GetProtocolVersion() const
54 {
55 return protocolVersion_;
56 }
57
SetSendCode(int32_t sendCode)58 void AbilitySyncRequestPacket::SetSendCode(int32_t sendCode)
59 {
60 sendCode_ = sendCode;
61 }
62
GetSendCode() const63 int32_t AbilitySyncRequestPacket::GetSendCode() const
64 {
65 return sendCode_;
66 }
67
SetSoftwareVersion(uint32_t swVersion)68 void AbilitySyncRequestPacket::SetSoftwareVersion(uint32_t swVersion)
69 {
70 softwareVersion_ = swVersion;
71 }
72
GetSoftwareVersion() const73 uint32_t AbilitySyncRequestPacket::GetSoftwareVersion() const
74 {
75 return softwareVersion_;
76 }
77
SetSchema(const std::string & schema)78 void AbilitySyncRequestPacket::SetSchema(const std::string &schema)
79 {
80 schema_ = schema;
81 }
82
GetSchema() const83 std::string AbilitySyncRequestPacket::GetSchema() const
84 {
85 return schema_;
86 }
87
SetSchemaType(uint32_t schemaType)88 void AbilitySyncRequestPacket::SetSchemaType(uint32_t schemaType)
89 {
90 schemaType_ = schemaType;
91 }
92
GetSchemaType() const93 uint32_t AbilitySyncRequestPacket::GetSchemaType() const
94 {
95 return schemaType_;
96 }
97
SetSecLabel(int32_t secLabel)98 void AbilitySyncRequestPacket::SetSecLabel(int32_t secLabel)
99 {
100 secLabel_ = secLabel;
101 }
102
GetSecLabel() const103 int32_t AbilitySyncRequestPacket::GetSecLabel() const
104 {
105 return secLabel_;
106 }
107
SetSecFlag(int32_t secFlag)108 void AbilitySyncRequestPacket::SetSecFlag(int32_t secFlag)
109 {
110 secFlag_ = secFlag;
111 }
112
GetSecFlag() const113 int32_t AbilitySyncRequestPacket::GetSecFlag() const
114 {
115 return secFlag_;
116 }
117
SetDbCreateTime(uint64_t dbCreateTime)118 void AbilitySyncRequestPacket::SetDbCreateTime(uint64_t dbCreateTime)
119 {
120 dbCreateTime_ = dbCreateTime;
121 }
122
GetDbCreateTime() const123 uint64_t AbilitySyncRequestPacket::GetDbCreateTime() const
124 {
125 return dbCreateTime_;
126 }
127
CalculateLen() const128 uint32_t AbilitySyncRequestPacket::CalculateLen() const
129 {
130 uint64_t len = 0;
131 len += Parcel::GetUInt32Len(); // protocolVersion_
132 len += Parcel::GetIntLen(); // sendCode_
133 len += Parcel::GetUInt32Len(); // softwareVersion_
134 uint32_t schemaLen = Parcel::GetStringLen(schema_);
135 if (schemaLen == 0) {
136 LOGE("[AbilitySyncRequestPacket][CalculateLen] schemaLen err!");
137 return 0;
138 }
139 len += schemaLen;
140 len += Parcel::GetIntLen(); // secLabel_
141 len += Parcel::GetIntLen(); // secFlag_
142 len += Parcel::GetUInt32Len(); // schemaType_
143 len += Parcel::GetUInt64Len(); // dbCreateTime_
144 len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
145 // the reason why not 8-byte align is that old version is not 8-byte align
146 // so it is not possible to set 8-byte align for high version.
147 if (len > INT32_MAX) {
148 LOGE("[AbilitySyncRequestPacket][CalculateLen] err len:%" PRIu64, len);
149 return 0;
150 }
151 return len;
152 }
153
GetDbAbility() const154 DbAbility AbilitySyncRequestPacket::GetDbAbility() const
155 {
156 return dbAbility_;
157 }
158
SetDbAbility(const DbAbility & dbAbility)159 void AbilitySyncRequestPacket::SetDbAbility(const DbAbility &dbAbility)
160 {
161 dbAbility_ = dbAbility;
162 }
163
AbilitySyncAckPacket()164 AbilitySyncAckPacket::AbilitySyncAckPacket()
165 : protocolVersion_(ABILITY_SYNC_VERSION_V1),
166 softwareVersion_(SOFTWARE_VERSION_CURRENT),
167 ackCode_(E_OK),
168 secLabel_(0),
169 secFlag_(0),
170 schemaType_(0),
171 permitSync_(0),
172 requirePeerConvert_(0),
173 dbCreateTime_(0)
174 {
175 }
176
~AbilitySyncAckPacket()177 AbilitySyncAckPacket::~AbilitySyncAckPacket()
178 {
179 }
180
SetProtocolVersion(uint32_t protocolVersion)181 void AbilitySyncAckPacket::SetProtocolVersion(uint32_t protocolVersion)
182 {
183 protocolVersion_ = protocolVersion;
184 }
185
SetSoftwareVersion(uint32_t swVersion)186 void AbilitySyncAckPacket::SetSoftwareVersion(uint32_t swVersion)
187 {
188 softwareVersion_ = swVersion;
189 }
190
GetSoftwareVersion() const191 uint32_t AbilitySyncAckPacket::GetSoftwareVersion() const
192 {
193 return softwareVersion_;
194 }
195
GetProtocolVersion() const196 uint32_t AbilitySyncAckPacket::GetProtocolVersion() const
197 {
198 return protocolVersion_;
199 }
200
SetAckCode(int32_t ackCode)201 void AbilitySyncAckPacket::SetAckCode(int32_t ackCode)
202 {
203 ackCode_ = ackCode;
204 }
205
GetAckCode() const206 int32_t AbilitySyncAckPacket::GetAckCode() const
207 {
208 return ackCode_;
209 }
210
SetSchema(const std::string & schema)211 void AbilitySyncAckPacket::SetSchema(const std::string &schema)
212 {
213 schema_ = schema;
214 }
215
GetSchema() const216 std::string AbilitySyncAckPacket::GetSchema() const
217 {
218 return schema_;
219 }
220
SetSchemaType(uint32_t schemaType)221 void AbilitySyncAckPacket::SetSchemaType(uint32_t schemaType)
222 {
223 schemaType_ = schemaType;
224 }
225
GetSchemaType() const226 uint32_t AbilitySyncAckPacket::GetSchemaType() const
227 {
228 return schemaType_;
229 }
230
SetSecLabel(int32_t secLabel)231 void AbilitySyncAckPacket::SetSecLabel(int32_t secLabel)
232 {
233 secLabel_ = secLabel;
234 }
235
GetSecLabel() const236 int32_t AbilitySyncAckPacket::GetSecLabel() const
237 {
238 return secLabel_;
239 }
240
SetSecFlag(int32_t secFlag)241 void AbilitySyncAckPacket::SetSecFlag(int32_t secFlag)
242 {
243 secFlag_ = secFlag;
244 }
245
GetSecFlag() const246 int32_t AbilitySyncAckPacket::GetSecFlag() const
247 {
248 return secFlag_;
249 }
250
SetPermitSync(uint32_t permitSync)251 void AbilitySyncAckPacket::SetPermitSync(uint32_t permitSync)
252 {
253 permitSync_ = permitSync;
254 }
255
GetPermitSync() const256 uint32_t AbilitySyncAckPacket::GetPermitSync() const
257 {
258 return permitSync_;
259 }
260
SetRequirePeerConvert(uint32_t requirePeerConvert)261 void AbilitySyncAckPacket::SetRequirePeerConvert(uint32_t requirePeerConvert)
262 {
263 requirePeerConvert_ = requirePeerConvert;
264 }
265
GetRequirePeerConvert() const266 uint32_t AbilitySyncAckPacket::GetRequirePeerConvert() const
267 {
268 return requirePeerConvert_;
269 }
270
SetDbCreateTime(uint64_t dbCreateTime)271 void AbilitySyncAckPacket::SetDbCreateTime(uint64_t dbCreateTime)
272 {
273 dbCreateTime_ = dbCreateTime;
274 }
275
GetDbCreateTime() const276 uint64_t AbilitySyncAckPacket::GetDbCreateTime() const
277 {
278 return dbCreateTime_;
279 }
280
CalculateLen() const281 uint32_t AbilitySyncAckPacket::CalculateLen() const
282 {
283 uint64_t len = 0;
284 len += Parcel::GetUInt32Len();
285 len += Parcel::GetUInt32Len();
286 len += Parcel::GetIntLen();
287 uint32_t schemaLen = Parcel::GetStringLen(schema_);
288 if (schemaLen == 0) {
289 LOGE("[AbilitySyncAckPacket][CalculateLen] schemaLen err!");
290 return 0;
291 }
292 len += schemaLen;
293 len += Parcel::GetIntLen(); // secLabel_
294 len += Parcel::GetIntLen(); // secFlag_
295 len += Parcel::GetUInt32Len(); // schemaType_
296 len += Parcel::GetUInt32Len(); // permitSync_
297 len += Parcel::GetUInt32Len(); // requirePeerConvert_
298 len += Parcel::GetUInt64Len(); // dbCreateTime_
299 len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
300 len += SchemaNegotiate::CalculateParcelLen(relationalSyncOpinion_);
301 if (len > INT32_MAX) {
302 LOGE("[AbilitySyncAckPacket][CalculateLen] err len:%" PRIu64, len);
303 return 0;
304 }
305 return len;
306 }
307
GetDbAbility() const308 DbAbility AbilitySyncAckPacket::GetDbAbility() const
309 {
310 return dbAbility_;
311 }
312
SetDbAbility(const DbAbility & dbAbility)313 void AbilitySyncAckPacket::SetDbAbility(const DbAbility &dbAbility)
314 {
315 dbAbility_ = dbAbility;
316 }
317
SetRelationalSyncOpinion(const RelationalSyncOpinion & relationalSyncOpinion)318 void AbilitySyncAckPacket::SetRelationalSyncOpinion(const RelationalSyncOpinion &relationalSyncOpinion)
319 {
320 relationalSyncOpinion_ = relationalSyncOpinion;
321 }
322
GetRelationalSyncOpinion() const323 RelationalSyncOpinion AbilitySyncAckPacket::GetRelationalSyncOpinion() const
324 {
325 return relationalSyncOpinion_;
326 }
327
AbilitySync()328 AbilitySync::AbilitySync()
329 : communicator_(nullptr),
330 storageInterface_(nullptr),
331 metadata_(nullptr),
332 syncFinished_(false)
333 {
334 }
335
~AbilitySync()336 AbilitySync::~AbilitySync()
337 {
338 communicator_ = nullptr;
339 storageInterface_ = nullptr;
340 metadata_ = nullptr;
341 }
342
Initialize(ICommunicator * inCommunicator,ISyncInterface * inStorage,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)343 int AbilitySync::Initialize(ICommunicator *inCommunicator, ISyncInterface *inStorage,
344 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
345 {
346 if (inCommunicator == nullptr || inStorage == nullptr || deviceId.empty() || inMetadata == nullptr) {
347 return -E_INVALID_ARGS;
348 }
349 communicator_ = inCommunicator;
350 storageInterface_ = inStorage;
351 metadata_ = inMetadata;
352 deviceId_ = deviceId;
353 return E_OK;
354 }
355
SyncStart(uint32_t sessionId,uint32_t sequenceId,uint16_t remoteCommunicatorVersion,const CommErrHandler & handler)356 int AbilitySync::SyncStart(uint32_t sessionId, uint32_t sequenceId, uint16_t remoteCommunicatorVersion,
357 const CommErrHandler &handler)
358 {
359 AbilitySyncRequestPacket packet;
360 int errCode = SetAbilityRequestBodyInfo(packet, remoteCommunicatorVersion);
361 if (errCode != E_OK) {
362 return errCode;
363 }
364 Message *message = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
365 if (message == nullptr) {
366 return -E_OUT_OF_MEMORY;
367 }
368 message->SetMessageType(TYPE_REQUEST);
369 errCode = message->SetCopiedObject<>(packet);
370 if (errCode != E_OK) {
371 LOGE("[AbilitySync][SyncStart] SetCopiedObject failed, err %d", errCode);
372 delete message;
373 message = nullptr;
374 return errCode;
375 }
376 message->SetVersion(MSG_VERSION_EXT);
377 message->SetSessionId(sessionId);
378 message->SetSequenceId(sequenceId);
379 SendConfig conf;
380 SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
381 errCode = communicator_->SendMessage(deviceId_, message, conf, handler);
382 if (errCode != E_OK) {
383 LOGE("[AbilitySync][SyncStart] SendPacket failed, err %d", errCode);
384 delete message;
385 message = nullptr;
386 }
387 return errCode;
388 }
389
AckRecv(const Message * message,ISyncTaskContext * context)390 int AbilitySync::AckRecv(const Message *message, ISyncTaskContext *context)
391 {
392 int errCode = AckMsgCheck(message, context);
393 if (errCode != E_OK) {
394 return errCode;
395 }
396 const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
397 if (packet == nullptr) {
398 return -E_INVALID_ARGS;
399 }
400 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
401 context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
402 if (remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_2_0) {
403 errCode = AckRecvWithHighVersion(message, context, packet);
404 } else {
405 std::string schema = packet->GetSchema();
406 uint8_t schemaType = packet->GetSchemaType();
407 bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
408 if (!isCompatible) {
409 (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
410 LOGE("[AbilitySync][AckRecv] scheme check failed");
411 return -E_SCHEMA_MISMATCH;
412 }
413 LOGI("[AbilitySync][AckRecv]remoteSoftwareVersion = %u, isCompatible = %d,", remoteSoftwareVersion,
414 isCompatible);
415 }
416 return errCode;
417 }
418
RequestRecv(const Message * message,ISyncTaskContext * context)419 int AbilitySync::RequestRecv(const Message *message, ISyncTaskContext *context)
420 {
421 if (message == nullptr || context == nullptr) {
422 return -E_INVALID_ARGS;
423 }
424 const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
425 if (packet == nullptr) {
426 return -E_INVALID_ARGS;
427 }
428 if (packet->GetSendCode() == -E_VERSION_NOT_SUPPORT) {
429 AbilitySyncAckPacket ackPacket;
430 (void)SendAck(message, -E_VERSION_NOT_SUPPORT, false, ackPacket);
431 LOGI("[AbilitySync][RequestRecv] version can not support, remote version is %u", packet->GetProtocolVersion());
432 return -E_VERSION_NOT_SUPPORT;
433 }
434
435 std::string schema = packet->GetSchema();
436 uint8_t schemaType = packet->GetSchemaType();
437 bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
438 if (!isCompatible) {
439 (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
440 }
441 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
442 context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
443 return HandleRequestRecv(message, context, isCompatible);
444 }
445
AckNotifyRecv(const Message * message,ISyncTaskContext * context)446 int AbilitySync::AckNotifyRecv(const Message *message, ISyncTaskContext *context)
447 {
448 if (message == nullptr || context == nullptr) {
449 return -E_INVALID_ARGS;
450 }
451 if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
452 LOGE("[AbilitySync][AckNotifyRecv] Remote device dose not support this message id");
453 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
454 return -E_FEEDBACK_UNKNOWN_MESSAGE;
455 }
456 const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
457 if (packet == nullptr) {
458 return -E_INVALID_ARGS;
459 }
460 int errCode = packet->GetAckCode();
461 if (errCode != E_OK) {
462 LOGE("[AbilitySync][AckNotifyRecv] received an errCode %d", errCode);
463 return errCode;
464 }
465 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
466 context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
467 AbilitySyncAckPacket sendPacket;
468 std::pair<bool, bool> schemaSyncStatus;
469 errCode = HandleVersionV3AckSchemaParam(packet, sendPacket, context, false, schemaSyncStatus);
470 int ackCode = errCode;
471 LOGI("[AckNotifyRecv] receive dev = %s ack notify, remoteSoftwareVersion = %u, ackCode = %d",
472 STR_MASK(deviceId_), remoteSoftwareVersion, errCode);
473 if (errCode == E_OK) {
474 ackCode = AbilitySync::LAST_NOTIFY;
475 }
476 (void)SendAckWithEmptySchema(message, ackCode, true);
477 return errCode;
478 }
479
GetAbilitySyncFinishedStatus() const480 bool AbilitySync::GetAbilitySyncFinishedStatus() const
481 {
482 return syncFinished_;
483 }
484
SetAbilitySyncFinishedStatus(bool syncFinished)485 void AbilitySync::SetAbilitySyncFinishedStatus(bool syncFinished)
486 {
487 syncFinished_ = syncFinished;
488 }
489
SecLabelCheck(const AbilitySyncRequestPacket * packet) const490 bool AbilitySync::SecLabelCheck(const AbilitySyncRequestPacket *packet) const
491 {
492 int32_t remoteSecLabel = packet->GetSecLabel();
493 int32_t remoteSecFlag = packet->GetSecFlag();
494 if (remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION || remoteSecLabel == SecurityLabel::NOT_SET) {
495 return true;
496 }
497 SecurityOption option;
498 int errCode = (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
499 LOGI("[AbilitySync][RequestRecv] local l:%d, f:%d, errCode:%d", option.securityLabel, option.securityFlag, errCode);
500 if (errCode == -E_NOT_SUPPORT || (errCode == E_OK && option.securityLabel == SecurityLabel::NOT_SET)) {
501 return true;
502 }
503 if (remoteSecLabel == FAILED_GET_SEC_CLASSIFICATION || errCode != E_OK) {
504 LOGE("[AbilitySync][RequestRecv] check error remoteL:%d, errCode:%d", remoteSecLabel, errCode);
505 return false;
506 }
507 if (remoteSecLabel == option.securityLabel) {
508 return true;
509 } else {
510 LOGE("[AbilitySync][RequestRecv] check error remote:%d , %d local:%d , %d",
511 remoteSecLabel, remoteSecFlag, option.securityLabel, option.securityFlag);
512 return false;
513 }
514 }
515
HandleVersionV3RequestParam(const AbilitySyncRequestPacket * packet,ISyncTaskContext * context)516 void AbilitySync::HandleVersionV3RequestParam(const AbilitySyncRequestPacket *packet, ISyncTaskContext *context)
517 {
518 int32_t remoteSecLabel = packet->GetSecLabel();
519 int32_t remoteSecFlag = packet->GetSecFlag();
520 DbAbility remoteDbAbility = packet->GetDbAbility();
521 (static_cast<SingleVerSyncTaskContext *>(context))->SetDbAbility(remoteDbAbility);
522 (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption({remoteSecLabel, remoteSecFlag});
523 (static_cast<SingleVerSyncTaskContext *>(context))->SetReceivcPermitCheck(false);
524 LOGI("[AbilitySync][HandleVersionV3RequestParam] remoteSecLabel = %d, remoteSecFlag = %d, remoteSchemaType = %u",
525 remoteSecLabel, remoteSecFlag, packet->GetSchemaType());
526 }
527
HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket * packet,ISyncTaskContext * context)528 void AbilitySync::HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket *packet,
529 ISyncTaskContext *context)
530 {
531 int32_t remoteSecLabel = packet->GetSecLabel();
532 int32_t remoteSecFlag = packet->GetSecFlag();
533 SecurityOption secOption = {remoteSecLabel, remoteSecFlag};
534 (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption(secOption);
535 (static_cast<SingleVerSyncTaskContext *>(context))->SetSendPermitCheck(false);
536 LOGI("[AbilitySync][AckRecv] remoteSecLabel = %d, remoteSecFlag = %d", remoteSecLabel, remoteSecFlag);
537 }
538
HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion,std::pair<bool,bool> & schemaSyncStatus) const539 int AbilitySync::HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket *recvPacket,
540 AbilitySyncAckPacket &sendPacket, ISyncTaskContext *context, bool sendOpinion,
541 std::pair<bool, bool> &schemaSyncStatus) const
542 {
543 if (IsSingleRelationalVer()) {
544 return HandleRelationAckSchemaParam(recvPacket, sendPacket, context, sendOpinion, schemaSyncStatus);
545 }
546 HandleKvAckSchemaParam(recvPacket, context, sendPacket, schemaSyncStatus);
547 return E_OK;
548 }
549
GetPacketSecOption(SecurityOption & option) const550 void AbilitySync::GetPacketSecOption(SecurityOption &option) const
551 {
552 int errCode =
553 (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
554 if (errCode == -E_NOT_SUPPORT) {
555 LOGE("[AbilitySync][SyncStart] GetSecOpt not surpport sec classification");
556 option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
557 } else if (errCode != E_OK) {
558 LOGE("[AbilitySync][SyncStart] GetSecOpt errCode:%d", errCode);
559 option.securityLabel = FAILED_GET_SEC_CLASSIFICATION;
560 }
561 }
562
RegisterTransformFunc()563 int AbilitySync::RegisterTransformFunc()
564 {
565 TransformFunc func;
566 func.computeFunc = std::bind(&AbilitySync::CalculateLen, std::placeholders::_1);
567 func.serializeFunc = std::bind(&AbilitySync::Serialization, std::placeholders::_1,
568 std::placeholders::_2, std::placeholders::_3);
569 func.deserializeFunc = std::bind(&AbilitySync::DeSerialization, std::placeholders::_1,
570 std::placeholders::_2, std::placeholders::_3);
571 return MessageTransform::RegTransformFunction(ABILITY_SYNC_MESSAGE, func);
572 }
573
CalculateLen(const Message * inMsg)574 uint32_t AbilitySync::CalculateLen(const Message *inMsg)
575 {
576 if ((inMsg == nullptr) || (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE)) {
577 return 0;
578 }
579 int errCode;
580 uint32_t len = 0;
581 switch (inMsg->GetMessageType()) {
582 case TYPE_REQUEST:
583 errCode = RequestPacketCalculateLen(inMsg, len);
584 if (errCode != E_OK) {
585 LOGE("[AbilitySync][CalculateLen] request packet calc length err %d", errCode);
586 }
587 break;
588 case TYPE_RESPONSE:
589 errCode = AckPacketCalculateLen(inMsg, len);
590 if (errCode != E_OK) {
591 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
592 }
593 break;
594 case TYPE_NOTIFY:
595 errCode = AckPacketCalculateLen(inMsg, len);
596 if (errCode != E_OK) {
597 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
598 }
599 break;
600 default:
601 LOGE("[AbilitySync][CalculateLen] message type not support, type %d", inMsg->GetMessageType());
602 break;
603 }
604 return len;
605 }
606
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)607 int AbilitySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
608 {
609 if ((buffer == nullptr) || (inMsg == nullptr)) {
610 return -E_INVALID_ARGS;
611 }
612
613 switch (inMsg->GetMessageType()) {
614 case TYPE_REQUEST:
615 return RequestPacketSerialization(buffer, length, inMsg);
616 case TYPE_RESPONSE:
617 case TYPE_NOTIFY:
618 return AckPacketSerialization(buffer, length, inMsg);
619 default:
620 return -E_MESSAGE_TYPE_ERROR;
621 }
622 }
623
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)624 int AbilitySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
625 {
626 if ((buffer == nullptr) || (inMsg == nullptr)) {
627 return -E_INVALID_ARGS;
628 }
629
630 switch (inMsg->GetMessageType()) {
631 case TYPE_REQUEST:
632 return RequestPacketDeSerialization(buffer, length, inMsg);
633 case TYPE_RESPONSE:
634 case TYPE_NOTIFY:
635 return AckPacketDeSerialization(buffer, length, inMsg);
636 default:
637 return -E_MESSAGE_TYPE_ERROR;
638 }
639 }
640
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)641 int AbilitySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
642 {
643 const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
644 if (packet == nullptr) {
645 return -E_INVALID_ARGS;
646 }
647
648 len = packet->CalculateLen();
649 return E_OK;
650 }
651
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)652 int AbilitySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
653 {
654 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
655 if (packet == nullptr) {
656 return -E_INVALID_ARGS;
657 }
658
659 len = packet->CalculateLen();
660 return E_OK;
661 }
662
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)663 int AbilitySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
664 {
665 const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
666 if ((packet == nullptr) || (length != packet->CalculateLen())) {
667 return -E_INVALID_ARGS;
668 }
669
670 Parcel parcel(buffer, length);
671 parcel.WriteUInt32(packet->GetProtocolVersion());
672 parcel.WriteInt(packet->GetSendCode());
673 parcel.WriteUInt32(packet->GetSoftwareVersion());
674 parcel.WriteString(packet->GetSchema());
675 parcel.WriteInt(packet->GetSecLabel());
676 parcel.WriteInt(packet->GetSecFlag());
677 parcel.WriteUInt32(packet->GetSchemaType());
678 parcel.WriteUInt64(packet->GetDbCreateTime());
679 int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
680 if (parcel.IsError() || errCode != E_OK) {
681 return -E_PARSE_FAIL;
682 }
683 return E_OK;
684 }
685
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)686 int AbilitySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
687 {
688 const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
689 if ((packet == nullptr) || (length != packet->CalculateLen())) {
690 return -E_INVALID_ARGS;
691 }
692
693 Parcel parcel(buffer, length);
694 parcel.WriteUInt32(ABILITY_SYNC_VERSION_V1);
695 parcel.WriteUInt32(SOFTWARE_VERSION_CURRENT);
696 parcel.WriteInt(packet->GetAckCode());
697 parcel.WriteString(packet->GetSchema());
698 parcel.WriteInt(packet->GetSecLabel());
699 parcel.WriteInt(packet->GetSecFlag());
700 parcel.WriteUInt32(packet->GetSchemaType());
701 parcel.WriteUInt32(packet->GetPermitSync());
702 parcel.WriteUInt32(packet->GetRequirePeerConvert());
703 parcel.WriteUInt64(packet->GetDbCreateTime());
704 int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
705 if (parcel.IsError() || errCode != E_OK) {
706 return -E_PARSE_FAIL;
707 }
708 errCode = SchemaNegotiate::SerializeData(packet->GetRelationalSyncOpinion(), parcel);
709 if (parcel.IsError() || errCode != E_OK) {
710 return -E_PARSE_FAIL;
711 }
712 return E_OK;
713 }
714
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)715 int AbilitySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
716 {
717 auto *packet = new (std::nothrow) AbilitySyncRequestPacket();
718 if (packet == nullptr) {
719 return -E_OUT_OF_MEMORY;
720 }
721
722 Parcel parcel(const_cast<uint8_t *>(buffer), length);
723 uint32_t version = 0;
724 uint32_t softwareVersion = 0;
725 std::string schema;
726 int32_t sendCode = 0;
727 int errCode = -E_PARSE_FAIL;
728
729 parcel.ReadUInt32(version);
730 if (parcel.IsError()) {
731 goto ERROR_OUT;
732 }
733 packet->SetProtocolVersion(version);
734 if (version > ABILITY_SYNC_VERSION_V1) {
735 packet->SetSendCode(-E_VERSION_NOT_SUPPORT);
736 errCode = inMsg->SetExternalObject<>(packet);
737 if (errCode != E_OK) {
738 goto ERROR_OUT;
739 }
740 return errCode;
741 }
742 parcel.ReadInt(sendCode);
743 parcel.ReadUInt32(softwareVersion);
744 parcel.ReadString(schema);
745 errCode = RequestPacketDeSerializationTailPart(parcel, packet, softwareVersion);
746 if (parcel.IsError() || errCode != E_OK) {
747 goto ERROR_OUT;
748 }
749 packet->SetSendCode(sendCode);
750 packet->SetSoftwareVersion(softwareVersion);
751 packet->SetSchema(schema);
752
753 errCode = inMsg->SetExternalObject<>(packet);
754 if (errCode == E_OK) {
755 return E_OK;
756 }
757
758 ERROR_OUT:
759 delete packet;
760 return errCode;
761 }
762
RequestPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncRequestPacket * packet,uint32_t version)763 int AbilitySync::RequestPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncRequestPacket *packet,
764 uint32_t version)
765 {
766 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
767 int32_t secLabel = 0;
768 int32_t secFlag = 0;
769 uint32_t schemaType = 0;
770 parcel.ReadInt(secLabel);
771 parcel.ReadInt(secFlag);
772 parcel.ReadUInt32(schemaType);
773 packet->SetSecLabel(secLabel);
774 packet->SetSecFlag(secFlag);
775 packet->SetSchemaType(schemaType);
776 }
777 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
778 uint64_t dbCreateTime = 0;
779 parcel.ReadUInt64(dbCreateTime);
780 packet->SetDbCreateTime(dbCreateTime);
781 }
782 DbAbility remoteDbAbility;
783 int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
784 if (errCode != E_OK) {
785 LOGE("[AbilitySync] request packet DeSerializ failed.");
786 return errCode;
787 }
788 packet->SetDbAbility(remoteDbAbility);
789 return E_OK;
790 }
791
AckPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncAckPacket * packet,uint32_t version)792 int AbilitySync::AckPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncAckPacket *packet, uint32_t version)
793 {
794 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
795 int32_t secLabel = 0;
796 int32_t secFlag = 0;
797 uint32_t schemaType = 0;
798 uint32_t permitSync = 0;
799 uint32_t requirePeerConvert = 0;
800 parcel.ReadInt(secLabel);
801 parcel.ReadInt(secFlag);
802 parcel.ReadUInt32(schemaType);
803 parcel.ReadUInt32(permitSync);
804 parcel.ReadUInt32(requirePeerConvert);
805 packet->SetSecLabel(secLabel);
806 packet->SetSecFlag(secFlag);
807 packet->SetSchemaType(schemaType);
808 packet->SetPermitSync(permitSync);
809 packet->SetRequirePeerConvert(requirePeerConvert);
810 }
811 if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
812 uint64_t dbCreateTime = 0;
813 parcel.ReadUInt64(dbCreateTime);
814 packet->SetDbCreateTime(dbCreateTime);
815 }
816 DbAbility remoteDbAbility;
817 int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
818 if (errCode != E_OK) {
819 LOGE("[AbilitySync] ack packet DeSerializ failed.");
820 return errCode;
821 }
822 packet->SetDbAbility(remoteDbAbility);
823 RelationalSyncOpinion relationalSyncOpinion;
824 errCode = SchemaNegotiate::DeserializeData(parcel, relationalSyncOpinion);
825 if (errCode != E_OK) {
826 LOGE("[AbilitySync] ack packet DeSerializ RelationalSyncOpinion failed.");
827 return errCode;
828 }
829 packet->SetRelationalSyncOpinion(relationalSyncOpinion);
830 return E_OK;
831 }
832
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)833 int AbilitySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
834 {
835 auto *packet = new (std::nothrow) AbilitySyncAckPacket();
836 if (packet == nullptr) {
837 return -E_OUT_OF_MEMORY;
838 }
839
840 Parcel parcel(const_cast<uint8_t *>(buffer), length);
841 uint32_t version = 0;
842 uint32_t softwareVersion = 0;
843 int32_t ackCode = E_OK;
844 std::string schema;
845 int errCode;
846 parcel.ReadUInt32(version);
847 if (parcel.IsError()) {
848 LOGE("[AbilitySync][RequestDeSerialization] read version failed!");
849 errCode = -E_PARSE_FAIL;
850 goto ERROR_OUT;
851 }
852 packet->SetProtocolVersion(version);
853 if (version > ABILITY_SYNC_VERSION_V1) {
854 packet->SetAckCode(-E_VERSION_NOT_SUPPORT);
855 errCode = inMsg->SetExternalObject<>(packet);
856 if (errCode != E_OK) {
857 goto ERROR_OUT;
858 }
859 return errCode;
860 }
861 parcel.ReadUInt32(softwareVersion);
862 parcel.ReadInt(ackCode);
863 parcel.ReadString(schema);
864 errCode = AckPacketDeSerializationTailPart(parcel, packet, softwareVersion);
865 if (parcel.IsError() || errCode != E_OK) {
866 LOGE("[AbilitySync][RequestDeSerialization] DeSerialization failed!");
867 errCode = -E_PARSE_FAIL;
868 goto ERROR_OUT;
869 }
870 packet->SetSoftwareVersion(softwareVersion);
871 packet->SetAckCode(ackCode);
872 packet->SetSchema(schema);
873 errCode = inMsg->SetExternalObject<>(packet);
874 if (errCode == E_OK) {
875 return E_OK;
876 }
877
878 ERROR_OUT:
879 delete packet;
880 return errCode;
881 }
882
SetAbilityRequestBodyInfo(AbilitySyncRequestPacket & packet,uint16_t remoteCommunicatorVersion) const883 int AbilitySync::SetAbilityRequestBodyInfo(AbilitySyncRequestPacket &packet, uint16_t remoteCommunicatorVersion) const
884 {
885 uint64_t dbCreateTime;
886 int errCode =
887 (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
888 if (errCode != E_OK) {
889 LOGE("[AbilitySync][FillAbilityRequest] GetDatabaseCreateTimestamp failed, err %d", errCode);
890 return errCode;
891 }
892 SecurityOption option;
893 GetPacketSecOption(option);
894 std::string schemaStr;
895 uint32_t schemaType = 0;
896 if (IsSingleKvVer()) {
897 SchemaObject schemaObj = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
898 schemaStr = schemaObj.ToSchemaString();
899 schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
900 } else if (IsSingleRelationalVer()) {
901 auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
902 schemaStr = schemaObj.ToSchemaString();
903 schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
904 }
905 DbAbility dbAbility;
906 errCode = GetDbAbilityInfo(dbAbility);
907 if (errCode != E_OK) {
908 LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
909 return errCode;
910 }
911 // 102 version is forbidden to sync with 103 json-schema or flatbuffer-schema
912 // so schema should put null string while remote is 102 version to avoid this bug.
913 if (remoteCommunicatorVersion == 1) {
914 packet.SetSchema("");
915 packet.SetSchemaType(0);
916 } else {
917 packet.SetSchema(schemaStr);
918 packet.SetSchemaType(schemaType);
919 }
920 packet.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
921 packet.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
922 packet.SetSecLabel(option.securityLabel);
923 packet.SetSecFlag(option.securityFlag);
924 packet.SetDbCreateTime(dbCreateTime);
925 packet.SetDbAbility(dbAbility);
926 LOGI("[AbilitySync][FillRequest] ver=%u,Lab=%d,Flag=%d,dbCreateTime=%" PRId64, SOFTWARE_VERSION_CURRENT,
927 option.securityLabel, option.securityFlag, dbCreateTime);
928 return E_OK;
929 }
930
SetAbilityAckBodyInfo(AbilitySyncAckPacket & ackPacket,int ackCode,bool isAckNotify) const931 int AbilitySync::SetAbilityAckBodyInfo(AbilitySyncAckPacket &ackPacket, int ackCode, bool isAckNotify) const
932 {
933 ackPacket.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
934 ackPacket.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
935 if (!isAckNotify) {
936 SecurityOption option;
937 GetPacketSecOption(option);
938 ackPacket.SetSecLabel(option.securityLabel);
939 ackPacket.SetSecFlag(option.securityFlag);
940 uint64_t dbCreateTime = 0;
941 int errCode =
942 (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
943 if (errCode != E_OK) {
944 LOGE("[AbilitySync][SyncStart] GetDatabaseCreateTimestamp failed, err %d", errCode);
945 ackCode = errCode;
946 }
947 DbAbility dbAbility;
948 errCode = GetDbAbilityInfo(dbAbility);
949 if (errCode != E_OK) {
950 LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
951 return errCode;
952 }
953 ackPacket.SetDbCreateTime(dbCreateTime);
954 ackPacket.SetDbAbility(dbAbility);
955 }
956 ackPacket.SetAckCode(ackCode);
957 return E_OK;
958 }
959
SetAbilityAckSchemaInfo(AbilitySyncAckPacket & ackPacket,const ISchema & schemaObj)960 void AbilitySync::SetAbilityAckSchemaInfo(AbilitySyncAckPacket &ackPacket, const ISchema &schemaObj)
961 {
962 ackPacket.SetSchema(schemaObj.ToSchemaString());
963 ackPacket.SetSchemaType(static_cast<uint32_t>(schemaObj.GetSchemaType()));
964 }
965
SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket & ackPacket,SyncOpinion localOpinion)966 void AbilitySync::SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket &ackPacket, SyncOpinion localOpinion)
967 {
968 ackPacket.SetPermitSync(localOpinion.permitSync);
969 ackPacket.SetRequirePeerConvert(localOpinion.requirePeerConvert);
970 }
971
GetDbAbilityInfo(DbAbility & dbAbility)972 int AbilitySync::GetDbAbilityInfo(DbAbility &dbAbility)
973 {
974 int errCode = E_OK;
975 for (const auto &item : SyncConfig::ABILITYBITS) {
976 errCode = dbAbility.SetAbilityItem(item, SUPPORT_MARK);
977 if (errCode != E_OK) {
978 return errCode;
979 }
980 }
981 return errCode;
982 }
983
AckMsgCheck(const Message * message,ISyncTaskContext * context) const984 int AbilitySync::AckMsgCheck(const Message *message, ISyncTaskContext *context) const
985 {
986 if (message == nullptr || context == nullptr) {
987 return -E_INVALID_ARGS;
988 }
989 if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
990 LOGE("[AbilitySync][AckMsgCheck] Remote device dose not support this message id");
991 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
992 context->SetTaskErrCode(-E_FEEDBACK_UNKNOWN_MESSAGE);
993 return -E_FEEDBACK_UNKNOWN_MESSAGE;
994 }
995 if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) {
996 LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
997 context->SetTaskErrCode(-E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
998 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
999 }
1000 const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
1001 if (packet == nullptr) {
1002 return -E_INVALID_ARGS;
1003 }
1004 int ackCode = packet->GetAckCode();
1005 if (ackCode != E_OK) {
1006 LOGE("[AbilitySync][AckMsgCheck] received an errCode %d", ackCode);
1007 context->SetTaskErrCode(ackCode);
1008 return ackCode;
1009 }
1010 return E_OK;
1011 }
1012
IsSingleKvVer() const1013 bool AbilitySync::IsSingleKvVer() const
1014 {
1015 return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_SVD;
1016 }
IsSingleRelationalVer() const1017 bool AbilitySync::IsSingleRelationalVer() const
1018 {
1019 #ifdef RELATIONAL_STORE
1020 return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_RELATION;
1021 #else
1022 return false;
1023 #endif
1024 }
1025
HandleRequestRecv(const Message * message,ISyncTaskContext * context,bool isCompatible)1026 int AbilitySync::HandleRequestRecv(const Message *message, ISyncTaskContext *context, bool isCompatible)
1027 {
1028 const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
1029 if (packet == nullptr) {
1030 return -E_INVALID_ARGS;
1031 }
1032 uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
1033 int ackCode;
1034 std::string schema = packet->GetSchema();
1035 if (remoteSoftwareVersion <= SOFTWARE_VERSION_RELEASE_2_0) {
1036 LOGI("[AbilitySync][RequestRecv] remote version = %u, CheckSchemaCompatible = %d",
1037 remoteSoftwareVersion, isCompatible);
1038 return SendAckWithEmptySchema(message, E_OK, false);
1039 }
1040 HandleVersionV3RequestParam(packet, context);
1041 if (SecLabelCheck(packet)) {
1042 ackCode = E_OK;
1043 } else {
1044 ackCode = -E_SECURITY_OPTION_CHECK_ERROR;
1045 }
1046 if (ackCode == E_OK && remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_3_0) {
1047 ackCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1048 }
1049 AbilitySyncAckPacket ackPacket;
1050 if (IsSingleRelationalVer()) {
1051 ackPacket.SetRelationalSyncOpinion(MakeRelationSyncOpinion(packet, schema));
1052 } else {
1053 SetAbilityAckSyncOpinionInfo(ackPacket, MakeKvSyncOpinion(packet, schema));
1054 }
1055 LOGI("[AbilitySync][RequestRecv] remote dev=%s,ver=%u,schemaCompatible=%d", STR_MASK(deviceId_),
1056 remoteSoftwareVersion, isCompatible);
1057 return SendAck(message, ackCode, false, ackPacket);
1058 }
1059
SendAck(const Message * message,int ackCode,bool isAckNotify,AbilitySyncAckPacket & ackPacket)1060 int AbilitySync::SendAck(const Message *message, int ackCode, bool isAckNotify, AbilitySyncAckPacket &ackPacket)
1061 {
1062 int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1063 if (errCode != E_OK) {
1064 return errCode;
1065 }
1066 if (IsSingleRelationalVer()) {
1067 auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1068 SetAbilityAckSchemaInfo(ackPacket, schemaObj);
1069 } else if (IsSingleKvVer()) {
1070 SchemaObject schemaObject = static_cast<SingleVerKvDBSyncInterface *>(storageInterface_)->GetSchemaInfo();
1071 SetAbilityAckSchemaInfo(ackPacket, schemaObject);
1072 }
1073 return SendAck(message, ackPacket, isAckNotify);
1074 }
1075
SendAckWithEmptySchema(const Message * message,int ackCode,bool isAckNotify)1076 int AbilitySync::SendAckWithEmptySchema(const Message *message, int ackCode,
1077 bool isAckNotify)
1078 {
1079 AbilitySyncAckPacket ackPacket;
1080 int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1081 if (errCode != E_OK) {
1082 return errCode;
1083 }
1084 SetAbilityAckSchemaInfo(ackPacket, SchemaObject());
1085 return SendAck(message, ackPacket, isAckNotify);
1086 }
1087
SendAck(const Message * inMsg,const AbilitySyncAckPacket & ackPacket,bool isAckNotify)1088 int AbilitySync::SendAck(const Message *inMsg, const AbilitySyncAckPacket &ackPacket, bool isAckNotify)
1089 {
1090 Message *ackMessage = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
1091 if (ackMessage == nullptr) {
1092 LOGE("[AbilitySync][SendAck] message create failed, may be memleak!");
1093 return -E_OUT_OF_MEMORY;
1094 }
1095 int errCode = ackMessage->SetCopiedObject<>(ackPacket);
1096 if (errCode != E_OK) {
1097 LOGE("[AbilitySync][SendAck] SetCopiedObject failed, err %d", errCode);
1098 delete ackMessage;
1099 ackMessage = nullptr;
1100 return errCode;
1101 }
1102 (!isAckNotify) ? ackMessage->SetMessageType(TYPE_RESPONSE) : ackMessage->SetMessageType(TYPE_NOTIFY);
1103 ackMessage->SetTarget(deviceId_);
1104 ackMessage->SetSessionId(inMsg->GetSessionId());
1105 ackMessage->SetSequenceId(inMsg->GetSequenceId());
1106 SendConfig conf;
1107 SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
1108 errCode = communicator_->SendMessage(deviceId_, ackMessage, conf);
1109 if (errCode != E_OK) {
1110 LOGE("[AbilitySync][SendAck] SendPacket failed, err %d", errCode);
1111 delete ackMessage;
1112 ackMessage = nullptr;
1113 }
1114 return errCode;
1115 }
1116
MakeKvSyncOpinion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1117 SyncOpinion AbilitySync::MakeKvSyncOpinion(const AbilitySyncRequestPacket *packet,
1118 const std::string &remoteSchema) const
1119 {
1120 uint8_t remoteSchemaType = packet->GetSchemaType();
1121 SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1122 SyncOpinion localSyncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1123 return localSyncOpinion;
1124 }
1125
MakeRelationSyncOpinion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1126 RelationalSyncOpinion AbilitySync::MakeRelationSyncOpinion(const AbilitySyncRequestPacket *packet,
1127 const std::string &remoteSchema) const
1128 {
1129 uint8_t remoteSchemaType = packet->GetSchemaType();
1130 RelationalSchemaObject localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1131 return SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1132 }
1133
HandleKvAckSchemaParam(const AbilitySyncAckPacket * recvPacket,ISyncTaskContext * context,AbilitySyncAckPacket & sendPacket,std::pair<bool,bool> & schemaSyncStatus) const1134 void AbilitySync::HandleKvAckSchemaParam(const AbilitySyncAckPacket *recvPacket,
1135 ISyncTaskContext *context, AbilitySyncAckPacket &sendPacket, std::pair<bool, bool> &schemaSyncStatus) const
1136 {
1137 std::string remoteSchema = recvPacket->GetSchema();
1138 uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1139 bool permitSync = static_cast<bool>(recvPacket->GetPermitSync());
1140 bool requirePeerConvert = static_cast<bool>(recvPacket->GetRequirePeerConvert());
1141 SyncOpinion remoteOpinion = {permitSync, requirePeerConvert, true};
1142 SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1143 SyncOpinion syncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1144 SyncStrategy localStrategy = SchemaNegotiate::ConcludeSyncStrategy(syncOpinion, remoteOpinion);
1145 SetAbilityAckSyncOpinionInfo(sendPacket, syncOpinion);
1146 (static_cast<SingleVerKvSyncTaskContext *>(context))->SetSyncStrategy(localStrategy, true);
1147 schemaSyncStatus = {
1148 localStrategy.permitSync,
1149 true
1150 };
1151 }
1152
HandleRelationAckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion,std::pair<bool,bool> & schemaSyncStatus) const1153 int AbilitySync::HandleRelationAckSchemaParam(const AbilitySyncAckPacket *recvPacket, AbilitySyncAckPacket &sendPacket,
1154 ISyncTaskContext *context, bool sendOpinion, std::pair<bool, bool> &schemaSyncStatus) const
1155 {
1156 std::string remoteSchema = recvPacket->GetSchema();
1157 uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1158 auto localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1159 auto localOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1160 auto localStrategy = SchemaNegotiate::ConcludeSyncStrategy(localOpinion,
1161 recvPacket->GetRelationalSyncOpinion());
1162 (static_cast<SingleVerRelationalSyncTaskContext *>(context))->SetRelationalSyncStrategy(localStrategy, true);
1163 bool permitSync = std::any_of(localStrategy.begin(), localStrategy.end(),
1164 [] (const std::pair<std::string, SyncStrategy> &it) {
1165 return it.second.permitSync;
1166 });
1167 if (permitSync) {
1168 int innerErrCode = (static_cast<RelationalDBSyncInterface *>(storageInterface_)->SaveRemoteDeviceSchema(
1169 deviceId_, remoteSchema, remoteSchemaType));
1170 if (innerErrCode != E_OK) {
1171 LOGE("[AbilitySync][AckRecv] save remote device Schema failed,errCode=%d", innerErrCode);
1172 return innerErrCode;
1173 }
1174 }
1175 int errCode = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->
1176 CreateDistributedDeviceTable(context->GetDeviceId(), localStrategy);
1177 if (errCode != E_OK) {
1178 LOGE("[AbilitySync][AckRecv] create distributed device table failed,errCode=%d", errCode);
1179 }
1180 if (sendOpinion) {
1181 sendPacket.SetRelationalSyncOpinion(localOpinion);
1182 }
1183 auto singleVerContext = static_cast<SingleVerSyncTaskContext *>(context);
1184 auto strategy = localStrategy.find(singleVerContext->GetQuery().GetRelationTableName());
1185 schemaSyncStatus = {
1186 !(strategy == localStrategy.end()) && strategy->second.permitSync,
1187 true
1188 };
1189 return errCode;
1190 }
1191
AckRecvWithHighVersion(const Message * message,ISyncTaskContext * context,const AbilitySyncAckPacket * packet)1192 int AbilitySync::AckRecvWithHighVersion(const Message *message, ISyncTaskContext *context,
1193 const AbilitySyncAckPacket *packet)
1194 {
1195 HandleVersionV3AckSecOptionParam(packet, context);
1196 AbilitySyncAckPacket ackPacket;
1197 std::pair<bool, bool> schemaSyncStatus;
1198 int errCode = HandleVersionV3AckSchemaParam(packet, ackPacket, context, true, schemaSyncStatus);
1199 if (errCode != E_OK) {
1200 context->SetTaskErrCode(errCode);
1201 return errCode;
1202 }
1203 auto singleVerContext = static_cast<SingleVerSyncTaskContext *>(context);
1204 if (!schemaSyncStatus.first) {
1205 singleVerContext->SetTaskErrCode(-E_SCHEMA_MISMATCH);
1206 LOGE("[AbilitySync][AckRecv] scheme check failed");
1207 return -E_SCHEMA_MISMATCH;
1208 }
1209 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
1210 errCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1211 if (errCode != E_OK) {
1212 LOGE("[AbilitySync][AckRecv] set db create time failed,errCode=%d", errCode);
1213 context->SetTaskErrCode(errCode);
1214 return errCode;
1215 }
1216 }
1217 DbAbility remoteDbAbility = packet->GetDbAbility();
1218 singleVerContext->SetDbAbility(remoteDbAbility);
1219 (void)SendAck(message, AbilitySync::CHECK_SUCCESS, true, ackPacket);
1220 return E_OK;
1221 }
1222 } // namespace DistributedDB