• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "ability_sync.h"
17 
18 #include "message_transform.h"
19 #include "version.h"
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "sync_types.h"
23 #include "db_common.h"
24 #include "single_ver_kvdb_sync_interface.h"
25 #include "single_ver_sync_task_context.h"
26 #include "single_ver_kv_sync_task_context.h"
27 #ifdef RELATIONAL_STORE
28 #include "relational_db_sync_interface.h"
29 #include "single_ver_relational_sync_task_context.h"
30 #endif
31 
32 namespace DistributedDB {
AbilitySyncRequestPacket()33 AbilitySyncRequestPacket::AbilitySyncRequestPacket()
34     : protocolVersion_(ABILITY_SYNC_VERSION_V1),
35       sendCode_(E_OK),
36       softwareVersion_(SOFTWARE_VERSION_CURRENT),
37       secLabel_(0),
38       secFlag_(0),
39       schemaType_(0),
40       dbCreateTime_(0)
41 {
42 }
43 
~AbilitySyncRequestPacket()44 AbilitySyncRequestPacket::~AbilitySyncRequestPacket()
45 {
46 }
47 
SetProtocolVersion(uint32_t protocolVersion)48 void AbilitySyncRequestPacket::SetProtocolVersion(uint32_t protocolVersion)
49 {
50     protocolVersion_ = protocolVersion;
51 }
52 
GetProtocolVersion() const53 uint32_t AbilitySyncRequestPacket::GetProtocolVersion() const
54 {
55     return protocolVersion_;
56 }
57 
SetSendCode(int32_t sendCode)58 void AbilitySyncRequestPacket::SetSendCode(int32_t sendCode)
59 {
60     sendCode_ = sendCode;
61 }
62 
GetSendCode() const63 int32_t AbilitySyncRequestPacket::GetSendCode() const
64 {
65     return sendCode_;
66 }
67 
SetSoftwareVersion(uint32_t swVersion)68 void AbilitySyncRequestPacket::SetSoftwareVersion(uint32_t swVersion)
69 {
70     softwareVersion_ = swVersion;
71 }
72 
GetSoftwareVersion() const73 uint32_t AbilitySyncRequestPacket::GetSoftwareVersion() const
74 {
75     return softwareVersion_;
76 }
77 
SetSchema(const std::string & schema)78 void AbilitySyncRequestPacket::SetSchema(const std::string &schema)
79 {
80     schema_ = schema;
81 }
82 
GetSchema() const83 std::string AbilitySyncRequestPacket::GetSchema() const
84 {
85     return schema_;
86 }
87 
SetSchemaType(uint32_t schemaType)88 void AbilitySyncRequestPacket::SetSchemaType(uint32_t schemaType)
89 {
90     schemaType_ = schemaType;
91 }
92 
GetSchemaType() const93 uint32_t AbilitySyncRequestPacket::GetSchemaType() const
94 {
95     return schemaType_;
96 }
97 
SetSecLabel(int32_t secLabel)98 void AbilitySyncRequestPacket::SetSecLabel(int32_t secLabel)
99 {
100     secLabel_ = secLabel;
101 }
102 
GetSecLabel() const103 int32_t AbilitySyncRequestPacket::GetSecLabel() const
104 {
105     return secLabel_;
106 }
107 
SetSecFlag(int32_t secFlag)108 void AbilitySyncRequestPacket::SetSecFlag(int32_t secFlag)
109 {
110     secFlag_ = secFlag;
111 }
112 
GetSecFlag() const113 int32_t AbilitySyncRequestPacket::GetSecFlag() const
114 {
115     return secFlag_;
116 }
117 
SetDbCreateTime(uint64_t dbCreateTime)118 void AbilitySyncRequestPacket::SetDbCreateTime(uint64_t dbCreateTime)
119 {
120     dbCreateTime_ = dbCreateTime;
121 }
122 
GetDbCreateTime() const123 uint64_t AbilitySyncRequestPacket::GetDbCreateTime() const
124 {
125     return dbCreateTime_;
126 }
127 
CalculateLen() const128 uint32_t AbilitySyncRequestPacket::CalculateLen() const
129 {
130     uint64_t len = 0;
131     len += Parcel::GetUInt32Len(); // protocolVersion_
132     len += Parcel::GetIntLen(); // sendCode_
133     len += Parcel::GetUInt32Len(); // softwareVersion_
134     uint32_t schemLen = Parcel::GetStringLen(schema_);
135     if (schemLen == 0) {
136         LOGE("[AbilitySyncRequestPacket][CalculateLen] schemLen err!");
137         return 0;
138     }
139     len += schemLen;
140     len += Parcel::GetIntLen(); // secLabel_
141     len += Parcel::GetIntLen(); // secFlag_
142     len += Parcel::GetUInt32Len(); // schemaType_
143     len += Parcel::GetUInt64Len(); // dbCreateTime_
144     len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
145     // the reason why not 8-byte align is that old version is not 8-byte align
146     // so it is not possible to set 8-byte align for high version.
147     if (len > INT32_MAX) {
148         LOGE("[AbilitySyncRequestPacket][CalculateLen]  err len:%" PRIu64, len);
149         return 0;
150     }
151     return len;
152 }
153 
GetDbAbility() const154 DbAbility AbilitySyncRequestPacket::GetDbAbility() const
155 {
156     return dbAbility_;
157 }
158 
SetDbAbility(const DbAbility & dbAbility)159 void AbilitySyncRequestPacket::SetDbAbility(const DbAbility &dbAbility)
160 {
161     dbAbility_ = dbAbility;
162 }
163 
AbilitySyncAckPacket()164 AbilitySyncAckPacket::AbilitySyncAckPacket()
165     : protocolVersion_(ABILITY_SYNC_VERSION_V1),
166       softwareVersion_(SOFTWARE_VERSION_CURRENT),
167       ackCode_(E_OK),
168       secLabel_(0),
169       secFlag_(0),
170       schemaType_(0),
171       permitSync_(0),
172       requirePeerConvert_(0),
173       dbCreateTime_(0)
174 {
175 }
176 
~AbilitySyncAckPacket()177 AbilitySyncAckPacket::~AbilitySyncAckPacket()
178 {
179 }
180 
SetProtocolVersion(uint32_t protocolVersion)181 void AbilitySyncAckPacket::SetProtocolVersion(uint32_t protocolVersion)
182 {
183     protocolVersion_ = protocolVersion;
184 }
185 
SetSoftwareVersion(uint32_t swVersion)186 void AbilitySyncAckPacket::SetSoftwareVersion(uint32_t swVersion)
187 {
188     softwareVersion_ = swVersion;
189 }
190 
GetSoftwareVersion() const191 uint32_t AbilitySyncAckPacket::GetSoftwareVersion() const
192 {
193     return softwareVersion_;
194 }
195 
GetProtocolVersion() const196 uint32_t AbilitySyncAckPacket::GetProtocolVersion() const
197 {
198     return protocolVersion_;
199 }
200 
SetAckCode(int32_t ackCode)201 void AbilitySyncAckPacket::SetAckCode(int32_t ackCode)
202 {
203     ackCode_ = ackCode;
204 }
205 
GetAckCode() const206 int32_t AbilitySyncAckPacket::GetAckCode() const
207 {
208     return ackCode_;
209 }
210 
SetSchema(const std::string & schema)211 void AbilitySyncAckPacket::SetSchema(const std::string &schema)
212 {
213     schema_ = schema;
214 }
215 
GetSchema() const216 std::string AbilitySyncAckPacket::GetSchema() const
217 {
218     return schema_;
219 }
220 
SetSchemaType(uint32_t schemaType)221 void AbilitySyncAckPacket::SetSchemaType(uint32_t schemaType)
222 {
223     schemaType_ = schemaType;
224 }
225 
GetSchemaType() const226 uint32_t AbilitySyncAckPacket::GetSchemaType() const
227 {
228     return schemaType_;
229 }
230 
SetSecLabel(int32_t secLabel)231 void AbilitySyncAckPacket::SetSecLabel(int32_t secLabel)
232 {
233     secLabel_ = secLabel;
234 }
235 
GetSecLabel() const236 int32_t AbilitySyncAckPacket::GetSecLabel() const
237 {
238     return secLabel_;
239 }
240 
SetSecFlag(int32_t secFlag)241 void AbilitySyncAckPacket::SetSecFlag(int32_t secFlag)
242 {
243     secFlag_ = secFlag;
244 }
245 
GetSecFlag() const246 int32_t AbilitySyncAckPacket::GetSecFlag() const
247 {
248     return secFlag_;
249 }
250 
SetPermitSync(uint32_t permitSync)251 void AbilitySyncAckPacket::SetPermitSync(uint32_t permitSync)
252 {
253     permitSync_ = permitSync;
254 }
255 
GetPermitSync() const256 uint32_t AbilitySyncAckPacket::GetPermitSync() const
257 {
258     return permitSync_;
259 }
260 
SetRequirePeerConvert(uint32_t requirePeerConvert)261 void AbilitySyncAckPacket::SetRequirePeerConvert(uint32_t requirePeerConvert)
262 {
263     requirePeerConvert_ = requirePeerConvert;
264 }
265 
GetRequirePeerConvert() const266 uint32_t AbilitySyncAckPacket::GetRequirePeerConvert() const
267 {
268     return requirePeerConvert_;
269 }
270 
SetDbCreateTime(uint64_t dbCreateTime)271 void AbilitySyncAckPacket::SetDbCreateTime(uint64_t dbCreateTime)
272 {
273     dbCreateTime_ = dbCreateTime;
274 }
275 
GetDbCreateTime() const276 uint64_t AbilitySyncAckPacket::GetDbCreateTime() const
277 {
278     return dbCreateTime_;
279 }
280 
CalculateLen() const281 uint32_t AbilitySyncAckPacket::CalculateLen() const
282 {
283     uint64_t len = 0;
284     len += Parcel::GetUInt32Len();
285     len += Parcel::GetUInt32Len();
286     len += Parcel::GetIntLen();
287     uint32_t schemLen = Parcel::GetStringLen(schema_);
288     if (schemLen == 0) {
289         LOGE("[AbilitySyncAckPacket][CalculateLen] schemLen err!");
290         return 0;
291     }
292     len += schemLen;
293     len += Parcel::GetIntLen(); // secLabel_
294     len += Parcel::GetIntLen(); // secFlag_
295     len += Parcel::GetUInt32Len(); // schemaType_
296     len += Parcel::GetUInt32Len(); // permitSync_
297     len += Parcel::GetUInt32Len(); // requirePeerConvert_
298     len += Parcel::GetUInt64Len(); // dbCreateTime_
299     len += DbAbility::CalculateLen(dbAbility_); // dbAbility_
300     len += SchemaNegotiate::CalculateParcelLen(relationalSyncOpinion_);
301     if (len > INT32_MAX) {
302         LOGE("[AbilitySyncAckPacket][CalculateLen]  err len:%" PRIu64, len);
303         return 0;
304     }
305     return len;
306 }
307 
GetDbAbility() const308 DbAbility AbilitySyncAckPacket::GetDbAbility() const
309 {
310     return dbAbility_;
311 }
312 
SetDbAbility(const DbAbility & dbAbility)313 void AbilitySyncAckPacket::SetDbAbility(const DbAbility &dbAbility)
314 {
315     dbAbility_ = dbAbility;
316 }
317 
SetRelationalSyncOpinion(const RelationalSyncOpinion & relationalSyncOpinion)318 void AbilitySyncAckPacket::SetRelationalSyncOpinion(const RelationalSyncOpinion &relationalSyncOpinion)
319 {
320     relationalSyncOpinion_ = relationalSyncOpinion;
321 }
322 
GetRelationalSyncOpinion() const323 RelationalSyncOpinion AbilitySyncAckPacket::GetRelationalSyncOpinion() const
324 {
325     return relationalSyncOpinion_;
326 }
327 
AbilitySync()328 AbilitySync::AbilitySync()
329     : communicator_(nullptr),
330       storageInterface_(nullptr),
331       metadata_(nullptr),
332       syncFinished_(false)
333 {
334 }
335 
~AbilitySync()336 AbilitySync::~AbilitySync()
337 {
338     communicator_ = nullptr;
339     storageInterface_ = nullptr;
340     metadata_ = nullptr;
341 }
342 
Initialize(ICommunicator * inCommunicator,ISyncInterface * inStorage,std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)343 int AbilitySync::Initialize(ICommunicator *inCommunicator, ISyncInterface *inStorage,
344     std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
345 {
346     if (inCommunicator == nullptr || inStorage == nullptr || deviceId.empty() || inMetadata == nullptr) {
347         return -E_INVALID_ARGS;
348     }
349     communicator_ = inCommunicator;
350     storageInterface_ = inStorage;
351     metadata_ = inMetadata;
352     deviceId_ = deviceId;
353     return E_OK;
354 }
355 
SyncStart(uint32_t sessionId,uint32_t sequenceId,uint16_t remoteCommunicatorVersion,const CommErrHandler & handler)356 int AbilitySync::SyncStart(uint32_t sessionId, uint32_t sequenceId, uint16_t remoteCommunicatorVersion,
357     const CommErrHandler &handler)
358 {
359     AbilitySyncRequestPacket packet;
360     int errCode = SetAbilityRequestBodyInfo(packet, remoteCommunicatorVersion);
361     if (errCode != E_OK) {
362         return errCode;
363     }
364     Message *message = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
365     if (message == nullptr) {
366         return -E_OUT_OF_MEMORY;
367     }
368     message->SetMessageType(TYPE_REQUEST);
369     errCode = message->SetCopiedObject<>(packet);
370     if (errCode != E_OK) {
371         LOGE("[AbilitySync][SyncStart] SetCopiedObject failed, err %d", errCode);
372         delete message;
373         message = nullptr;
374         return errCode;
375     }
376     message->SetVersion(MSG_VERSION_EXT);
377     message->SetSessionId(sessionId);
378     message->SetSequenceId(sequenceId);
379     SendConfig conf;
380     SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
381     errCode = communicator_->SendMessage(deviceId_, message, conf, handler);
382     if (errCode != E_OK) {
383         LOGE("[AbilitySync][SyncStart] SendPacket failed, err %d", errCode);
384         delete message;
385         message = nullptr;
386     }
387     return errCode;
388 }
389 
AckRecv(const Message * message,ISyncTaskContext * context)390 int AbilitySync::AckRecv(const Message *message, ISyncTaskContext *context)
391 {
392     int errCode = AckMsgCheck(message, context);
393     if (errCode != E_OK) {
394         return errCode;
395     }
396     const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
397     if (packet == nullptr) {
398         return -E_INVALID_ARGS;
399     }
400     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
401     context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
402     if (remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_2_0) {
403         errCode = AckRecvWithHighVersion(message, context, packet);
404     } else {
405         std::string schema = packet->GetSchema();
406         uint8_t schemaType = packet->GetSchemaType();
407         bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
408         if (!isCompatible) {
409             (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
410             LOGE("[AbilitySync][AckRecv] scheme check failed");
411             return -E_SCHEMA_MISMATCH;
412         }
413         LOGI("[AbilitySync][AckRecv]remoteSoftwareVersion = %u, isCompatible = %d,", remoteSoftwareVersion,
414             isCompatible);
415     }
416     return errCode;
417 }
418 
RequestRecv(const Message * message,ISyncTaskContext * context)419 int AbilitySync::RequestRecv(const Message *message, ISyncTaskContext *context)
420 {
421     if (message == nullptr || context == nullptr) {
422         return -E_INVALID_ARGS;
423     }
424     const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
425     if (packet == nullptr) {
426         return -E_INVALID_ARGS;
427     }
428     if (packet->GetSendCode() == -E_VERSION_NOT_SUPPORT) {
429         AbilitySyncAckPacket ackPacket;
430         (void)SendAck(message, -E_VERSION_NOT_SUPPORT, false, ackPacket);
431         LOGI("[AbilitySync][RequestRecv] version can not support, remote version is %u", packet->GetProtocolVersion());
432         return -E_VERSION_NOT_SUPPORT;
433     }
434 
435     std::string schema = packet->GetSchema();
436     uint8_t schemaType = packet->GetSchemaType();
437     bool isCompatible = static_cast<SyncGenericInterface *>(storageInterface_)->CheckCompatible(schema, schemaType);
438     if (!isCompatible) {
439         (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
440     }
441     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
442     context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
443     return HandleRequestRecv(message, context, isCompatible);
444 }
445 
AckNotifyRecv(const Message * message,ISyncTaskContext * context)446 int AbilitySync::AckNotifyRecv(const Message *message, ISyncTaskContext *context)
447 {
448     if (message == nullptr || context == nullptr) {
449         return -E_INVALID_ARGS;
450     }
451     if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
452         LOGE("[AbilitySync][AckNotifyRecv] Remote device dose not support this message id");
453         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
454         return -E_FEEDBACK_UNKNOWN_MESSAGE;
455     }
456     const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
457     if (packet == nullptr) {
458         return -E_INVALID_ARGS;
459     }
460     int errCode = packet->GetAckCode();
461     if (errCode != E_OK) {
462         LOGE("[AbilitySync][AckNotifyRecv] received an errCode %d", errCode);
463         return errCode;
464     }
465     std::string schema = packet->GetSchema();
466     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
467     context->SetRemoteSoftwareVersion(remoteSoftwareVersion);
468     AbilitySyncAckPacket sendPacket;
469     errCode = HandleVersionV3AckSchemaParam(packet, sendPacket, context, false);
470     int ackCode = errCode;
471     LOGI("[AckNotifyRecv] receive dev = %s ack notify, remoteSoftwareVersion = %u, ackCode = %d",
472         STR_MASK(deviceId_), remoteSoftwareVersion, errCode);
473     if (errCode == E_OK) {
474         (static_cast<SingleVerSyncTaskContext *>(context))->SetIsSchemaSync(true);
475         ackCode = AbilitySync::LAST_NOTIFY;
476     }
477     (void)SendAckWithEmptySchema(message, ackCode, true);
478     return errCode;
479 }
480 
GetAbilitySyncFinishedStatus() const481 bool AbilitySync::GetAbilitySyncFinishedStatus() const
482 {
483     return syncFinished_;
484 }
485 
SetAbilitySyncFinishedStatus(bool syncFinished)486 void AbilitySync::SetAbilitySyncFinishedStatus(bool syncFinished)
487 {
488     syncFinished_ = syncFinished;
489 }
490 
SecLabelCheck(const AbilitySyncRequestPacket * packet) const491 bool AbilitySync::SecLabelCheck(const AbilitySyncRequestPacket *packet) const
492 {
493     int32_t remoteSecLabel = packet->GetSecLabel();
494     int32_t remoteSecFlag = packet->GetSecFlag();
495     if (remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION || remoteSecLabel == SecurityLabel::NOT_SET) {
496         return true;
497     }
498     SecurityOption option;
499     int errCode = (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
500     LOGI("[AbilitySync][RequestRecv] local l:%d, f:%d, errCode:%d", option.securityLabel, option.securityFlag, errCode);
501     if (errCode == -E_NOT_SUPPORT || (errCode == E_OK && option.securityLabel == SecurityLabel::NOT_SET)) {
502         return true;
503     }
504     if (remoteSecLabel == FAILED_GET_SEC_CLASSIFICATION || errCode != E_OK) {
505         LOGE("[AbilitySync][RequestRecv] check error remoteL:%d, errCode:%d", remoteSecLabel, errCode);
506         return false;
507     }
508     if (remoteSecLabel == option.securityLabel) {
509         return true;
510     } else {
511         LOGE("[AbilitySync][RequestRecv] check error remote:%d , %d local:%d , %d",
512             remoteSecLabel, remoteSecFlag, option.securityLabel, option.securityFlag);
513         return false;
514     }
515 }
516 
HandleVersionV3RequestParam(const AbilitySyncRequestPacket * packet,ISyncTaskContext * context) const517 void AbilitySync::HandleVersionV3RequestParam(const AbilitySyncRequestPacket *packet, ISyncTaskContext *context) const
518 {
519     int32_t remoteSecLabel = packet->GetSecLabel();
520     int32_t remoteSecFlag = packet->GetSecFlag();
521     DbAbility remoteDbAbility = packet->GetDbAbility();
522     (static_cast<SingleVerSyncTaskContext *>(context))->SetDbAbility(remoteDbAbility);
523     (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption({remoteSecLabel, remoteSecFlag});
524     (static_cast<SingleVerSyncTaskContext *>(context))->SetReceivcPermitCheck(false);
525     LOGI("[AbilitySync][HandleVersionV3RequestParam] remoteSecLabel = %d, remoteSecFlag = %d, remoteSchemaType = %u",
526         remoteSecLabel, remoteSecFlag, packet->GetSchemaType());
527 }
528 
HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket * packet,ISyncTaskContext * context) const529 void AbilitySync::HandleVersionV3AckSecOptionParam(const AbilitySyncAckPacket *packet,
530     ISyncTaskContext *context) const
531 {
532     int32_t remoteSecLabel = packet->GetSecLabel();
533     int32_t remoteSecFlag = packet->GetSecFlag();
534     SecurityOption secOption = {remoteSecLabel, remoteSecFlag};
535     (static_cast<SingleVerSyncTaskContext *>(context))->SetRemoteSeccurityOption(secOption);
536     (static_cast<SingleVerSyncTaskContext *>(context))->SetSendPermitCheck(false);
537     LOGI("[AbilitySync][AckRecv] remoteSecLabel = %d, remoteSecFlag = %d", remoteSecLabel, remoteSecFlag);
538 }
539 
HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion) const540 int AbilitySync::HandleVersionV3AckSchemaParam(const AbilitySyncAckPacket *recvPacket,
541     AbilitySyncAckPacket &sendPacket,  ISyncTaskContext *context, bool sendOpinion) const
542 {
543     if (IsSingleRelationalVer()) {
544         return HandleRelationAckSchemaParam(recvPacket, sendPacket, context, sendOpinion);
545     }
546     HandleKvAckSchemaParam(recvPacket, context, sendPacket);
547     return E_OK;
548 }
549 
GetPacketSecOption(SecurityOption & option) const550 void AbilitySync::GetPacketSecOption(SecurityOption &option) const
551 {
552     int errCode =
553         (static_cast<SyncGenericInterface *>(storageInterface_))->GetSecurityOption(option);
554     if (errCode == -E_NOT_SUPPORT) {
555         LOGE("[AbilitySync][SyncStart] GetSecOpt not surpport sec classification");
556         option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
557     } else if (errCode != E_OK) {
558         LOGE("[AbilitySync][SyncStart] GetSecOpt errCode:%d", errCode);
559         option.securityLabel = FAILED_GET_SEC_CLASSIFICATION;
560     }
561 }
562 
RegisterTransformFunc()563 int AbilitySync::RegisterTransformFunc()
564 {
565     TransformFunc func;
566     func.computeFunc = std::bind(&AbilitySync::CalculateLen, std::placeholders::_1);
567     func.serializeFunc = std::bind(&AbilitySync::Serialization, std::placeholders::_1,
568                                    std::placeholders::_2, std::placeholders::_3);
569     func.deserializeFunc = std::bind(&AbilitySync::DeSerialization, std::placeholders::_1,
570                                      std::placeholders::_2, std::placeholders::_3);
571     return MessageTransform::RegTransformFunction(ABILITY_SYNC_MESSAGE, func);
572 }
573 
CalculateLen(const Message * inMsg)574 uint32_t AbilitySync::CalculateLen(const Message *inMsg)
575 {
576     if ((inMsg == nullptr) || (inMsg->GetMessageId() != ABILITY_SYNC_MESSAGE)) {
577         return 0;
578     }
579     int errCode;
580     uint32_t len = 0;
581     switch (inMsg->GetMessageType()) {
582         case TYPE_REQUEST:
583             errCode = RequestPacketCalculateLen(inMsg, len);
584             if (errCode != E_OK) {
585                 LOGE("[AbilitySync][CalculateLen] request packet calc length err %d", errCode);
586             }
587             break;
588         case TYPE_RESPONSE:
589             errCode = AckPacketCalculateLen(inMsg, len);
590             if (errCode != E_OK) {
591                 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
592             }
593             break;
594         case TYPE_NOTIFY:
595             errCode = AckPacketCalculateLen(inMsg, len);
596             if (errCode != E_OK) {
597                 LOGE("[AbilitySync][CalculateLen] ack packet calc length err %d", errCode);
598             }
599             break;
600         default:
601             LOGE("[AbilitySync][CalculateLen] message type not support, type %d", inMsg->GetMessageType());
602             break;
603     }
604     return len;
605 }
606 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)607 int AbilitySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
608 {
609     if ((buffer == nullptr) || (inMsg == nullptr)) {
610         return -E_INVALID_ARGS;
611     }
612 
613     switch (inMsg->GetMessageType()) {
614         case TYPE_REQUEST:
615             return RequestPacketSerialization(buffer, length, inMsg);
616         case TYPE_RESPONSE:
617         case TYPE_NOTIFY:
618             return AckPacketSerialization(buffer, length, inMsg);
619         default:
620             return -E_MESSAGE_TYPE_ERROR;
621     }
622 }
623 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)624 int AbilitySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
625 {
626     if ((buffer == nullptr) || (inMsg == nullptr)) {
627         return -E_INVALID_ARGS;
628     }
629 
630     switch (inMsg->GetMessageType()) {
631         case TYPE_REQUEST:
632             return RequestPacketDeSerialization(buffer, length, inMsg);
633         case TYPE_RESPONSE:
634         case TYPE_NOTIFY:
635             return AckPacketDeSerialization(buffer, length, inMsg);
636         default:
637             return -E_MESSAGE_TYPE_ERROR;
638     }
639 }
640 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)641 int AbilitySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
642 {
643     const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
644     if (packet == nullptr) {
645         return -E_INVALID_ARGS;
646     }
647 
648     len = packet->CalculateLen();
649     return E_OK;
650 }
651 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)652 int AbilitySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
653 {
654     const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
655     if (packet == nullptr) {
656         return -E_INVALID_ARGS;
657     }
658 
659     len = packet->CalculateLen();
660     return E_OK;
661 }
662 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)663 int AbilitySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
664 {
665     const AbilitySyncRequestPacket *packet = inMsg->GetObject<AbilitySyncRequestPacket>();
666     if ((packet == nullptr) || (length != packet->CalculateLen())) {
667         return -E_INVALID_ARGS;
668     }
669 
670     Parcel parcel(buffer, length);
671     parcel.WriteUInt32(packet->GetProtocolVersion());
672     parcel.WriteInt(packet->GetSendCode());
673     parcel.WriteUInt32(packet->GetSoftwareVersion());
674     parcel.WriteString(packet->GetSchema());
675     parcel.WriteInt(packet->GetSecLabel());
676     parcel.WriteInt(packet->GetSecFlag());
677     parcel.WriteUInt32(packet->GetSchemaType());
678     parcel.WriteUInt64(packet->GetDbCreateTime());
679     int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
680     if (parcel.IsError() || errCode != E_OK) {
681         return -E_PARSE_FAIL;
682     }
683     return E_OK;
684 }
685 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)686 int AbilitySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
687 {
688     const AbilitySyncAckPacket *packet = inMsg->GetObject<AbilitySyncAckPacket>();
689     if ((packet == nullptr) || (length != packet->CalculateLen())) {
690         return -E_INVALID_ARGS;
691     }
692 
693     Parcel parcel(buffer, length);
694     parcel.WriteUInt32(ABILITY_SYNC_VERSION_V1);
695     parcel.WriteUInt32(SOFTWARE_VERSION_CURRENT);
696     parcel.WriteInt(packet->GetAckCode());
697     parcel.WriteString(packet->GetSchema());
698     parcel.WriteInt(packet->GetSecLabel());
699     parcel.WriteInt(packet->GetSecFlag());
700     parcel.WriteUInt32(packet->GetSchemaType());
701     parcel.WriteUInt32(packet->GetPermitSync());
702     parcel.WriteUInt32(packet->GetRequirePeerConvert());
703     parcel.WriteUInt64(packet->GetDbCreateTime());
704     int errCode = DbAbility::Serialize(parcel, packet->GetDbAbility());
705     if (parcel.IsError() || errCode != E_OK) {
706         return -E_PARSE_FAIL;
707     }
708     errCode = SchemaNegotiate::SerializeData(packet->GetRelationalSyncOpinion(), parcel);
709     if (parcel.IsError() || errCode != E_OK) {
710         return -E_PARSE_FAIL;
711     }
712     return E_OK;
713 }
714 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)715 int AbilitySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
716 {
717     auto *packet = new (std::nothrow) AbilitySyncRequestPacket();
718     if (packet == nullptr) {
719         return -E_OUT_OF_MEMORY;
720     }
721 
722     Parcel parcel(const_cast<uint8_t *>(buffer), length);
723     uint32_t version = 0;
724     uint32_t softwareVersion = 0;
725     std::string schema;
726     int32_t sendCode = 0;
727     int errCode = -E_PARSE_FAIL;
728 
729     parcel.ReadUInt32(version);
730     if (parcel.IsError()) {
731         goto ERROR_OUT;
732     }
733     packet->SetProtocolVersion(version);
734     if (version > ABILITY_SYNC_VERSION_V1) {
735         packet->SetSendCode(-E_VERSION_NOT_SUPPORT);
736         errCode = inMsg->SetExternalObject<>(packet);
737         if (errCode != E_OK) {
738             goto ERROR_OUT;
739         }
740         return errCode;
741     }
742     parcel.ReadInt(sendCode);
743     parcel.ReadUInt32(softwareVersion);
744     parcel.ReadString(schema);
745     errCode = RequestPacketDeSerializationTailPart(parcel, packet, softwareVersion);
746     if (parcel.IsError() || errCode != E_OK) {
747         goto ERROR_OUT;
748     }
749     packet->SetSendCode(sendCode);
750     packet->SetSoftwareVersion(softwareVersion);
751     packet->SetSchema(schema);
752 
753     errCode = inMsg->SetExternalObject<>(packet);
754     if (errCode == E_OK) {
755         return E_OK;
756     }
757 
758 ERROR_OUT:
759     delete packet;
760     packet = nullptr;
761     return errCode;
762 }
763 
RequestPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncRequestPacket * packet,uint32_t version)764 int AbilitySync::RequestPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncRequestPacket *packet,
765     uint32_t version)
766 {
767     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
768         int32_t secLabel = 0;
769         int32_t secFlag = 0;
770         uint32_t schemaType = 0;
771         parcel.ReadInt(secLabel);
772         parcel.ReadInt(secFlag);
773         parcel.ReadUInt32(schemaType);
774         packet->SetSecLabel(secLabel);
775         packet->SetSecFlag(secFlag);
776         packet->SetSchemaType(schemaType);
777     }
778     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
779         uint64_t dbCreateTime = 0;
780         parcel.ReadUInt64(dbCreateTime);
781         packet->SetDbCreateTime(dbCreateTime);
782     }
783     DbAbility remoteDbAbility;
784     int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
785     if (errCode != E_OK) {
786         LOGE("[AbilitySync] request packet DeSerializ failed.");
787         return errCode;
788     }
789     packet->SetDbAbility(remoteDbAbility);
790     return E_OK;
791 }
792 
AckPacketDeSerializationTailPart(Parcel & parcel,AbilitySyncAckPacket * packet,uint32_t version)793 int AbilitySync::AckPacketDeSerializationTailPart(Parcel &parcel, AbilitySyncAckPacket *packet, uint32_t version)
794 {
795     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_2_0) {
796         int32_t secLabel = 0;
797         int32_t secFlag = 0;
798         uint32_t schemaType = 0;
799         uint32_t permitSync = 0;
800         uint32_t requirePeerConvert = 0;
801         parcel.ReadInt(secLabel);
802         parcel.ReadInt(secFlag);
803         parcel.ReadUInt32(schemaType);
804         parcel.ReadUInt32(permitSync);
805         parcel.ReadUInt32(requirePeerConvert);
806         packet->SetSecLabel(secLabel);
807         packet->SetSecFlag(secFlag);
808         packet->SetSchemaType(schemaType);
809         packet->SetPermitSync(permitSync);
810         packet->SetRequirePeerConvert(requirePeerConvert);
811     }
812     if (!parcel.IsError() && version > SOFTWARE_VERSION_RELEASE_3_0) {
813         uint64_t dbCreateTime = 0;
814         parcel.ReadUInt64(dbCreateTime);
815         packet->SetDbCreateTime(dbCreateTime);
816     }
817     DbAbility remoteDbAbility;
818     int errCode = DbAbility::DeSerialize(parcel, remoteDbAbility);
819     if (errCode != E_OK) {
820         LOGE("[AbilitySync] ack packet DeSerializ failed.");
821         return errCode;
822     }
823     packet->SetDbAbility(remoteDbAbility);
824     RelationalSyncOpinion relationalSyncOpinion;
825     errCode = SchemaNegotiate::DeserializeData(parcel, relationalSyncOpinion);
826     if (errCode != E_OK) {
827         LOGE("[AbilitySync] ack packet DeSerializ RelationalSyncOpinion failed.");
828         return errCode;
829     }
830     packet->SetRelationalSyncOpinion(relationalSyncOpinion);
831     return E_OK;
832 }
833 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)834 int AbilitySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
835 {
836     auto *packet = new (std::nothrow) AbilitySyncAckPacket();
837     if (packet == nullptr) {
838         return -E_OUT_OF_MEMORY;
839     }
840 
841     Parcel parcel(const_cast<uint8_t *>(buffer), length);
842     uint32_t version = 0;
843     uint32_t softwareVersion = 0;
844     int32_t ackCode = E_OK;
845     std::string schema;
846     int errCode;
847     parcel.ReadUInt32(version);
848     if (parcel.IsError()) {
849         LOGE("[AbilitySync][RequestDeSerialization] read version failed!");
850         errCode = -E_PARSE_FAIL;
851         goto ERROR_OUT;
852     }
853     packet->SetProtocolVersion(version);
854     if (version > ABILITY_SYNC_VERSION_V1) {
855         packet->SetAckCode(-E_VERSION_NOT_SUPPORT);
856         errCode = inMsg->SetExternalObject<>(packet);
857         if (errCode != E_OK) {
858             goto ERROR_OUT;
859         }
860         return errCode;
861     }
862     parcel.ReadUInt32(softwareVersion);
863     parcel.ReadInt(ackCode);
864     parcel.ReadString(schema);
865     errCode = AckPacketDeSerializationTailPart(parcel, packet, softwareVersion);
866     if (parcel.IsError() || errCode != E_OK) {
867         LOGE("[AbilitySync][RequestDeSerialization] DeSerialization failed!");
868         errCode = -E_PARSE_FAIL;
869         goto ERROR_OUT;
870     }
871     packet->SetSoftwareVersion(softwareVersion);
872     packet->SetAckCode(ackCode);
873     packet->SetSchema(schema);
874     errCode = inMsg->SetExternalObject<>(packet);
875     if (errCode == E_OK) {
876         return E_OK;
877     }
878 
879 ERROR_OUT:
880     delete packet;
881     packet = nullptr;
882     return errCode;
883 }
884 
SetAbilityRequestBodyInfo(AbilitySyncRequestPacket & packet,uint16_t remoteCommunicatorVersion) const885 int AbilitySync::SetAbilityRequestBodyInfo(AbilitySyncRequestPacket &packet, uint16_t remoteCommunicatorVersion) const
886 {
887     uint64_t dbCreateTime;
888     int errCode =
889         (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
890     if (errCode != E_OK) {
891         LOGE("[AbilitySync][FillAbilityRequest] GetDatabaseCreateTimestamp failed, err %d", errCode);
892         return errCode;
893     }
894     SecurityOption option;
895     GetPacketSecOption(option);
896     std::string schemaStr;
897     uint32_t schemaType = 0;
898     if (IsSingleKvVer()) {
899         SchemaObject schemaObj = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
900         schemaStr = schemaObj.ToSchemaString();
901         schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
902     } else if (IsSingleRelationalVer()) {
903         auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
904         schemaStr = schemaObj.ToSchemaString();
905         schemaType = static_cast<uint32_t>(schemaObj.GetSchemaType());
906     }
907     DbAbility dbAbility;
908     errCode = GetDbAbilityInfo(dbAbility);
909     if (errCode != E_OK) {
910         LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
911         return errCode;
912     }
913     // 102 version is forbidden to sync with 103 json-schema or flatbuffer-schema
914     // so schema should put null string while remote is 102 version to avoid this bug.
915     if (remoteCommunicatorVersion == 1) {
916         packet.SetSchema("");
917         packet.SetSchemaType(0);
918     } else {
919         packet.SetSchema(schemaStr);
920         packet.SetSchemaType(schemaType);
921     }
922     packet.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
923     packet.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
924     packet.SetSecLabel(option.securityLabel);
925     packet.SetSecFlag(option.securityFlag);
926     packet.SetDbCreateTime(dbCreateTime);
927     packet.SetDbAbility(dbAbility);
928     LOGI("[AbilitySync][FillRequest] ver=%u,Lab=%d,Flag=%d,dbCreateTime=%" PRId64, SOFTWARE_VERSION_CURRENT,
929         option.securityLabel, option.securityFlag, dbCreateTime);
930     return E_OK;
931 }
932 
SetAbilityAckBodyInfo(AbilitySyncAckPacket & ackPacket,int ackCode,bool isAckNotify) const933 int AbilitySync::SetAbilityAckBodyInfo(AbilitySyncAckPacket &ackPacket, int ackCode, bool isAckNotify) const
934 {
935     int errCode = E_OK;
936     ackPacket.SetProtocolVersion(ABILITY_SYNC_VERSION_V1);
937     ackPacket.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
938     if (!isAckNotify) {
939         SecurityOption option;
940         GetPacketSecOption(option);
941         ackPacket.SetSecLabel(option.securityLabel);
942         ackPacket.SetSecFlag(option.securityFlag);
943         uint64_t dbCreateTime = 0;
944         errCode =
945             (static_cast<SyncGenericInterface *>(storageInterface_))->GetDatabaseCreateTimestamp(dbCreateTime);
946         if (errCode != E_OK) {
947             LOGE("[AbilitySync][SyncStart] GetDatabaseCreateTimestamp failed, err %d", errCode);
948             ackCode = errCode;
949         }
950         DbAbility dbAbility;
951         errCode = GetDbAbilityInfo(dbAbility);
952         if (errCode != E_OK) {
953             LOGE("[AbilitySync][FillAbilityRequest] GetDbAbility failed, err %d", errCode);
954             return errCode;
955         }
956         ackPacket.SetDbCreateTime(dbCreateTime);
957         ackPacket.SetDbAbility(dbAbility);
958     }
959     ackPacket.SetAckCode(ackCode);
960     return E_OK;
961 }
962 
SetAbilityAckSchemaInfo(AbilitySyncAckPacket & ackPacket,const ISchema & schemaObj) const963 void AbilitySync::SetAbilityAckSchemaInfo(AbilitySyncAckPacket &ackPacket, const ISchema &schemaObj) const
964 {
965     ackPacket.SetSchema(schemaObj.ToSchemaString());
966     ackPacket.SetSchemaType(static_cast<uint32_t>(schemaObj.GetSchemaType()));
967 }
968 
SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket & ackPacket,SyncOpinion localOpinion) const969 void AbilitySync::SetAbilityAckSyncOpinionInfo(AbilitySyncAckPacket &ackPacket, SyncOpinion localOpinion) const
970 {
971     ackPacket.SetPermitSync(localOpinion.permitSync);
972     ackPacket.SetRequirePeerConvert(localOpinion.requirePeerConvert);
973 }
974 
GetDbAbilityInfo(DbAbility & dbAbility) const975 int AbilitySync::GetDbAbilityInfo(DbAbility &dbAbility) const
976 {
977     int errCode = E_OK;
978     for (const auto &item : SyncConfig::ABILITYBITS) {
979         errCode = dbAbility.SetAbilityItem(item, SUPPORT_MARK);
980         if (errCode != E_OK) {
981             return errCode;
982         }
983     }
984     return errCode;
985 }
986 
AckMsgCheck(const Message * message,ISyncTaskContext * context) const987 int AbilitySync::AckMsgCheck(const Message *message, ISyncTaskContext *context) const
988 {
989     if (message == nullptr || context == nullptr) {
990         return -E_INVALID_ARGS;
991     }
992     if (message->GetErrorNo() == E_FEEDBACK_UNKNOWN_MESSAGE) {
993         LOGE("[AbilitySync][AckMsgCheck] Remote device dose not support this message id");
994         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
995         context->SetTaskErrCode(-E_FEEDBACK_UNKNOWN_MESSAGE);
996         return -E_FEEDBACK_UNKNOWN_MESSAGE;
997     }
998     if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) {
999         LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
1000         context->SetTaskErrCode(-E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
1001         return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
1002     }
1003     const AbilitySyncAckPacket *packet = message->GetObject<AbilitySyncAckPacket>();
1004     if (packet == nullptr) {
1005         return -E_INVALID_ARGS;
1006     }
1007     int ackCode = packet->GetAckCode();
1008     if (ackCode != E_OK) {
1009         LOGE("[AbilitySync][AckMsgCheck] received an errCode %d", ackCode);
1010         context->SetTaskErrCode(ackCode);
1011         return ackCode;
1012     }
1013     return E_OK;
1014 }
1015 
IsSingleKvVer() const1016 bool AbilitySync::IsSingleKvVer() const
1017 {
1018     return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_SVD;
1019 }
IsSingleRelationalVer() const1020 bool AbilitySync::IsSingleRelationalVer() const
1021 {
1022 #ifdef RELATIONAL_STORE
1023     return storageInterface_->GetInterfaceType() == ISyncInterface::SYNC_RELATION;
1024 #elif
1025     return false;
1026 #endif
1027 }
1028 
HandleRequestRecv(const Message * message,ISyncTaskContext * context,bool isCompatible)1029 int AbilitySync::HandleRequestRecv(const Message *message, ISyncTaskContext *context, bool isCompatible)
1030 {
1031     const AbilitySyncRequestPacket *packet = message->GetObject<AbilitySyncRequestPacket>();
1032     if (packet == nullptr) {
1033         return -E_INVALID_ARGS;
1034     }
1035     uint32_t remoteSoftwareVersion = packet->GetSoftwareVersion();
1036     int ackCode;
1037     std::string schema = packet->GetSchema();
1038     if (remoteSoftwareVersion <= SOFTWARE_VERSION_RELEASE_2_0) {
1039         LOGI("[AbilitySync][RequestRecv] remote version = %u, CheckSchemaCompatible = %d",
1040             remoteSoftwareVersion, isCompatible);
1041         return SendAckWithEmptySchema(message, E_OK, false);
1042     }
1043     HandleVersionV3RequestParam(packet, context);
1044     if (SecLabelCheck(packet)) {
1045         ackCode = E_OK;
1046     } else {
1047         ackCode = -E_SECURITY_OPTION_CHECK_ERROR;
1048     }
1049     if (ackCode == E_OK && remoteSoftwareVersion > SOFTWARE_VERSION_RELEASE_3_0) {
1050         ackCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1051     }
1052     AbilitySyncAckPacket ackPacket;
1053     if (IsSingleRelationalVer()) {
1054         ackPacket.SetRelationalSyncOpinion(MakeRelationSyncOpnion(packet, schema));
1055     } else {
1056         SetAbilityAckSyncOpinionInfo(ackPacket, MakeKvSyncOpnion(packet, schema));
1057     }
1058     LOGI("[AbilitySync][RequestRecv] remote dev=%s,ver=%u,schemaCompatible=%d", STR_MASK(deviceId_),
1059         remoteSoftwareVersion, isCompatible);
1060     return SendAck(message, ackCode, false, ackPacket);
1061 }
1062 
SendAck(const Message * message,int ackCode,bool isAckNotify,AbilitySyncAckPacket & ackPacket)1063 int AbilitySync::SendAck(const Message *message, int ackCode, bool isAckNotify, AbilitySyncAckPacket &ackPacket)
1064 {
1065     int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1066     if (errCode != E_OK) {
1067         return errCode;
1068     }
1069     if (IsSingleRelationalVer()) {
1070         auto schemaObj = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1071         SetAbilityAckSchemaInfo(ackPacket, schemaObj);
1072     } else if (IsSingleKvVer()) {
1073         SchemaObject schemaObject = static_cast<SingleVerKvDBSyncInterface *>(storageInterface_)->GetSchemaInfo();
1074         SetAbilityAckSchemaInfo(ackPacket, schemaObject);
1075     }
1076     return SendAck(message, ackPacket, isAckNotify);
1077 }
1078 
SendAckWithEmptySchema(const Message * message,int ackCode,bool isAckNotify)1079 int AbilitySync::SendAckWithEmptySchema(const Message *message, int ackCode,
1080     bool isAckNotify)
1081 {
1082     AbilitySyncAckPacket ackPacket;
1083     int errCode = SetAbilityAckBodyInfo(ackPacket, ackCode, isAckNotify);
1084     if (errCode != E_OK) {
1085         return errCode;
1086     }
1087     SetAbilityAckSchemaInfo(ackPacket, SchemaObject());
1088     return SendAck(message, ackPacket, isAckNotify);
1089 }
1090 
SendAck(const Message * inMsg,const AbilitySyncAckPacket & ackPacket,bool isAckNotify)1091 int AbilitySync::SendAck(const Message *inMsg, const AbilitySyncAckPacket &ackPacket, bool isAckNotify)
1092 {
1093     Message *ackMessage = new (std::nothrow) Message(ABILITY_SYNC_MESSAGE);
1094     if (ackMessage == nullptr) {
1095         LOGE("[AbilitySync][SendAck] message create failed, may be memleak!");
1096         return -E_OUT_OF_MEMORY;
1097     }
1098     int errCode = ackMessage->SetCopiedObject<>(ackPacket);
1099     if (errCode != E_OK) {
1100         LOGE("[AbilitySync][SendAck] SetCopiedObject failed, err %d", errCode);
1101         delete ackMessage;
1102         ackMessage = nullptr;
1103         return errCode;
1104     }
1105     (!isAckNotify) ? ackMessage->SetMessageType(TYPE_RESPONSE) : ackMessage->SetMessageType(TYPE_NOTIFY);
1106     ackMessage->SetTarget(deviceId_);
1107     ackMessage->SetSessionId(inMsg->GetSessionId());
1108     ackMessage->SetSequenceId(inMsg->GetSequenceId());
1109     SendConfig conf;
1110     SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf);
1111     errCode = communicator_->SendMessage(deviceId_, ackMessage, conf);
1112     if (errCode != E_OK) {
1113         LOGE("[AbilitySync][SendAck] SendPacket failed, err %d", errCode);
1114         delete ackMessage;
1115         ackMessage = nullptr;
1116     }
1117     return errCode;
1118 }
1119 
MakeKvSyncOpnion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1120 SyncOpinion AbilitySync::MakeKvSyncOpnion(const AbilitySyncRequestPacket *packet, const std::string &remoteSchema) const
1121 {
1122     uint8_t remoteSchemaType = packet->GetSchemaType();
1123     SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1124     SyncOpinion localSyncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1125     return localSyncOpinion;
1126 }
1127 
MakeRelationSyncOpnion(const AbilitySyncRequestPacket * packet,const std::string & remoteSchema) const1128 RelationalSyncOpinion AbilitySync::MakeRelationSyncOpnion(const AbilitySyncRequestPacket *packet,
1129     const std::string &remoteSchema) const
1130 {
1131     uint8_t remoteSchemaType = packet->GetSchemaType();
1132     RelationalSchemaObject localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1133     return SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1134 }
1135 
HandleKvAckSchemaParam(const AbilitySyncAckPacket * recvPacket,ISyncTaskContext * context,AbilitySyncAckPacket & sendPacket) const1136 void AbilitySync::HandleKvAckSchemaParam(const AbilitySyncAckPacket *recvPacket,
1137     ISyncTaskContext *context, AbilitySyncAckPacket &sendPacket) const
1138 {
1139     std::string remoteSchema = recvPacket->GetSchema();
1140     uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1141     bool permitSync = static_cast<bool>(recvPacket->GetPermitSync());
1142     bool requirePeerConvert = static_cast<bool>(recvPacket->GetRequirePeerConvert());
1143     SyncOpinion remoteOpinion = {permitSync, requirePeerConvert, true};
1144     SchemaObject localSchema = (static_cast<SingleVerKvDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1145     SyncOpinion syncOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1146     SyncStrategy localStrategy = SchemaNegotiate::ConcludeSyncStrategy(syncOpinion, remoteOpinion);
1147     SetAbilityAckSyncOpinionInfo(sendPacket, syncOpinion);
1148     (static_cast<SingleVerKvSyncTaskContext *>(context))->SetSyncStrategy(localStrategy);
1149 }
1150 
HandleRelationAckSchemaParam(const AbilitySyncAckPacket * recvPacket,AbilitySyncAckPacket & sendPacket,ISyncTaskContext * context,bool sendOpinion) const1151 int AbilitySync::HandleRelationAckSchemaParam(const AbilitySyncAckPacket *recvPacket, AbilitySyncAckPacket &sendPacket,
1152     ISyncTaskContext *context, bool sendOpinion) const
1153 {
1154     std::string remoteSchema = recvPacket->GetSchema();
1155     uint8_t remoteSchemaType = recvPacket->GetSchemaType();
1156     auto localSchema = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->GetSchemaInfo();
1157     auto localOpinion = SchemaNegotiate::MakeLocalSyncOpinion(localSchema, remoteSchema, remoteSchemaType);
1158     auto localStrategy = SchemaNegotiate::ConcludeSyncStrategy(localOpinion,
1159         recvPacket->GetRelationalSyncOpinion());
1160     (static_cast<SingleVerRelationalSyncTaskContext *>(context))->SetRelationalSyncStrategy(localStrategy);
1161     int errCode = (static_cast<RelationalDBSyncInterface *>(storageInterface_))->
1162         CreateDistributedDeviceTable(context->GetDeviceId(), localStrategy);
1163     if (errCode != E_OK) {
1164         LOGE("[AbilitySync][AckRecv] create distributed device table failed,errCode=%d", errCode);
1165     }
1166     if (sendOpinion) {
1167         sendPacket.SetRelationalSyncOpinion(localOpinion);
1168     }
1169     return errCode;
1170 }
1171 
AckRecvWithHighVersion(const Message * message,ISyncTaskContext * context,const AbilitySyncAckPacket * packet)1172 int AbilitySync::AckRecvWithHighVersion(const Message *message, ISyncTaskContext *context,
1173     const AbilitySyncAckPacket *packet)
1174 {
1175     HandleVersionV3AckSecOptionParam(packet, context);
1176     AbilitySyncAckPacket ackPacket;
1177     int errCode = HandleVersionV3AckSchemaParam(packet, ackPacket, context, true);
1178     if (errCode != E_OK) {
1179         context->SetTaskErrCode(errCode);
1180         return errCode;
1181     }
1182     auto singleVerContext = static_cast<SingleVerSyncTaskContext *>(context);
1183     auto query = singleVerContext->GetQuery();
1184     bool permitSync = (singleVerContext->GetSyncStrategy(query)).permitSync;
1185     if (!permitSync) {
1186         (static_cast<SingleVerSyncTaskContext *>(context))->SetTaskErrCode(-E_SCHEMA_MISMATCH);
1187         LOGE("[AbilitySync][AckRecv] scheme check failed");
1188         return -E_SCHEMA_MISMATCH;
1189     }
1190     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
1191         errCode = metadata_->SetDbCreateTime(deviceId_, packet->GetDbCreateTime(), true);
1192         if (errCode != E_OK) {
1193             LOGE("[AbilitySync][AckRecv] set db create time failed,errCode=%d", errCode);
1194             context->SetTaskErrCode(errCode);
1195             return errCode;
1196         }
1197     }
1198     DbAbility remoteDbAbility = packet->GetDbAbility();
1199     (static_cast<SingleVerSyncTaskContext *>(context))->SetDbAbility(remoteDbAbility);
1200     (void)SendAck(message, AbilitySync::CHECK_SUCCESS, true, ackPacket);
1201     (static_cast<SingleVerSyncTaskContext *>(context))->SetIsSchemaSync(true);
1202     return E_OK;
1203 }
1204 } // namespace DistributedDB