• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "ability_sync.h"
17 
18 #include "message_transform.h"
19 #include "version.h"
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "sync_types.h"
23 #include "db_common.h"
24 #include "single_ver_kvdb_sync_interface.h"
25 #include "single_ver_sync_task_context.h"
26 #include "single_ver_kv_sync_task_context.h"
27 #ifdef RELATIONAL_STORE
28 #include "relational_db_sync_interface.h"
29 #include "single_ver_relational_sync_task_context.h"
30 #endif
31 
32 namespace DistributedDB {
AbilitySyncRequestPacket()33 AbilitySyncRequestPacket::AbilitySyncRequestPacket()
34     : protocolVersion_(ABILITY_SYNC_VERSION_V1),
35       sendCode_(E_OK),
36       softwareVersion_(SOFTWARE_VERSION_CURRENT),
37       secLabel_(0),
38       secFlag_(0),
39       schemaType_(0),
40       dbCreateTime_(0)
41 {
42 }
43 
~AbilitySyncRequestPacket()44 AbilitySyncRequestPacket::~AbilitySyncRequestPacket()
45 {
46 }
47 
SetProtocolVersion(uint32_t protocolVersion)48 void AbilitySyncRequestPacket::SetProtocolVersion(uint32_t protocolVersion)
49 {
50     protocolVersion_ = protocolVersion;
51 }
52 
GetProtocolVersion() const53 uint32_t AbilitySyncRequestPacket::GetProtocolVersion() const
54 {
55     return protocolVersion_;
56 }
57 
SetSendCode(int32_t sendCode)58 void AbilitySyncRequestPacket::SetSendCode(int32_t sendCode)
59 {
60     sendCode_ = sendCode;
61 }
62 
GetSendCode() const63 int32_t AbilitySyncRequestPacket::GetSendCode() const
64 {
65     return sendCode_;
66 }
67 
SetSoftwareVersion(uint32_t swVersion)68 void AbilitySyncRequestPacket::SetSoftwareVersion(uint32_t swVersion)
69 {
70     softwareVersion_ = swVersion;
71 }
72 
GetSoftwareVersion() const73 uint32_t AbilitySyncRequestPacket::GetSoftwareVersion() const
74 {
75     return softwareVersion_;
76 }
77 
SetSchema(const std::string & schema)78 void AbilitySyncRequestPacket::SetSchema(const std::string &schema)
79 {
80     schema_ = schema;
81 }
82 
GetSchema() const83 std::string AbilitySyncRequestPacket::GetSchema() const
84 {
85     return schema_;
86 }
87 
SetSchemaType(uint32_t schemaType)88 void AbilitySyncRequestPacket::SetSchemaType(uint32_t schemaType)
89 {
90     schemaType_ = schemaType;
91 }
92 
GetSchemaType() const93 uint32_t AbilitySyncRequestPacket::GetSchemaType() const
94 {
95     return schemaType_;
96 }
97 
SetSecLabel(int32_t secLabel)98 void AbilitySyncRequestPacket::SetSecLabel(int32_t secLabel)
99 {
100     secLabel_ = secLabel;
101 }
102 
GetSecLabel() const103 int32_t AbilitySyncRequestPacket::GetSecLabel() const
104 {
105     return secLabel_;
106 }
107 
SetSecFlag(int32_t secFlag)108 void AbilitySyncRequestPacket::SetSecFlag(int32_t secFlag)
109 {
110     secFlag_ = secFlag;
111 }
112 
GetSecFlag() const113 int32_t AbilitySyncRequestPacket::GetSecFlag() const
114 {
115     return secFlag_;
116 }
117 
SetDbCreateTime(uint64_t dbCreateTime)118 void AbilitySyncRequestPacket::SetDbCreateTime(uint64_t dbCreateTime)
119 {
120     dbCreateTime_ = dbCreateTime;
121 }
122 
GetDbCreateTime() const123 uint64_t AbilitySyncRequestPacket::GetDbCreateTime() const
124 {
125     return dbCreateTime_;
126 }
127 
CalculateLen() const128 uint32_t AbilitySyncRequestPacket::CalculateLen() const
129 {
130     uint64_t len = 0;
131     len += Parcel::GetUInt32Len(); // protocolVersion_
132     len += Parcel::GetIntLen(); // sendCode_
133     len += Parcel::GetUInt32Len(); // softwareVersion_
134     uint32_t schemaLen = Parcel::GetStringLen(schema_);
135     if (schemaLen == 0) {
136         LOGE("[AbilitySyncRequestPacket][CalculateLen] schemaLen err!");
137         return 0;
138     }
139     len += schemaLen;
140     len += Parcel::GetIntLen(); // secLabel_
141     len += Parcel::GetIntLen(); // secFlag_
142     len += Parcel::GetUInt32Len(); // schemaType_
143     len += Parcel::GetUInt64Len(); // dbCreateTime_
144     len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
145     // the reason why not 8-byte align is that old version is not 8-byte align
146     // so it is not possible to set 8-byte align for high version.
147     if (len > INT32_MAX) {
148         LOGE("[AbilitySyncRequestPacket][CalculateLen] err len:%" PRIu64, len);
149         return 0;
150     }
151     return len;
152 }
153 
GetDbAbility() const154 DbAbility AbilitySyncRequestPacket::GetDbAbility() const
155 {
156     return dbAbility_;
157 }
158 
SetDbAbility(const DbAbility & dbAbility)159 void AbilitySyncRequestPacket::SetDbAbility(const DbAbility &dbAbility)
160 {
161     dbAbility_ = dbAbility;
162 }
163 
AbilitySyncAckPacket()164 AbilitySyncAckPacket::AbilitySyncAckPacket()
165     : protocolVersion_(ABILITY_SYNC_VERSION_V1),
166       softwareVersion_(SOFTWARE_VERSION_CURRENT),
167       ackCode_(E_OK),
168       secLabel_(0),
169       secFlag_(0),
170       schemaType_(0),
171       permitSync_(0),
172       requirePeerConvert_(0),
173       dbCreateTime_(0)
174 {
175 }
176 
~AbilitySyncAckPacket()177 AbilitySyncAckPacket::~AbilitySyncAckPacket()
178 {
179 }
180 
SetProtocolVersion(uint32_t protocolVersion)181 void AbilitySyncAckPacket::SetProtocolVersion(uint32_t protocolVersion)
182 {
183     protocolVersion_ = protocolVersion;
184 }
185 
SetSoftwareVersion(uint32_t swVersion)186 void AbilitySyncAckPacket::SetSoftwareVersion(uint32_t swVersion)
187 {
188     softwareVersion_ = swVersion;
189 }
190 
GetSoftwareVersion() const191 uint32_t AbilitySyncAckPacket::GetSoftwareVersion() const
192 {
193     return softwareVersion_;
194 }
195 
GetProtocolVersion() const196 uint32_t AbilitySyncAckPacket::GetProtocolVersion() const
197 {
198     return protocolVersion_;
199 }
200 
SetAckCode(int32_t ackCode)201 void AbilitySyncAckPacket::SetAckCode(int32_t ackCode)
202 {
203     ackCode_ = ackCode;
204 }
205 
GetAckCode() const206 int32_t AbilitySyncAckPacket::GetAckCode() const
207 {
208     return ackCode_;
209 }
210 
SetSchema(const std::string & schema)211 void AbilitySyncAckPacket::SetSchema(const std::string &schema)
212 {
213     schema_ = schema;
214 }
215 
GetSchema() const216 std::string AbilitySyncAckPacket::GetSchema() const
217 {
218     return schema_;
219 }
220 
SetSchemaType(uint32_t schemaType)221 void AbilitySyncAckPacket::SetSchemaType(uint32_t schemaType)
222 {
223     schemaType_ = schemaType;
224 }
225 
GetSchemaType() const226 uint32_t AbilitySyncAckPacket::GetSchemaType() const
227 {
228     return schemaType_;
229 }
230 
SetSecLabel(int32_t secLabel)231 void AbilitySyncAckPacket::SetSecLabel(int32_t secLabel)
232 {
233     secLabel_ = secLabel;
234 }
235 
GetSecLabel() const236 int32_t AbilitySyncAckPacket::GetSecLabel() const
237 {
238     return secLabel_;
239 }
240 
SetSecFlag(int32_t secFlag)241 void AbilitySyncAckPacket::SetSecFlag(int32_t secFlag)
242 {
243     secFlag_ = secFlag;
244 }
245 
GetSecFlag() const246 int32_t AbilitySyncAckPacket::GetSecFlag() const
247 {
248     return secFlag_;
249 }
250 
SetPermitSync(uint32_t permitSync)251 void AbilitySyncAckPacket::SetPermitSync(uint32_t permitSync)
252 {
253     permitSync_ = permitSync;
254 }
255 
GetPermitSync() const256 uint32_t AbilitySyncAckPacket::GetPermitSync() const
257 {
258     return permitSync_;
259 }
260 
SetRequirePeerConvert(uint32_t requirePeerConvert)261 void AbilitySyncAckPacket::SetRequirePeerConvert(uint32_t requirePeerConvert)
262 {
263     requirePeerConvert_ = requirePeerConvert;
264 }
265 
GetRequirePeerConvert() const266 uint32_t AbilitySyncAckPacket::GetRequirePeerConvert() const
267 {
268     return requirePeerConvert_;
269 }
270 
SetDbCreateTime(uint64_t dbCreateTime)271 void AbilitySyncAckPacket::SetDbCreateTime(uint64_t dbCreateTime)
272 {
273     dbCreateTime_ = dbCreateTime;
274 }
275 
GetDbCreateTime() const276 uint64_t AbilitySyncAckPacket::GetDbCreateTime() const
277 {
278     return dbCreateTime_;
279 }
280 
CalculateLen() const281 uint32_t AbilitySyncAckPacket::CalculateLen() const
282 {
283     uint64_t len = 0;
284     len += Parcel::GetUInt32Len();
285     len += Parcel::GetUInt32Len();
286     len += Parcel::GetIntLen();
287     uint32_t schemaLen = Parcel::GetStringLen(schema_);
288     if (schemaLen == 0) {
289         LOGE("[AbilitySyncAckPacket][CalculateLen] schemaLen err!");
290         return 0;
291     }
292     len += schemaLen;
293     len += Parcel::GetIntLen(); // secLabel_
294     len += Parcel::GetIntLen(); // secFlag_
295     len += Parcel::GetUInt32Len(); // schemaType_
296     len += Parcel::GetUInt32Len(); // permitSync_
297     len += Parcel::GetUInt32Len(); // requirePeerConvert_
298     len += Parcel::GetUInt64Len(); // dbCreateTime_
299     len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
300     len += SchemaNegotiate::CalculateParcelLen(relationalSyncOpinion_);
301     if (len > INT32_MAX) {
302         LOGE("[AbilitySyncAckPacket][CalculateLen] err len:%" PRIu64, len);
303         return 0;
304     }
305     return len;
306 }
307 
GetDbAbility() const308 DbAbility AbilitySyncAckPacket::GetDbAbility() const
309 {
310     return dbAbility_;
311 }
312 
SetDbAbility(const DbAbility & dbAbility)313 void AbilitySyncAckPacket::SetDbAbility(const DbAbility &dbAbility)
314 {
315     dbAbility_ = dbAbility;
316 }
317 
SetRelationalSyncOpinion(const RelationalSyncOpinion & relationalSyncOpinion)318 void AbilitySyncAckPacket::SetRelationalSyncOpinion(const RelationalSyncOpinion &relationalSyncOpinion)
319 {
320     relationalSyncOpinion_ = relationalSyncOpinion;
321 }
322 
GetRelationalSyncOpinion() const323 RelationalSyncOpinion AbilitySyncAckPacket::GetRelationalSyncOpinion() const
324 {
325     return relationalSyncOpinion_;
326 }
327 
AbilitySync()328 AbilitySync::AbilitySync()
329     : communicator_(nullptr),
330       storageInterface_(nullptr),
331       metadata_(nullptr),
332       syncFinished_(false)
333 {
334 }
335 
~AbilitySync()336 AbilitySync::~AbilitySync()
337 {
338     communicator_ = nullptr;
339     storageInterface_ = nullptr;
340     metadata_ = nullptr;
341 }
342 
Initialize(ICommunicator * inCommunicator,ISyncInterface * inStorage,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)343 int AbilitySync::Initialize(ICommunicator *inCommunicator, ISyncInterface *inStorage,
344     const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
345 {
346     if (inCommunicator == nullptr || inStorage == nullptr || deviceId.empty() || inMetadata == nullptr) {
347         return -E_INVALID_ARGS;
348     }
349     communicator_ = inCommunicator;
350     storageInterface_ = inStorage;
351     metadata_ = inMetadata;
352     deviceId_ = deviceId;
353     return E_OK;
354 }
355 
SyncStart(uint32_t sessionId,uint32_t sequenceId,uint16_t remoteCommunicatorVersion,const CommErrHandler & handler)356 int AbilitySync::SyncStart(uint32_t sessionId, uint32_t sequenceId, uint16_t remoteCommunicatorVersion,
357     const CommErrHandler &handler)
358 {
359     AbilitySyncRequestPacket packet;
360     int errCode = SetAbilityRequestBodyInfo(packet, remoteCommunicatorVersion);
361     if (errCode != E_OK) {
362         return errCode;
363     }
364     Message *message = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
365     if (message == nullptr) {
366         return -E_OUT_OF_MEMORY;
367     }
368     message->SetMessageType(TYPE_REQUEST);
369     errCode = message->SetCopiedObject<>(packet);
370     if (errCode != E_OK) {
371         LOGE("[AbilitySync][SyncStart] SetCopiedObject failed, err %d", errCode);
372         delete message;
373         message = nullptr;
374         return errCode;
375     }
376     message->SetVersion(MSG_VERSION_EXT);
377     message->SetSessionId(sessionId);
378     message->SetSequenceId(sequenceId);
379     SendConfig conf;
380     SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
381     errCode = communicator_->SendMessage(deviceId_, message, conf, handler);
382     if (errCode != E_OK) {
383         LOGE("[AbilitySync][SyncStart] SendPacket failed, err %d", errCode);
384         delete message;
385         message = nullptr;
386     }
387     return errCode;
388 }
389 
AckRecv(const Message * message,ISyncTaskContext * context)390 int AbilitySync::AckRecv(const Message *message, ISyncTaskContext *context)
391 {
392     int errCode = AckMsgCheck(message, context);
393     if (errCode != E_OK) {
394         return errCode;
395     }
396     const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
397     if (packet == nullptr) {
398         return -E_INVALID_ARGS;
399     }
400     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
401     context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
402     if (remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_2_0) {
403         errCode = AckRecvWithHighVersion(message, context, packet);
404     } else {
405         std::string schema = packet->GetSchema();
406         uint8_t schemaType = packet->GetSchemaType();
407         bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
408         if (!isCompatible) {
409             (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
410             LOGE("[AbilitySync][AckRecv] scheme check failed");
411             return -E_SCHEMA_MISMATCH;
412         }
413         LOGI("[AbilitySync][AckRecv]remoteSoftwareVersion = %u, isCompatible = %d,", remoteSoftwareVersion,
414             isCompatible);
415     }
416     return errCode;
417 }
418 
RequestRecv(const Message * message,ISyncTaskContext * context)419 int AbilitySync::RequestRecv(const Message *message, ISyncTaskContext *context)
420 {
421     if (message == nullptr || context == nullptr) {
422         return -E_INVALID_ARGS;
423     }
424     const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
425     if (packet == nullptr) {
426         return -E_INVALID_ARGS;
427     }
428     if (packet->GetSendCode() == -E_VERSION_NOT_SUPPORT) {
429         AbilitySyncAckPacket ackPacket;
430         (void)SendAck(message, -E_VERSION_NOT_SUPPORT, false, ackPacket);
431         LOGI("[AbilitySync][RequestRecv] version can not support, remote version is %u", packet->GetProtocolVersion());
432         return -E_VERSION_NOT_SUPPORT;
433     }
434 
435     std::string schema = packet->GetSchema();
436     uint8_t schemaType = packet->GetSchemaType();
437     bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
438     if (!isCompatible) {
439         (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
440     }
441     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
442     context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
443     return HandleRequestRecv(message, context, isCompatible);
444 }
445 
AckNotifyRecv(const Message * message,ISyncTaskContext * context)446 int AbilitySync::AckNotifyRecv(const Message *message, ISyncTaskContext *context)
447 {
448     if (message == nullptr || context == nullptr) {
449         return -E_INVALID_ARGS;
450     }
451     if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
452         LOGE("[AbilitySync][AckNotifyRecv] Remote device dose not support this message id");
453         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
454         return -E_FEEDBACK_UNKNOWN_MESSAGE;
455     }
456     const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
457     if (packet == nullptr) {
458         return -E_INVALID_ARGS;
459     }
460     int errCode = packet->GetAckCode();
461     if (errCode != E_OK) {
462         LOGE("[AbilitySync][AckNotifyRecv] received an errCode %d", errCode);
463         return errCode;
464     }
465     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
466     context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
467     AbilitySyncAckPacket sendPacket;
468     std::pair<bool, bool> schemaSyncStatus;
469     errCode = HandleVersionV3AckSchemaParam(packet, sendPacket, context, false, schemaSyncStatus);
470     int ackCode = errCode;
471     LOGI("[AckNotifyRecv] receive dev = %s ack notify, remoteSoftwareVersion = %u, ackCode = %d",
472         STR_MASK(deviceId_), remoteSoftwareVersion, errCode);
473     if (errCode == E_OK) {
474         ackCode = AbilitySync::LAST_NOTIFY;
475     }
476     (void)SendAckWithEmptySchema(message, ackCode, true);
477     return errCode;
478 }
479 
GetAbilitySyncFinishedStatus() const480 bool AbilitySync::GetAbilitySyncFinishedStatus() const
481 {
482     return syncFinished_;
483 }
484 
SetAbilitySyncFinishedStatus(bool syncFinished)485 void AbilitySync::SetAbilitySyncFinishedStatus(bool syncFinished)
486 {
487     syncFinished_ = syncFinished;
488 }
489 
SecLabelCheck(const AbilitySyncRequestPacket * packet) const490 bool AbilitySync::SecLabelCheck(const AbilitySyncRequestPacket *packet) const
491 {
492     int32_t remoteSecLabel = packet->GetSecLabel();
493     int32_t remoteSecFlag = packet->GetSecFlag();
494     if (remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION || remoteSecLabel == SecurityLabel::NOT_SET) {
495         return true;
496     }
497     SecurityOption option;
498     int errCode = (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
499     LOGI("[AbilitySync][RequestRecv] local l:%d, f:%d, errCode:%d", option.securityLabel, option.securityFlag, errCode);
500     if (errCode == -E_NOT_SUPPORT || (errCode == E_OK && option.securityLabel == SecurityLabel::NOT_SET)) {
501         return true;
502     }
503     if (remoteSecLabel == FAILED_GET_SEC_CLASSIFICATION || errCode != E_OK) {
504         LOGE("[AbilitySync][RequestRecv] check error remoteL:%d, errCode:%d", remoteSecLabel, errCode);
505         return false;
506     }
507     if (remoteSecLabel == option.securityLabel) {
508         return true;
509     } else {
510         LOGE("[AbilitySync][RequestRecv] check error remote:%d , %d local:%d , %d",
511             remoteSecLabel, remoteSecFlag, option.securityLabel, option.securityFlag);
512         return false;
513     }
514 }
515 
HandleVersionV3RequestParam(const AbilitySyncRequestPacket * packet,ISyncTaskContext * context)516 void AbilitySync::HandleVersionV3RequestParam(const AbilitySyncRequestPacket *packet, ISyncTaskContext *context)
517 {
518     int32_t remoteSecLabel = packet->GetSecLabel();
519     int32_t remoteSecFlag = packet->GetSecFlag();
520     DbAbility remoteDbAbility = packet->GetDbAbility();
521     (static_cast<SingleVerSyncTaskContext *>(context))->SetDbAbility(remoteDbAbility);
522     (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption({remoteSecLabel, remoteSecFlag});
523     (static_cast<SingleVerSyncTaskContext *>(context))->SetReceivcPermitCheck(false);
524     LOGI("[AbilitySync][HandleVersionV3RequestParam] remoteSecLabel = %d, remoteSecFlag = %d, remoteSchemaType = %u",
525         remoteSecLabel, remoteSecFlag, packet->GetSchemaType());
526 }
527 
HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket * packet,ISyncTaskContext * context)528 void AbilitySync::HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket *packet,
529     ISyncTaskContext *context)
530 {
531     int32_t remoteSecLabel = packet->GetSecLabel();
532     int32_t remoteSecFlag = packet->GetSecFlag();
533     SecurityOption secOption = {remoteSecLabel, remoteSecFlag};
534     (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption(secOption);
535     (static_cast<SingleVerSyncTaskContext *>(context))->SetSendPermitCheck(false);
536     LOGI("[AbilitySync][AckRecv] remoteSecLabel = %d, remoteSecFlag = %d", remoteSecLabel, remoteSecFlag);
537 }
538 
HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion,std::pair<bool,bool> & schemaSyncStatus) const539 int AbilitySync::HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket *recvPacket,
540     AbilitySyncAckPacket &sendPacket,  ISyncTaskContext *context, bool sendOpinion,
541     std::pair<bool, bool> &schemaSyncStatus) const
542 {
543     if (IsSingleRelationalVer()) {
544         return HandleRelationAckSchemaParam(recvPacket, sendPacket, context, sendOpinion, schemaSyncStatus);
545     }
546     HandleKvAckSchemaParam(recvPacket, context, sendPacket, schemaSyncStatus);
547     return E_OK;
548 }
549 
GetPacketSecOption(SecurityOption & option) const550 void AbilitySync::GetPacketSecOption(SecurityOption &option) const
551 {
552     int errCode =
553         (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
554     if (errCode == -E_NOT_SUPPORT) {
555         LOGE("[AbilitySync][SyncStart] GetSecOpt not surpport sec classification");
556         option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
557     } else if (errCode != E_OK) {
558         LOGE("[AbilitySync][SyncStart] GetSecOpt errCode:%d", errCode);
559         option.securityLabel = FAILED_GET_SEC_CLASSIFICATION;
560     }
561 }
562 
RegisterTransformFunc()563 int AbilitySync::RegisterTransformFunc()
564 {
565     TransformFunc func;
566     func.computeFunc = std::bind(&AbilitySync::CalculateLen, std::placeholders::_1);
567     func.serializeFunc = std::bind(&AbilitySync::Serialization, std::placeholders::_1,
568                                    std::placeholders::_2, std::placeholders::_3);
569     func.deserializeFunc = std::bind(&AbilitySync::DeSerialization, std::placeholders::_1,
570                                      std::placeholders::_2, std::placeholders::_3);
571     return MessageTransform::RegTransformFunction(ABILITY_SYNC_MESSAGE, func);
572 }
573 
CalculateLen(const Message * inMsg)574 uint32_t AbilitySync::CalculateLen(const Message *inMsg)
575 {
576     if ((inMsg == nullptr) || (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE)) {
577         return 0;
578     }
579     int errCode;
580     uint32_t len = 0;
581     switch (inMsg->GetMessageType()) {
582         case TYPE_REQUEST:
583             errCode = RequestPacketCalculateLen(inMsg, len);
584             if (errCode != E_OK) {
585                 LOGE("[AbilitySync][CalculateLen] request packet calc length err %d", errCode);
586             }
587             break;
588         case TYPE_RESPONSE:
589             errCode = AckPacketCalculateLen(inMsg, len);
590             if (errCode != E_OK) {
591                 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
592             }
593             break;
594         case TYPE_NOTIFY:
595             errCode = AckPacketCalculateLen(inMsg, len);
596             if (errCode != E_OK) {
597                 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
598             }
599             break;
600         default:
601             LOGE("[AbilitySync][CalculateLen] message type not support, type %d", inMsg->GetMessageType());
602             break;
603     }
604     return len;
605 }
606 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)607 int AbilitySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
608 {
609     if ((buffer == nullptr) || (inMsg == nullptr)) {
610         return -E_INVALID_ARGS;
611     }
612 
613     switch (inMsg->GetMessageType()) {
614         case TYPE_REQUEST:
615             return RequestPacketSerialization(buffer, length, inMsg);
616         case TYPE_RESPONSE:
617         case TYPE_NOTIFY:
618             return AckPacketSerialization(buffer, length, inMsg);
619         default:
620             return -E_MESSAGE_TYPE_ERROR;
621     }
622 }
623 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)624 int AbilitySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
625 {
626     if ((buffer == nullptr) || (inMsg == nullptr)) {
627         return -E_INVALID_ARGS;
628     }
629 
630     switch (inMsg->GetMessageType()) {
631         case TYPE_REQUEST:
632             return RequestPacketDeSerialization(buffer, length, inMsg);
633         case TYPE_RESPONSE:
634         case TYPE_NOTIFY:
635             return AckPacketDeSerialization(buffer, length, inMsg);
636         default:
637             return -E_MESSAGE_TYPE_ERROR;
638     }
639 }
640 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)641 int AbilitySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
642 {
643     const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
644     if (packet == nullptr) {
645         return -E_INVALID_ARGS;
646     }
647 
648     len = packet->CalculateLen();
649     return E_OK;
650 }
651 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)652 int AbilitySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
653 {
654     const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
655     if (packet == nullptr) {
656         return -E_INVALID_ARGS;
657     }
658 
659     len = packet->CalculateLen();
660     return E_OK;
661 }
662 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)663 int AbilitySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
664 {
665     const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
666     if ((packet == nullptr) || (length != packet->CalculateLen())) {
667         return -E_INVALID_ARGS;
668     }
669 
670     Parcel parcel(buffer, length);
671     parcel.WriteUInt32(packet->GetProtocolVersion());
672     parcel.WriteInt(packet->GetSendCode());
673     parcel.WriteUInt32(packet->GetSoftwareVersion());
674     parcel.WriteString(packet->GetSchema());
675     parcel.WriteInt(packet->GetSecLabel());
676     parcel.WriteInt(packet->GetSecFlag());
677     parcel.WriteUInt32(packet->GetSchemaType());
678     parcel.WriteUInt64(packet->GetDbCreateTime());
679     int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
680     if (parcel.IsError() || errCode != E_OK) {
681         return -E_PARSE_FAIL;
682     }
683     return E_OK;
684 }
685 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)686 int AbilitySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
687 {
688     const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
689     if ((packet == nullptr) || (length != packet->CalculateLen())) {
690         return -E_INVALID_ARGS;
691     }
692 
693     Parcel parcel(buffer, length);
694     parcel.WriteUInt32(ABILITY_SYNC_VERSION_V1);
695     parcel.WriteUInt32(SOFTWARE_VERSION_CURRENT);
696     parcel.WriteInt(packet->GetAckCode());
697     parcel.WriteString(packet->GetSchema());
698     parcel.WriteInt(packet->GetSecLabel());
699     parcel.WriteInt(packet->GetSecFlag());
700     parcel.WriteUInt32(packet->GetSchemaType());
701     parcel.WriteUInt32(packet->GetPermitSync());
702     parcel.WriteUInt32(packet->GetRequirePeerConvert());
703     parcel.WriteUInt64(packet->GetDbCreateTime());
704     int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
705     if (parcel.IsError() || errCode != E_OK) {
706         return -E_PARSE_FAIL;
707     }
708     errCode = SchemaNegotiate::SerializeData(packet->GetRelationalSyncOpinion(), parcel);
709     if (parcel.IsError() || errCode != E_OK) {
710         return -E_PARSE_FAIL;
711     }
712     return E_OK;
713 }
714 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)715 int AbilitySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
716 {
717     auto *packet = new (std::nothrow) AbilitySyncRequestPacket();
718     if (packet == nullptr) {
719         return -E_OUT_OF_MEMORY;
720     }
721 
722     Parcel parcel(const_cast<uint8_t *>(buffer), length);
723     uint32_t version = 0;
724     uint32_t softwareVersion = 0;
725     std::string schema;
726     int32_t sendCode = 0;
727     int errCode = -E_PARSE_FAIL;
728 
729     parcel.ReadUInt32(version);
730     if (parcel.IsError()) {
731         goto ERROR_OUT;
732     }
733     packet->SetProtocolVersion(version);
734     if (version > ABILITY_SYNC_VERSION_V1) {
735         packet->SetSendCode(-E_VERSION_NOT_SUPPORT);
736         errCode = inMsg->SetExternalObject<>(packet);
737         if (errCode != E_OK) {
738             goto ERROR_OUT;
739         }
740         return errCode;
741     }
742     parcel.ReadInt(sendCode);
743     parcel.ReadUInt32(softwareVersion);
744     parcel.ReadString(schema);
745     errCode = RequestPacketDeSerializationTailPart(parcel, packet, softwareVersion);
746     if (parcel.IsError() || errCode != E_OK) {
747         goto ERROR_OUT;
748     }
749     packet->SetSendCode(sendCode);
750     packet->SetSoftwareVersion(softwareVersion);
751     packet->SetSchema(schema);
752 
753     errCode = inMsg->SetExternalObject<>(packet);
754     if (errCode == E_OK) {
755         return E_OK;
756     }
757 
758 ERROR_OUT:
759     delete packet;
760     return errCode;
761 }
762 
RequestPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncRequestPacket * packet,uint32_t version)763 int AbilitySync::RequestPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncRequestPacket *packet,
764     uint32_t version)
765 {
766     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
767         int32_t secLabel = 0;
768         int32_t secFlag = 0;
769         uint32_t schemaType = 0;
770         parcel.ReadInt(secLabel);
771         parcel.ReadInt(secFlag);
772         parcel.ReadUInt32(schemaType);
773         packet->SetSecLabel(secLabel);
774         packet->SetSecFlag(secFlag);
775         packet->SetSchemaType(schemaType);
776     }
777     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
778         uint64_t dbCreateTime = 0;
779         parcel.ReadUInt64(dbCreateTime);
780         packet->SetDbCreateTime(dbCreateTime);
781     }
782     DbAbility remoteDbAbility;
783     int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
784     if (errCode != E_OK) {
785         LOGE("[AbilitySync] request packet DeSerializ failed.");
786         return errCode;
787     }
788     packet->SetDbAbility(remoteDbAbility);
789     return E_OK;
790 }
791 
AckPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncAckPacket * packet,uint32_t version)792 int AbilitySync::AckPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncAckPacket *packet, uint32_t version)
793 {
794     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
795         int32_t secLabel = 0;
796         int32_t secFlag = 0;
797         uint32_t schemaType = 0;
798         uint32_t permitSync = 0;
799         uint32_t requirePeerConvert = 0;
800         parcel.ReadInt(secLabel);
801         parcel.ReadInt(secFlag);
802         parcel.ReadUInt32(schemaType);
803         parcel.ReadUInt32(permitSync);
804         parcel.ReadUInt32(requirePeerConvert);
805         packet->SetSecLabel(secLabel);
806         packet->SetSecFlag(secFlag);
807         packet->SetSchemaType(schemaType);
808         packet->SetPermitSync(permitSync);
809         packet->SetRequirePeerConvert(requirePeerConvert);
810     }
811     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
812         uint64_t dbCreateTime = 0;
813         parcel.ReadUInt64(dbCreateTime);
814         packet->SetDbCreateTime(dbCreateTime);
815     }
816     DbAbility remoteDbAbility;
817     int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
818     if (errCode != E_OK) {
819         LOGE("[AbilitySync] ack packet DeSerializ failed.");
820         return errCode;
821     }
822     packet->SetDbAbility(remoteDbAbility);
823     RelationalSyncOpinion relationalSyncOpinion;
824     errCode = SchemaNegotiate::DeserializeData(parcel, relationalSyncOpinion);
825     if (errCode != E_OK) {
826         LOGE("[AbilitySync] ack packet DeSerializ RelationalSyncOpinion failed.");
827         return errCode;
828     }
829     packet->SetRelationalSyncOpinion(relationalSyncOpinion);
830     return E_OK;
831 }
832 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)833 int AbilitySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
834 {
835     auto *packet = new (std::nothrow) AbilitySyncAckPacket();
836     if (packet == nullptr) {
837         return -E_OUT_OF_MEMORY;
838     }
839 
840     Parcel parcel(const_cast<uint8_t *>(buffer), length);
841     uint32_t version = 0;
842     uint32_t softwareVersion = 0;
843     int32_t ackCode = E_OK;
844     std::string schema;
845     int errCode;
846     parcel.ReadUInt32(version);
847     if (parcel.IsError()) {
848         LOGE("[AbilitySync][RequestDeSerialization] read version failed!");
849         errCode = -E_PARSE_FAIL;
850         goto ERROR_OUT;
851     }
852     packet->SetProtocolVersion(version);
853     if (version > ABILITY_SYNC_VERSION_V1) {
854         packet->SetAckCode(-E_VERSION_NOT_SUPPORT);
855         errCode = inMsg->SetExternalObject<>(packet);
856         if (errCode != E_OK) {
857             goto ERROR_OUT;
858         }
859         return errCode;
860     }
861     parcel.ReadUInt32(softwareVersion);
862     parcel.ReadInt(ackCode);
863     parcel.ReadString(schema);
864     errCode = AckPacketDeSerializationTailPart(parcel, packet, softwareVersion);
865     if (parcel.IsError() || errCode != E_OK) {
866         LOGE("[AbilitySync][RequestDeSerialization] DeSerialization failed!");
867         errCode = -E_PARSE_FAIL;
868         goto ERROR_OUT;
869     }
870     packet->SetSoftwareVersion(softwareVersion);
871     packet->SetAckCode(ackCode);
872     packet->SetSchema(schema);
873     errCode = inMsg->SetExternalObject<>(packet);
874     if (errCode == E_OK) {
875         return E_OK;
876     }
877 
878 ERROR_OUT:
879     delete packet;
880     return errCode;
881 }
882 
SetAbilityRequestBodyInfo(AbilitySyncRequestPacket & packet,uint16_t remoteCommunicatorVersion) const883 int AbilitySync::SetAbilityRequestBodyInfo(AbilitySyncRequestPacket &packet, uint16_t remoteCommunicatorVersion) const
884 {
885     uint64_t dbCreateTime;
886     int errCode =
887         (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
888     if (errCode != E_OK) {
889         LOGE("[AbilitySync][FillAbilityRequest] GetDatabaseCreateTimestamp failed, err %d", errCode);
890         return errCode;
891     }
892     SecurityOption option;
893     GetPacketSecOption(option);
894     std::string schemaStr;
895     uint32_t schemaType = 0;
896     if (IsSingleKvVer()) {
897         SchemaObject schemaObj = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
898         schemaStr = schemaObj.ToSchemaString();
899         schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
900     } else if (IsSingleRelationalVer()) {
901         auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
902         schemaStr = schemaObj.ToSchemaString();
903         schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
904     }
905     DbAbility dbAbility;
906     errCode = GetDbAbilityInfo(dbAbility);
907     if (errCode != E_OK) {
908         LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
909         return errCode;
910     }
911     // 102 version is forbidden to sync with 103 json-schema or flatbuffer-schema
912     // so schema should put null string while remote is 102 version to avoid this bug.
913     if (remoteCommunicatorVersion == 1) {
914         packet.SetSchema("");
915         packet.SetSchemaType(0);
916     } else {
917         packet.SetSchema(schemaStr);
918         packet.SetSchemaType(schemaType);
919     }
920     packet.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
921     packet.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
922     packet.SetSecLabel(option.securityLabel);
923     packet.SetSecFlag(option.securityFlag);
924     packet.SetDbCreateTime(dbCreateTime);
925     packet.SetDbAbility(dbAbility);
926     LOGI("[AbilitySync][FillRequest] ver=%u,Lab=%d,Flag=%d,dbCreateTime=%" PRId64, SOFTWARE_VERSION_CURRENT,
927         option.securityLabel, option.securityFlag, dbCreateTime);
928     return E_OK;
929 }
930 
SetAbilityAckBodyInfo(AbilitySyncAckPacket & ackPacket,int ackCode,bool isAckNotify) const931 int AbilitySync::SetAbilityAckBodyInfo(AbilitySyncAckPacket &ackPacket, int ackCode, bool isAckNotify) const
932 {
933     ackPacket.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
934     ackPacket.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
935     if (!isAckNotify) {
936         SecurityOption option;
937         GetPacketSecOption(option);
938         ackPacket.SetSecLabel(option.securityLabel);
939         ackPacket.SetSecFlag(option.securityFlag);
940         uint64_t dbCreateTime = 0;
941         int errCode =
942             (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
943         if (errCode != E_OK) {
944             LOGE("[AbilitySync][SyncStart] GetDatabaseCreateTimestamp failed, err %d", errCode);
945             ackCode = errCode;
946         }
947         DbAbility dbAbility;
948         errCode = GetDbAbilityInfo(dbAbility);
949         if (errCode != E_OK) {
950             LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
951             return errCode;
952         }
953         ackPacket.SetDbCreateTime(dbCreateTime);
954         ackPacket.SetDbAbility(dbAbility);
955     }
956     ackPacket.SetAckCode(ackCode);
957     return E_OK;
958 }
959 
SetAbilityAckSchemaInfo(AbilitySyncAckPacket & ackPacket,const ISchema & schemaObj)960 void AbilitySync::SetAbilityAckSchemaInfo(AbilitySyncAckPacket &ackPacket, const ISchema &schemaObj)
961 {
962     ackPacket.SetSchema(schemaObj.ToSchemaString());
963     ackPacket.SetSchemaType(static_cast<uint32_t>(schemaObj.GetSchemaType()));
964 }
965 
SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket & ackPacket,SyncOpinion localOpinion)966 void AbilitySync::SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket &ackPacket, SyncOpinion localOpinion)
967 {
968     ackPacket.SetPermitSync(localOpinion.permitSync);
969     ackPacket.SetRequirePeerConvert(localOpinion.requirePeerConvert);
970 }
971 
GetDbAbilityInfo(DbAbility & dbAbility)972 int AbilitySync::GetDbAbilityInfo(DbAbility &dbAbility)
973 {
974     int errCode = E_OK;
975     for (const auto &item : SyncConfig::ABILITYBITS) {
976         errCode = dbAbility.SetAbilityItem(item, SUPPORT_MARK);
977         if (errCode != E_OK) {
978             return errCode;
979         }
980     }
981     return errCode;
982 }
983 
AckMsgCheck(const Message * message,ISyncTaskContext * context) const984 int AbilitySync::AckMsgCheck(const Message *message, ISyncTaskContext *context) const
985 {
986     if (message == nullptr || context == nullptr) {
987         return -E_INVALID_ARGS;
988     }
989     if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
990         LOGE("[AbilitySync][AckMsgCheck] Remote device dose not support this message id");
991         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
992         context->SetTaskErrCode(-E_FEEDBACK_UNKNOWN_MESSAGE);
993         return -E_FEEDBACK_UNKNOWN_MESSAGE;
994     }
995     if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) {
996         LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
997         context->SetTaskErrCode(-E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
998         return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
999     }
1000     const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
1001     if (packet == nullptr) {
1002         return -E_INVALID_ARGS;
1003     }
1004     int ackCode = packet->GetAckCode();
1005     if (ackCode != E_OK) {
1006         LOGE("[AbilitySync][AckMsgCheck] received an errCode %d", ackCode);
1007         context->SetTaskErrCode(ackCode);
1008         return ackCode;
1009     }
1010     return E_OK;
1011 }
1012 
IsSingleKvVer() const1013 bool AbilitySync::IsSingleKvVer() const
1014 {
1015     return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_SVD;
1016 }
IsSingleRelationalVer() const1017 bool AbilitySync::IsSingleRelationalVer() const
1018 {
1019 #ifdef RELATIONAL_STORE
1020     return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_RELATION;
1021 #else
1022     return false;
1023 #endif
1024 }
1025 
HandleRequestRecv(const Message * message,ISyncTaskContext * context,bool isCompatible)1026 int AbilitySync::HandleRequestRecv(const Message *message, ISyncTaskContext *context, bool isCompatible)
1027 {
1028     const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
1029     if (packet == nullptr) {
1030         return -E_INVALID_ARGS;
1031     }
1032     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
1033     int ackCode;
1034     std::string schema = packet->GetSchema();
1035     if (remoteSoftwareVersion <= SOFTWARE_VERSION_RELEASE_2_0) {
1036         LOGI("[AbilitySync][RequestRecv] remote version = %u, CheckSchemaCompatible = %d",
1037             remoteSoftwareVersion, isCompatible);
1038         return SendAckWithEmptySchema(message, E_OK, false);
1039     }
1040     HandleVersionV3RequestParam(packet, context);
1041     if (SecLabelCheck(packet)) {
1042         ackCode = E_OK;
1043     } else {
1044         ackCode = -E_SECURITY_OPTION_CHECK_ERROR;
1045     }
1046     if (ackCode == E_OK && remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_3_0) {
1047         ackCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1048     }
1049     AbilitySyncAckPacket ackPacket;
1050     if (IsSingleRelationalVer()) {
1051         ackPacket.SetRelationalSyncOpinion(MakeRelationSyncOpinion(packet, schema));
1052     } else {
1053         SetAbilityAckSyncOpinionInfo(ackPacket, MakeKvSyncOpinion(packet, schema));
1054     }
1055     LOGI("[AbilitySync][RequestRecv] remote dev=%s,ver=%u,schemaCompatible=%d", STR_MASK(deviceId_),
1056         remoteSoftwareVersion, isCompatible);
1057     return SendAck(message, ackCode, false, ackPacket);
1058 }
1059 
SendAck(const Message * message,int ackCode,bool isAckNotify,AbilitySyncAckPacket & ackPacket)1060 int AbilitySync::SendAck(const Message *message, int ackCode, bool isAckNotify, AbilitySyncAckPacket &ackPacket)
1061 {
1062     int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1063     if (errCode != E_OK) {
1064         return errCode;
1065     }
1066     if (IsSingleRelationalVer()) {
1067         auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1068         SetAbilityAckSchemaInfo(ackPacket, schemaObj);
1069     } else if (IsSingleKvVer()) {
1070         SchemaObject schemaObject = static_cast<SingleVerKvDBSyncInterface *>(storageInterface_)->GetSchemaInfo();
1071         SetAbilityAckSchemaInfo(ackPacket, schemaObject);
1072     }
1073     return SendAck(message, ackPacket, isAckNotify);
1074 }
1075 
SendAckWithEmptySchema(const Message * message,int ackCode,bool isAckNotify)1076 int AbilitySync::SendAckWithEmptySchema(const Message *message, int ackCode,
1077     bool isAckNotify)
1078 {
1079     AbilitySyncAckPacket ackPacket;
1080     int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1081     if (errCode != E_OK) {
1082         return errCode;
1083     }
1084     SetAbilityAckSchemaInfo(ackPacket, SchemaObject());
1085     return SendAck(message, ackPacket, isAckNotify);
1086 }
1087 
SendAck(const Message * inMsg,const AbilitySyncAckPacket & ackPacket,bool isAckNotify)1088 int AbilitySync::SendAck(const Message *inMsg, const AbilitySyncAckPacket &ackPacket, bool isAckNotify)
1089 {
1090     Message *ackMessage = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
1091     if (ackMessage == nullptr) {
1092         LOGE("[AbilitySync][SendAck] message create failed, may be memleak!");
1093         return -E_OUT_OF_MEMORY;
1094     }
1095     int errCode = ackMessage->SetCopiedObject<>(ackPacket);
1096     if (errCode != E_OK) {
1097         LOGE("[AbilitySync][SendAck] SetCopiedObject failed, err %d", errCode);
1098         delete ackMessage;
1099         ackMessage = nullptr;
1100         return errCode;
1101     }
1102     (!isAckNotify) ? ackMessage->SetMessageType(TYPE_RESPONSE) : ackMessage->SetMessageType(TYPE_NOTIFY);
1103     ackMessage->SetTarget(deviceId_);
1104     ackMessage->SetSessionId(inMsg->GetSessionId());
1105     ackMessage->SetSequenceId(inMsg->GetSequenceId());
1106     SendConfig conf;
1107     SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
1108     errCode = communicator_->SendMessage(deviceId_, ackMessage, conf);
1109     if (errCode != E_OK) {
1110         LOGE("[AbilitySync][SendAck] SendPacket failed, err %d", errCode);
1111         delete ackMessage;
1112         ackMessage = nullptr;
1113     }
1114     return errCode;
1115 }
1116 
MakeKvSyncOpinion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1117 SyncOpinion AbilitySync::MakeKvSyncOpinion(const AbilitySyncRequestPacket *packet,
1118     const std::string &remoteSchema) const
1119 {
1120     uint8_t remoteSchemaType = packet->GetSchemaType();
1121     SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1122     SyncOpinion localSyncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1123     return localSyncOpinion;
1124 }
1125 
MakeRelationSyncOpinion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1126 RelationalSyncOpinion AbilitySync::MakeRelationSyncOpinion(const AbilitySyncRequestPacket *packet,
1127     const std::string &remoteSchema) const
1128 {
1129     uint8_t remoteSchemaType = packet->GetSchemaType();
1130     RelationalSchemaObject localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1131     return SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1132 }
1133 
HandleKvAckSchemaParam(const AbilitySyncAckPacket * recvPacket,ISyncTaskContext * context,AbilitySyncAckPacket & sendPacket,std::pair<bool,bool> & schemaSyncStatus) const1134 void AbilitySync::HandleKvAckSchemaParam(const AbilitySyncAckPacket *recvPacket,
1135     ISyncTaskContext *context, AbilitySyncAckPacket &sendPacket, std::pair<bool, bool> &schemaSyncStatus) const
1136 {
1137     std::string remoteSchema = recvPacket->GetSchema();
1138     uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1139     bool permitSync = static_cast<bool>(recvPacket->GetPermitSync());
1140     bool requirePeerConvert = static_cast<bool>(recvPacket->GetRequirePeerConvert());
1141     SyncOpinion remoteOpinion = {permitSync, requirePeerConvert, true};
1142     SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1143     SyncOpinion syncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1144     SyncStrategy localStrategy = SchemaNegotiate::ConcludeSyncStrategy(syncOpinion, remoteOpinion);
1145     SetAbilityAckSyncOpinionInfo(sendPacket, syncOpinion);
1146     (static_cast<SingleVerKvSyncTaskContext *>(context))->SetSyncStrategy(localStrategy, true);
1147     schemaSyncStatus = {
1148         localStrategy.permitSync,
1149         true
1150     };
1151 }
1152 
HandleRelationAckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion,std::pair<bool,bool> & schemaSyncStatus) const1153 int AbilitySync::HandleRelationAckSchemaParam(const AbilitySyncAckPacket *recvPacket, AbilitySyncAckPacket &sendPacket,
1154     ISyncTaskContext *context, bool sendOpinion, std::pair<bool, bool> &schemaSyncStatus) const
1155 {
1156     std::string remoteSchema = recvPacket->GetSchema();
1157     uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1158     auto localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1159     auto localOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1160     auto localStrategy = SchemaNegotiate::ConcludeSyncStrategy(localOpinion,
1161         recvPacket->GetRelationalSyncOpinion());
1162     (static_cast<SingleVerRelationalSyncTaskContext *>(context))->SetRelationalSyncStrategy(localStrategy, true);
1163     bool permitSync = std::any_of(localStrategy.begin(), localStrategy.end(),
1164         [] (const std::pair<std::string, SyncStrategy> &it) {
1165         return it.second.permitSync;
1166         });
1167     if (permitSync) {
1168         int innerErrCode = (static_cast<RelationalDBSyncInterface *>(storageInterface_)->SaveRemoteDeviceSchema(
1169             deviceId_, remoteSchema, remoteSchemaType));
1170         if (innerErrCode != E_OK) {
1171             LOGE("[AbilitySync][AckRecv] save remote device Schema failed,errCode=%d", innerErrCode);
1172             return innerErrCode;
1173         }
1174     }
1175     int errCode = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->
1176         CreateDistributedDeviceTable(context->GetDeviceId(), localStrategy);
1177     if (errCode != E_OK) {
1178         LOGE("[AbilitySync][AckRecv] create distributed device table failed,errCode=%d", errCode);
1179     }
1180     if (sendOpinion) {
1181         sendPacket.SetRelationalSyncOpinion(localOpinion);
1182     }
1183     auto singleVerContext = static_cast<SingleVerSyncTaskContext *>(context);
1184     auto strategy = localStrategy.find(singleVerContext->GetQuery().GetRelationTableName());
1185     schemaSyncStatus = {
1186         !(strategy == localStrategy.end()) && strategy->second.permitSync,
1187         true
1188     };
1189     return errCode;
1190 }
1191 
AckRecvWithHighVersion(const Message * message,ISyncTaskContext * context,const AbilitySyncAckPacket * packet)1192 int AbilitySync::AckRecvWithHighVersion(const Message *message, ISyncTaskContext *context,
1193     const AbilitySyncAckPacket *packet)
1194 {
1195     HandleVersionV3AckSecOptionParam(packet, context);
1196     AbilitySyncAckPacket ackPacket;
1197     std::pair<bool, bool> schemaSyncStatus;
1198     int errCode = HandleVersionV3AckSchemaParam(packet, ackPacket, context, true, schemaSyncStatus);
1199     if (errCode != E_OK) {
1200         context->SetTaskErrCode(errCode);
1201         return errCode;
1202     }
1203     auto singleVerContext = static_cast<SingleVerSyncTaskContext *>(context);
1204     if (!schemaSyncStatus.first) {
1205         singleVerContext->SetTaskErrCode(-E_SCHEMA_MISMATCH);
1206         LOGE("[AbilitySync][AckRecv] scheme check failed");
1207         return -E_SCHEMA_MISMATCH;
1208     }
1209     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
1210         errCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1211         if (errCode != E_OK) {
1212             LOGE("[AbilitySync][AckRecv] set db create time failed,errCode=%d", errCode);
1213             context->SetTaskErrCode(errCode);
1214             return errCode;
1215         }
1216     }
1217     DbAbility remoteDbAbility = packet->GetDbAbility();
1218     singleVerContext->SetDbAbility(remoteDbAbility);
1219     (void)SendAck(message, AbilitySync::CHECK_SUCCESS, true, ackPacket);
1220     return E_OK;
1221 }
1222 } // namespace DistributedDB