• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_data_sync_utils.h"
16 
17 #include <mutex>
18 #include "db_common.h"
19 #include "version.h"
20 #include "log_print.h"
21 #include "message.h"
22 namespace DistributedDB {
23 namespace {
FillPermissionCheckParam(const SyncGenericInterface * storage,int mode,PermissionCheckParam & param,uint8_t & flag)24 void FillPermissionCheckParam(const SyncGenericInterface* storage, int mode, PermissionCheckParam &param, uint8_t &flag)
25 {
26     param.appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
27     param.userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
28     param.storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
29     param.instanceId = storage->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
30     switch (mode) {
31         case SyncModeType::PUSH:
32             flag = CHECK_FLAG_RECEIVE;
33             break;
34         case SyncModeType::PULL:
35             flag = CHECK_FLAG_SEND;
36             break;
37         case SyncModeType::PUSH_AND_PULL:
38             flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
39             break;
40         default:
41             flag = CHECK_FLAG_RECEIVE;
42             break;
43     }
44 }
45 }
46 
QuerySyncCheck(const SingleVerSyncTaskContext * context,bool & isCheckStatus)47 int SingleVerDataSyncUtils::QuerySyncCheck(const SingleVerSyncTaskContext *context, bool &isCheckStatus)
48 {
49     if (context == nullptr) {
50         isCheckStatus = false;
51         return -E_INVALID_ARGS;
52     }
53     if (!context->IsQuerySync()) {
54         isCheckStatus = true;
55         return E_OK;
56     }
57     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
58     // for 101 version, no need to do abilitySync, just send request to remote
59     if (version <= SOFTWARE_VERSION_RELEASE_1_0) {
60         isCheckStatus = true;
61         return E_OK;
62     }
63     if (version < SOFTWARE_VERSION_RELEASE_4_0) {
64         LOGE("[SingleVerDataSync] not support query sync when remote ver lower than 104");
65         isCheckStatus = false;
66         return E_OK;
67     }
68     if (version < SOFTWARE_VERSION_RELEASE_5_0 && !(context->GetQuery().IsQueryOnlyByKey())) {
69         LOGE("[SingleVerDataSync] remote version only support prefix key");
70         isCheckStatus = false;
71         return E_OK;
72     }
73     if (context->GetQuery().HasInKeys() && context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
74         isCheckStatus = false;
75         return E_OK;
76     }
77     isCheckStatus = true;
78     return E_OK;
79 }
80 
AckMsgErrnoCheck(const SingleVerSyncTaskContext * context,const Message * message)81 int SingleVerDataSyncUtils::AckMsgErrnoCheck(const SingleVerSyncTaskContext *context, const Message *message)
82 {
83     if (context == nullptr || message == nullptr) {
84         return -E_INVALID_ARGS;
85     }
86     if (message->IsFeedbackError()) {
87         LOGE("[DataSync][AckMsgErrnoCheck] message errNo=%d", message->GetErrorNo());
88         return -static_cast<int>(message->GetErrorNo());
89     }
90     return E_OK;
91 }
92 
RequestQueryCheck(const DataRequestPacket * packet,SyncGenericInterface * storage)93 int SingleVerDataSyncUtils::RequestQueryCheck(const DataRequestPacket *packet, SyncGenericInterface *storage)
94 {
95     if (storage == nullptr || packet == nullptr) {
96         return -E_INVALID_ARGS;
97     }
98     if (SyncOperation::GetSyncType(packet->GetMode()) != SyncType::QUERY_SYNC_TYPE) {
99         return E_OK;
100     }
101     QuerySyncObject syncQuery = packet->GetQuery();
102     int errCode = storage->CheckAndInitQueryCondition(syncQuery);
103     if (errCode != E_OK) {
104         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
105         return errCode;
106     }
107     return E_OK;
108 }
109 
IsPermitLocalDeviceRecvData(const std::string & deviceId,const SecurityOption & remoteSecOption)110 bool SingleVerDataSyncUtils::IsPermitLocalDeviceRecvData(const std::string &deviceId,
111     const SecurityOption &remoteSecOption)
112 {
113     return RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(deviceId, remoteSecOption);
114 }
115 
IsPermitRemoteDeviceRecvData(const std::string & deviceId,const SecurityOption & remoteSecOption,SyncGenericInterface * storage)116 bool SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(const std::string &deviceId,
117     const SecurityOption &remoteSecOption, SyncGenericInterface *storage)
118 {
119     if (storage == nullptr) {
120         return false;
121     }
122     SecurityOption localSecOption;
123     if (remoteSecOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
124         return true;
125     }
126     int errCode = storage->GetSecurityOption(localSecOption);
127     if (errCode == -E_NOT_SUPPORT) {
128         return true;
129     }
130     if (errCode != E_OK) {
131         LOGE("[SingleVerDataSyncUtils] get security option error %d", errCode);
132         return false;
133     }
134     if (localSecOption.securityLabel == NOT_SET) {
135         LOGE("[SingleVerDataSyncUtils] local label is not set!");
136         return false;
137     }
138     return RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(deviceId, localSecOption);
139 }
140 
TransDbDataItemToSendDataItem(const std::string & localHashName,std::vector<SendDataItem> & outData)141 void SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(const std::string &localHashName,
142     std::vector<SendDataItem> &outData)
143 {
144     for (size_t i = 0; i < outData.size(); i++) {
145         if (outData[i] == nullptr) {
146             continue;
147         }
148         outData[i]->SetOrigDevice(outData[i]->GetOrigDevice().empty() ? localHashName : outData[i]->GetOrigDevice());
149         if (i == 0 || i == (outData.size() - 1)) {
150             LOGD("[DataSync][TransToSendItem] printData packet=%zu,timestamp=%" PRIu64 ",flag=%" PRIu64, i,
151                 outData[i]->GetTimestamp(), outData[i]->GetFlag());
152         }
153     }
154 }
155 
TransferForeignOrigDevName(const std::string & deviceName,const std::string & localHashName)156 std::string SingleVerDataSyncUtils::TransferForeignOrigDevName(const std::string &deviceName,
157     const std::string &localHashName)
158 {
159     if (localHashName == deviceName) {
160         return "";
161     }
162     return deviceName;
163 }
164 
TransSendDataItemToLocal(const SingleVerSyncTaskContext * context,const std::string & localHashName,const std::vector<SendDataItem> & data)165 void SingleVerDataSyncUtils::TransSendDataItemToLocal(const SingleVerSyncTaskContext *context,
166     const std::string &localHashName, const std::vector<SendDataItem> &data)
167 {
168     TimeOffset offset = context->GetTimeOffset();
169     for (auto &item : data) {
170         if (item == nullptr) {
171             continue;
172         }
173         item->SetOrigDevice(TransferForeignOrigDevName(item->GetOrigDevice(), localHashName));
174         Timestamp tempTimestamp = item->GetTimestamp();
175         Timestamp tempWriteTimestamp = item->GetWriteTimestamp();
176         item->SetTimestamp(tempTimestamp - static_cast<Timestamp>(offset));
177         if (tempWriteTimestamp != 0) {
178             item->SetWriteTimestamp(tempWriteTimestamp - static_cast<Timestamp>(offset));
179         }
180 
181         Timestamp currentLocalTime = context->GetCurrentLocalTime();
182         if (item->GetTimestamp() > currentLocalTime) {
183             item->SetTimestamp(currentLocalTime);
184         }
185         if (item->GetWriteTimestamp() > currentLocalTime) {
186             item->SetWriteTimestamp(currentLocalTime);
187         }
188     }
189 }
190 
TranslateErrCodeIfNeed(int mode,uint32_t version,int & errCode)191 void SingleVerDataSyncUtils::TranslateErrCodeIfNeed(int mode, uint32_t version, int &errCode)
192 {
193     // once get data occur E_EKEYREVOKED error, should also send request to remote dev to pull data.
194     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL &&
195         version > SOFTWARE_VERSION_RELEASE_2_0 && errCode == -E_EKEYREVOKED) {
196         errCode = E_OK;
197     }
198 }
199 
RunPermissionCheck(SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,const DataRequestPacket * packet)200 int SingleVerDataSyncUtils::RunPermissionCheck(SingleVerSyncTaskContext *context, const SyncGenericInterface* storage,
201     const std::string &label, const DataRequestPacket *packet)
202 {
203     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
204     return RunPermissionCheckInner(context, storage, label, packet, mode);
205 }
206 
RunPermissionCheck(SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,int mode)207 int SingleVerDataSyncUtils::RunPermissionCheck(SingleVerSyncTaskContext *context, const SyncGenericInterface* storage,
208     const std::string &label, int mode)
209 {
210     return RunPermissionCheckInner(context, storage, label, nullptr, mode);
211 }
212 
CheckPermitReceiveData(const SingleVerSyncTaskContext * context,const ICommunicator * communicator,const SyncGenericInterface * storage)213 bool SingleVerDataSyncUtils::CheckPermitReceiveData(const SingleVerSyncTaskContext *context,
214     const ICommunicator *communicator, const SyncGenericInterface *storage)
215 {
216     if (storage == nullptr) {
217         LOGE("[SingleVerDataSyncUtils] storage is nullptr when check receive data");
218         return false;
219     }
220     // check memory db here because remote maybe low version
221     // it will send option with not set rather than not support when remote is memory db
222     bool memory = storage->GetDbProperties().GetBoolProp(KvDBProperties::MEMORY_MODE, false);
223     if (memory) {
224         LOGI("[SingleVerDataSyncUtils] skip check receive data because local is memory db");
225         return true;
226     }
227     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
228     std::string localDeviceId;
229     if (communicator == nullptr || remoteSecOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
230         return true;
231     }
232     communicator->GetLocalIdentity(localDeviceId);
233     SecurityOption localOption;
234     int errCode = storage->GetSecurityOption(localOption);
235     if (errCode == -E_NOT_SUPPORT) {
236         LOGD("[SingleVerDataSyncUtils] local not support get security label");
237         return true;
238     }
239     if (errCode == E_OK && localOption.securityLabel == SecurityLabel::NOT_SET) {
240         LOGE("[SingleVerDataSyncUtils] local label is not set!");
241         return false;
242     }
243     if (remoteSecOption.securityLabel == SecurityLabel::S0 && localOption.securityLabel == SecurityLabel::S1) {
244         remoteSecOption.securityLabel = SecurityLabel::S1;
245         LOGI("[SingleVerDataSyncUtils] Transform Remote SecLabel From S0 To S1 When Receive Data");
246     }
247     if (remoteSecOption.securityLabel == FAILED_GET_SEC_CLASSIFICATION) {
248         LOGE("[SingleVerDataSyncUtils] remote label get failed!");
249         return false;
250     }
251     if (remoteSecOption.securityLabel != SecurityLabel::NOT_SET &&
252         remoteSecOption.securityLabel != localOption.securityLabel) {
253         LOGE("[SingleVerDataSyncUtils] label is not equal remote %d local %d", remoteSecOption.securityLabel,
254             localOption.securityLabel);
255         return false;
256     }
257     bool isPermitSync = SingleVerDataSyncUtils::IsPermitLocalDeviceRecvData(localDeviceId, remoteSecOption);
258     if (isPermitSync) {
259         return isPermitSync;
260     }
261     LOGE("[SingleVerDataSyncUtils][PermitReceiveData] check failed: permitReceive=%d, localDev=%s, seclabel=%d,"
262         " secflag=%d", isPermitSync, STR_MASK(localDeviceId), remoteSecOption.securityLabel,
263         remoteSecOption.securityFlag);
264     return isPermitSync;
265 }
266 
SetPacketId(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t version)267 void SingleVerDataSyncUtils::SetPacketId(DataRequestPacket *packet, SingleVerSyncTaskContext *context, uint32_t version)
268 {
269     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
270         context->IncPacketId(); // begin from 1
271         std::vector<uint64_t> reserved {context->GetPacketId()};
272         packet->SetReserved(reserved);
273     }
274 }
275 
GetMessageId(SyncType syncType)276 int SingleVerDataSyncUtils::GetMessageId(SyncType syncType)
277 {
278     if (syncType == SyncType::QUERY_SYNC_TYPE) {
279         return QUERY_SYNC_MESSAGE;
280     }
281     return DATA_SYNC_MESSAGE;
282 }
283 
PushAndPullKeyRevokHandle(SingleVerSyncTaskContext * context)284 void SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(SingleVerSyncTaskContext *context)
285 {
286     // for push_and_pull mode it may be EKEYREVOKED error before receive watermarkexception
287     // should clear errCode and restart pushpull request.
288     int mode = SyncOperation::TransferSyncMode(context->GetMode());
289     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && mode == SyncModeType::PUSH_AND_PULL &&
290         context->GetTaskErrCode() == -E_EKEYREVOKED) {
291         context->SetTaskErrCode(E_OK);
292     }
293 }
294 
GetReSendMode(int mode,uint32_t sequenceId,SyncType syncType)295 int SingleVerDataSyncUtils::GetReSendMode(int mode, uint32_t sequenceId, SyncType syncType)
296 {
297     int curMode = SyncOperation::TransferSyncMode(mode);
298     if (curMode == SyncModeType::PUSH || curMode == SyncModeType::PULL) {
299         return mode;
300     }
301     if (curMode == SyncModeType::RESPONSE_PULL) {
302         return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
303     }
304     // set push_and_pull mode when resend first sequenceId to inform remote to run RESPONSE_PULL task
305     // for sequenceId which is larger than first, only need to send data, means to set push or query_push mode
306     if (sequenceId == 1) {
307         return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH_PULL : SyncModeType::PUSH_AND_PULL;
308     }
309     return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
310 }
311 
FillControlRequestPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)312 void SingleVerDataSyncUtils::FillControlRequestPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
313 {
314     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
315     uint32_t flag = 0;
316     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY && context->IsAutoSubscribe()) {
317         flag = SubscribeRequest::IS_AUTO_SUBSCRIBE;
318     }
319     packet->SetPacketHead(E_OK, version, GetControlCmdType(context->GetMode()), flag);
320     packet->SetQuery(context->GetQuery());
321 }
322 
GetControlCmdType(int mode)323 ControlCmdType SingleVerDataSyncUtils::GetControlCmdType(int mode)
324 {
325     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
326         return ControlCmdType::SUBSCRIBE_QUERY_CMD;
327     } else if  (mode == SyncModeType::UNSUBSCRIBE_QUERY) {
328         return ControlCmdType::UNSUBSCRIBE_QUERY_CMD;
329     }
330     return ControlCmdType::INVALID_CONTROL_CMD;
331 }
332 
GetModeByControlCmdType(ControlCmdType controlCmd)333 int SingleVerDataSyncUtils::GetModeByControlCmdType(ControlCmdType controlCmd)
334 {
335     if (controlCmd == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
336         return SyncModeType::SUBSCRIBE_QUERY;
337     } else if  (controlCmd == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
338         return SyncModeType::UNSUBSCRIBE_QUERY;
339     }
340     return SyncModeType::INVALID_MODE;
341 }
342 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)343 bool SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
344 {
345     if (inMsg == nullptr) {
346         return false;
347     }
348     if (inMsg->GetMessageId() != CONTROL_SYNC_MESSAGE || inMsg->GetMessageType() != TYPE_REQUEST) {
349         return false;
350     }
351     auto packet = inMsg->GetObject<SubscribeRequest>();
352     if (packet == nullptr) {
353         return false;
354     }
355     uint32_t controlCmdType = packet->GetcontrolCmdType();
356     if (controlCmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
357         query = packet->GetQuery();
358         LOGI("[SingleVerDataSync] receive sub scribe query cmd,begin to trigger query auto sync");
359         return true;
360     }
361     return false;
362 }
363 
ControlAckErrorHandle(const SingleVerSyncTaskContext * context,const std::shared_ptr<SubscribeManager> & subManager)364 void SingleVerDataSyncUtils::ControlAckErrorHandle(const SingleVerSyncTaskContext *context,
365     const std::shared_ptr<SubscribeManager> &subManager)
366 {
367     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
368         // reserve before need clear
369         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
370     }
371 }
372 
SetMessageHeadInfo(Message & message,uint16_t inMsgType,const std::string & inTarget,uint32_t inSequenceId,uint32_t inSessionId)373 void SingleVerDataSyncUtils::SetMessageHeadInfo(Message &message, uint16_t inMsgType, const std::string &inTarget,
374     uint32_t inSequenceId, uint32_t inSessionId)
375 {
376     message.SetMessageType(inMsgType);
377     message.SetTarget(inTarget);
378     message.SetSequenceId(inSequenceId);
379     message.SetSessionId(inSessionId);
380 }
381 
IsGetDataSuccessfully(int errCode)382 bool SingleVerDataSyncUtils::IsGetDataSuccessfully(int errCode)
383 {
384     return (errCode == E_OK || errCode == -E_UNFINISHED);
385 }
386 
GetMaxSendDataTime(const std::vector<SendDataItem> & inData)387 Timestamp SingleVerDataSyncUtils::GetMaxSendDataTime(const std::vector<SendDataItem> &inData)
388 {
389     Timestamp stamp = 0;
390     for (size_t i = 0; i < inData.size(); i++) {
391         if (inData[i] == nullptr) {
392             continue;
393         }
394         Timestamp tempStamp = inData[i]->GetTimestamp();
395         if (stamp < tempStamp) {
396             stamp = tempStamp;
397         }
398     }
399     return stamp;
400 }
401 
GetFullSyncDataTimeRange(const std::vector<SendDataItem> & inData,WaterMark localMark,UpdateWaterMark & isUpdate)402 SyncTimeRange SingleVerDataSyncUtils::GetFullSyncDataTimeRange(const std::vector<SendDataItem> &inData,
403     WaterMark localMark, UpdateWaterMark &isUpdate)
404 {
405     Timestamp maxTimestamp = localMark;
406     Timestamp minTimestamp = localMark;
407     for (size_t i = 0; i < inData.size(); i++) {
408         if (inData[i] == nullptr) {
409             continue;
410         }
411         Timestamp tempStamp = inData[i]->GetTimestamp();
412         if (maxTimestamp < tempStamp) {
413             maxTimestamp = tempStamp;
414         }
415         if (minTimestamp > tempStamp) {
416             minTimestamp = tempStamp;
417         }
418         isUpdate.normalUpdateMark = true;
419     }
420     return {minTimestamp, 0, maxTimestamp, 0};
421 }
422 
GetQuerySyncDataTimeRange(const std::vector<SendDataItem> & inData,WaterMark localMark,WaterMark deleteLocalMark,UpdateWaterMark & isUpdate)423 SyncTimeRange SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(const std::vector<SendDataItem> &inData,
424     WaterMark localMark, WaterMark deleteLocalMark, UpdateWaterMark &isUpdate)
425 {
426     SyncTimeRange dataTimeRange = {localMark, deleteLocalMark, localMark, deleteLocalMark};
427     for (size_t i = 0; i < inData.size(); i++) {
428         if (inData[i] == nullptr) {
429             continue;
430         }
431         Timestamp tempStamp = inData[i]->GetTimestamp();
432         if ((inData[i]->GetFlag() & DataItem::DELETE_FLAG) == 0) { // query data
433             if (dataTimeRange.endTime < tempStamp) {
434                 dataTimeRange.endTime = tempStamp;
435             }
436             if (dataTimeRange.beginTime > tempStamp) {
437                 dataTimeRange.beginTime = tempStamp;
438             }
439             isUpdate.normalUpdateMark = true;
440         }
441         if ((inData[i]->GetFlag() & DataItem::DELETE_FLAG) != 0) { // delete data
442             if (dataTimeRange.deleteEndTime < tempStamp) {
443                 dataTimeRange.deleteEndTime = tempStamp;
444             }
445             if (dataTimeRange.deleteBeginTime > tempStamp) {
446                 dataTimeRange.deleteBeginTime = tempStamp;
447             }
448             isUpdate.deleteUpdateMark = true;
449         }
450     }
451     return dataTimeRange;
452 }
453 
ReviseLocalMark(SyncType syncType,const SyncTimeRange & dataTimeRange,UpdateWaterMark updateMark)454 SyncTimeRange SingleVerDataSyncUtils::ReviseLocalMark(SyncType syncType, const SyncTimeRange &dataTimeRange,
455     UpdateWaterMark updateMark)
456 {
457     SyncTimeRange tmpDataTime = dataTimeRange;
458     if (updateMark.deleteUpdateMark && syncType == SyncType::QUERY_SYNC_TYPE) {
459         tmpDataTime.deleteEndTime += 1;
460     }
461     if (updateMark.normalUpdateMark) {
462         tmpDataTime.endTime += 1;
463     }
464     return tmpDataTime;
465 }
466 
GetRecvDataTimeRange(SyncType syncType,const std::vector<SendDataItem> & data,UpdateWaterMark & isUpdate)467 SyncTimeRange SingleVerDataSyncUtils::GetRecvDataTimeRange(SyncType syncType,
468     const std::vector<SendDataItem> &data, UpdateWaterMark &isUpdate)
469 {
470     if (syncType != SyncType::QUERY_SYNC_TYPE) {
471         return SingleVerDataSyncUtils::GetFullSyncDataTimeRange(data, 0, isUpdate);
472     }
473     return SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(data, 0, 0, isUpdate);
474 }
475 
GetSyncDataTimeRange(SyncType syncType,WaterMark localMark,WaterMark deleteMark,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)476 SyncTimeRange SingleVerDataSyncUtils::GetSyncDataTimeRange(SyncType syncType, WaterMark localMark, WaterMark deleteMark,
477     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
478 {
479     if (syncType != SyncType::QUERY_SYNC_TYPE) {
480         return SingleVerDataSyncUtils::GetFullSyncDataTimeRange(inData, localMark, isUpdate);
481     }
482     return SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(inData, localMark, deleteMark, isUpdate);
483 }
484 
RunPermissionCheckInner(const SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,const DataRequestPacket * packet,int mode)485 int SingleVerDataSyncUtils::RunPermissionCheckInner(const SingleVerSyncTaskContext *context,
486     const SyncGenericInterface* storage, const std::string &label, const DataRequestPacket *packet, int mode)
487 {
488     PermissionCheckParam param;
489     uint8_t flag = 0u;
490     FillPermissionCheckParam(storage, mode, param, flag);
491     param.deviceId = context->GetDeviceId();
492     if (packet != nullptr) {
493         param.extraConditions = packet->GetExtraConditions();
494     }
495     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
496     if (errCode != E_OK) {
497         LOGE("[DataSync][RunPermissionCheck] check failed flag=%" PRIu8 ",dev=%s", flag,
498             STR_MASK(context->GetDeviceId()));
499     }
500     return errCode;
501 }
502 
GetTimeOffsetFromRequestMsg(const Message * message)503 std::pair<TimeOffset, TimeOffset> SingleVerDataSyncUtils::GetTimeOffsetFromRequestMsg(const Message *message)
504 {
505     std::pair<TimeOffset, TimeOffset> res;
506     auto &[systemOffset, senderLocalOffset] = res;
507     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
508     if (packet == nullptr) {
509         systemOffset = 0;
510         senderLocalOffset = 0;
511         return res;
512     }
513     systemOffset = packet->GetSystemTimeOffset();
514     senderLocalOffset = packet->GetSenderTimeOffset();
515     return res;
516 }
517 
RecordClientId(const SingleVerSyncTaskContext & context,const SyncGenericInterface & storage,std::shared_ptr<Metadata> & metadata)518 void SingleVerDataSyncUtils::RecordClientId(const SingleVerSyncTaskContext &context,
519     const SyncGenericInterface &storage, std::shared_ptr<Metadata> &metadata)
520 {
521     StoreInfo info = {
522         storage.GetDbProperties().GetStringProp(DBProperties::USER_ID, ""),
523         storage.GetDbProperties().GetStringProp(DBProperties::APP_ID, ""),
524         storage.GetDbProperties().GetStringProp(DBProperties::STORE_ID, "")
525     };
526     std::string clientId;
527     if (RuntimeContext::GetInstance()->TranslateDeviceId(context.GetDeviceId(), info, clientId) == E_OK) {
528         int errCode = metadata->SaveClientId(context.GetDeviceId(), clientId);
529         if (errCode != E_OK) {
530             LOGW("[DataSync] record clientId failed %d", errCode);
531         }
532     }
533 }
534 
SetDataRequestCommonInfo(const SingleVerSyncTaskContext & context,const SyncGenericInterface & storage,DataRequestPacket & packet,std::shared_ptr<Metadata> & metadata)535 void SingleVerDataSyncUtils::SetDataRequestCommonInfo(const SingleVerSyncTaskContext &context,
536     const SyncGenericInterface &storage, DataRequestPacket &packet, std::shared_ptr<Metadata> &metadata)
537 {
538     packet.SetSenderTimeOffset(metadata->GetLocalTimeOffset());
539     packet.SetSystemTimeOffset(metadata->GetSystemTimeOffset(context.GetDeviceId(), context.GetTargetUserId()));
540     if (context.GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_9_0) {
541         return;
542     }
543     auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
544     if (err != E_OK) {
545         LOGW("[DataSync] get local schema version failed:%d", err);
546         return;
547     }
548     packet.SetSchemaVersion(localSchemaVer);
549     SecurityOption localOption;
550     err = storage.GetSecurityOption(localOption);
551     if (err == -E_NOT_SUPPORT) {
552         LOGW("[DataSync] local not support sec classification");
553         localOption.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
554     } else if (err != E_OK) {
555         LOGE("[DataSync] get local security option errCode:%d", err);
556         localOption.securityLabel = FAILED_GET_SEC_CLASSIFICATION;
557     }
558     packet.SetSecurityOption(localOption);
559 }
560 
SchemaVersionMatchCheck(const SingleVerSyncTaskContext & context,const DataRequestPacket & packet,std::shared_ptr<Metadata> & metadata)561 int SingleVerDataSyncUtils::SchemaVersionMatchCheck(const SingleVerSyncTaskContext &context,
562     const DataRequestPacket &packet, std::shared_ptr<Metadata> &metadata)
563 {
564     if (context.GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_9_0) {
565         return E_OK;
566     }
567     auto remoteSchemaVersion = metadata->GetRemoteSchemaVersion(context.GetDeviceId(), context.GetTargetUserId());
568     if (remoteSchemaVersion != packet.GetSchemaVersion()) {
569         LOGE("[DataSync] remote schema version misMatch, need ability sync again, packet %" PRIu64 " cache %" PRIu64,
570              packet.GetSchemaVersion(), remoteSchemaVersion);
571         return -E_NEED_ABILITY_SYNC;
572     }
573     return E_OK;
574 }
575 
GetUnsyncTotal(const SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,uint32_t & total)576 int SingleVerDataSyncUtils::GetUnsyncTotal(const SingleVerSyncTaskContext *context, const SyncGenericInterface *storage,
577     uint32_t &total)
578 {
579     SyncTimeRange waterRange;
580     WaterMark startMark = context->GetInitWaterMark();
581     if (waterRange.endTime == 0 || startMark > waterRange.endTime) {
582         return E_OK;
583     }
584 
585     waterRange.beginTime = startMark;
586     waterRange.deleteBeginTime = context->GetInitDeletedMark();
587     return GetUnsyncTotal(context, storage, waterRange, total);
588 }
589 
GetUnsyncTotal(const SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,SyncTimeRange & waterMarkInfo,uint32_t & total)590 int SingleVerDataSyncUtils::GetUnsyncTotal(const SingleVerSyncTaskContext *context, const SyncGenericInterface *storage,
591     SyncTimeRange &waterMarkInfo, uint32_t &total)
592 {
593     int errCode = E_OK;
594     SyncType curType = (context->IsQuerySync() ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE);
595     if (curType != SyncType::QUERY_SYNC_TYPE) {
596         errCode = storage->GetUnSyncTotal(waterMarkInfo.beginTime, waterMarkInfo.endTime, total);
597     } else {
598         QuerySyncObject queryObj = context->GetQuery();
599         errCode = storage->GetUnSyncTotal(queryObj, waterMarkInfo, total);
600     }
601     if (errCode != E_OK) {
602         LOGE("[DataSync][GetUnsyncTotal] Get unsync data num failed, errCode=%d", errCode);
603     }
604     return errCode;
605 }
606 
IsSupportRequestTotal(uint32_t version)607 bool SingleVerDataSyncUtils::IsSupportRequestTotal(uint32_t version)
608 {
609     return version >= SOFTWARE_VERSION_RELEASE_10_0;
610 }
611 
UpdateSyncProcess(SingleVerSyncTaskContext * context,const DataRequestPacket * packet)612 void SingleVerDataSyncUtils::UpdateSyncProcess(SingleVerSyncTaskContext *context, const DataRequestPacket *packet)
613 {
614     const std::vector<SendDataItem> &data = packet->GetData();
615     int32_t dataSize = std::count_if(data.begin(), data.end(), [](SendDataItem item) {
616         return (item->GetFlag() & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0;
617     });
618     if (dataSize < 0) {
619         return;
620     }
621 
622     LOGD("[DataSync][UpdateSyncProcess] mode=%d, total=%" PRIu64 ", size=%d", packet->GetMode(),
623         packet->GetTotalDataCount(), dataSize);
624     if (packet->GetMode() == SyncModeType::PUSH || packet->GetMode() == SyncModeType::QUERY_PUSH) {
625         // save total count to sync process
626         if (packet->GetTotalDataCount() > 0) {
627             context->SetOperationSyncProcessTotal(context->GetDeviceId(), packet->GetTotalDataCount());
628         }
629         context->UpdateOperationFinishedCount(context->GetDeviceId(), static_cast<uint32_t>(dataSize));
630     }
631 }
632 
CacheInitWaterMark(SingleVerSyncTaskContext * context,SingleVerDataSync * dataSync)633 void SingleVerDataSyncUtils::CacheInitWaterMark(SingleVerSyncTaskContext *context, SingleVerDataSync *dataSync)
634 {
635     SyncType curType = (context->IsQuerySync() ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE);
636     WaterMark startMark = 0;
637     dataSync->GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
638     context->SetInitWaterMark(startMark);
639 
640     WaterMark deletedMark = 0;
641     dataSync->GetLocalDeleteSyncWaterMark(context, deletedMark);
642     context->SetInitDeletedMark(deletedMark);
643     LOGI("[SingleVerDataSync][CacheInitWaterMark] startMark %" PRIu64 " deletedMark %" PRIu64, startMark, deletedMark);
644 }
645 
GetQueryFromDataRequest(const DataRequestPacket & packet,const SingleVerSyncTaskContext & context,uint32_t sessionId)646 QuerySyncObject SingleVerDataSyncUtils::GetQueryFromDataRequest(const DataRequestPacket &packet,
647     const SingleVerSyncTaskContext &context, uint32_t sessionId)
648 {
649     auto query = packet.GetQuery();
650     query.SetRemoteDev(context.GetDeviceId());
651     query.SetUseLocalSchema(sessionId == context.GetRequestSessionId());
652     return query;
653 }
654 }