1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_data_sync_utils.h"
16
17 #include <mutex>
18 #include "db_common.h"
19 #include "version.h"
20 #include "log_print.h"
21 #include "message.h"
22 namespace DistributedDB {
23 namespace {
FillPermissionCheckParam(const SyncGenericInterface * storage,int mode,PermissionCheckParam & param,uint8_t & flag)24 void FillPermissionCheckParam(const SyncGenericInterface* storage, int mode, PermissionCheckParam ¶m, uint8_t &flag)
25 {
26 param.appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
27 param.userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
28 param.storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
29 param.instanceId = storage->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
30 switch (mode) {
31 case SyncModeType::PUSH:
32 flag = CHECK_FLAG_RECEIVE;
33 break;
34 case SyncModeType::PULL:
35 flag = CHECK_FLAG_SEND;
36 break;
37 case SyncModeType::PUSH_AND_PULL:
38 flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
39 break;
40 default:
41 flag = CHECK_FLAG_RECEIVE;
42 break;
43 }
44 }
45 }
46
QuerySyncCheck(const SingleVerSyncTaskContext * context,bool & isCheckStatus)47 int SingleVerDataSyncUtils::QuerySyncCheck(const SingleVerSyncTaskContext *context, bool &isCheckStatus)
48 {
49 if (context == nullptr) {
50 isCheckStatus = false;
51 return -E_INVALID_ARGS;
52 }
53 if (!context->IsQuerySync()) {
54 isCheckStatus = true;
55 return E_OK;
56 }
57 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
58 // for 101 version, no need to do abilitySync, just send request to remote
59 if (version <= SOFTWARE_VERSION_RELEASE_1_0) {
60 isCheckStatus = true;
61 return E_OK;
62 }
63 if (version < SOFTWARE_VERSION_RELEASE_4_0) {
64 LOGE("[SingleVerDataSync] not support query sync when remote ver lower than 104");
65 isCheckStatus = false;
66 return E_OK;
67 }
68 if (version < SOFTWARE_VERSION_RELEASE_5_0 && !(context->GetQuery().IsQueryOnlyByKey())) {
69 LOGE("[SingleVerDataSync] remote version only support prefix key");
70 isCheckStatus = false;
71 return E_OK;
72 }
73 if (context->GetQuery().HasInKeys() && context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
74 isCheckStatus = false;
75 return E_OK;
76 }
77 isCheckStatus = true;
78 return E_OK;
79 }
80
AckMsgErrnoCheck(const SingleVerSyncTaskContext * context,const Message * message)81 int SingleVerDataSyncUtils::AckMsgErrnoCheck(const SingleVerSyncTaskContext *context, const Message *message)
82 {
83 if (context == nullptr || message == nullptr) {
84 return -E_INVALID_ARGS;
85 }
86 if (message->IsFeedbackError()) {
87 LOGE("[DataSync][AckMsgErrnoCheck] message errNo=%d", message->GetErrorNo());
88 return -static_cast<int>(message->GetErrorNo());
89 }
90 return E_OK;
91 }
92
RequestQueryCheck(const DataRequestPacket * packet,SyncGenericInterface * storage)93 int SingleVerDataSyncUtils::RequestQueryCheck(const DataRequestPacket *packet, SyncGenericInterface *storage)
94 {
95 if (storage == nullptr || packet == nullptr) {
96 return -E_INVALID_ARGS;
97 }
98 if (SyncOperation::GetSyncType(packet->GetMode()) != SyncType::QUERY_SYNC_TYPE) {
99 return E_OK;
100 }
101 QuerySyncObject syncQuery = packet->GetQuery();
102 int errCode = storage->CheckAndInitQueryCondition(syncQuery);
103 if (errCode != E_OK) {
104 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
105 return errCode;
106 }
107 return E_OK;
108 }
109
IsPermitLocalDeviceRecvData(const std::string & deviceId,const SecurityOption & remoteSecOption)110 bool SingleVerDataSyncUtils::IsPermitLocalDeviceRecvData(const std::string &deviceId,
111 const SecurityOption &remoteSecOption)
112 {
113 return RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(deviceId, remoteSecOption);
114 }
115
IsPermitRemoteDeviceRecvData(const std::string & deviceId,const SecurityOption & remoteSecOption,SyncGenericInterface * storage)116 bool SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(const std::string &deviceId,
117 const SecurityOption &remoteSecOption, SyncGenericInterface *storage)
118 {
119 if (storage == nullptr) {
120 return false;
121 }
122 SecurityOption localSecOption;
123 if (remoteSecOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
124 return true;
125 }
126 int errCode = storage->GetSecurityOption(localSecOption);
127 if (errCode == -E_NOT_SUPPORT) {
128 return true;
129 }
130 if (errCode != E_OK) {
131 LOGE("[SingleVerDataSyncUtils] get security option error %d", errCode);
132 return false;
133 }
134 if (localSecOption.securityLabel == NOT_SET) {
135 LOGE("[SingleVerDataSyncUtils] local label is not set!");
136 return false;
137 }
138 return RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(deviceId, localSecOption);
139 }
140
TransDbDataItemToSendDataItem(const std::string & localHashName,std::vector<SendDataItem> & outData)141 void SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(const std::string &localHashName,
142 std::vector<SendDataItem> &outData)
143 {
144 for (size_t i = 0; i < outData.size(); i++) {
145 if (outData[i] == nullptr) {
146 continue;
147 }
148 outData[i]->SetOrigDevice(outData[i]->GetOrigDevice().empty() ? localHashName : outData[i]->GetOrigDevice());
149 if (i == 0 || i == (outData.size() - 1)) {
150 LOGD("[DataSync][TransToSendItem] printData packet=%zu,timestamp=%" PRIu64 ",flag=%" PRIu64, i,
151 outData[i]->GetTimestamp(), outData[i]->GetFlag());
152 }
153 }
154 }
155
TransferForeignOrigDevName(const std::string & deviceName,const std::string & localHashName)156 std::string SingleVerDataSyncUtils::TransferForeignOrigDevName(const std::string &deviceName,
157 const std::string &localHashName)
158 {
159 if (localHashName == deviceName) {
160 return "";
161 }
162 return deviceName;
163 }
164
TransSendDataItemToLocal(const SingleVerSyncTaskContext * context,const std::string & localHashName,const std::vector<SendDataItem> & data)165 void SingleVerDataSyncUtils::TransSendDataItemToLocal(const SingleVerSyncTaskContext *context,
166 const std::string &localHashName, const std::vector<SendDataItem> &data)
167 {
168 TimeOffset offset = context->GetTimeOffset();
169 for (auto &item : data) {
170 if (item == nullptr) {
171 continue;
172 }
173 item->SetOrigDevice(TransferForeignOrigDevName(item->GetOrigDevice(), localHashName));
174 Timestamp tempTimestamp = item->GetTimestamp();
175 Timestamp tempWriteTimestamp = item->GetWriteTimestamp();
176 item->SetTimestamp(tempTimestamp - static_cast<Timestamp>(offset));
177 if (tempWriteTimestamp != 0) {
178 item->SetWriteTimestamp(tempWriteTimestamp - static_cast<Timestamp>(offset));
179 }
180
181 Timestamp currentLocalTime = context->GetCurrentLocalTime();
182 if (item->GetTimestamp() > currentLocalTime) {
183 item->SetTimestamp(currentLocalTime);
184 }
185 if (item->GetWriteTimestamp() > currentLocalTime) {
186 item->SetWriteTimestamp(currentLocalTime);
187 }
188 }
189 }
190
TranslateErrCodeIfNeed(int mode,uint32_t version,int & errCode)191 void SingleVerDataSyncUtils::TranslateErrCodeIfNeed(int mode, uint32_t version, int &errCode)
192 {
193 // once get data occur E_EKEYREVOKED error, should also send request to remote dev to pull data.
194 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL &&
195 version > SOFTWARE_VERSION_RELEASE_2_0 && errCode == -E_EKEYREVOKED) {
196 errCode = E_OK;
197 }
198 }
199
RunPermissionCheck(SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,const DataRequestPacket * packet)200 int SingleVerDataSyncUtils::RunPermissionCheck(SingleVerSyncTaskContext *context, const SyncGenericInterface* storage,
201 const std::string &label, const DataRequestPacket *packet)
202 {
203 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
204 return RunPermissionCheckInner(context, storage, label, packet, mode);
205 }
206
RunPermissionCheck(SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,int mode)207 int SingleVerDataSyncUtils::RunPermissionCheck(SingleVerSyncTaskContext *context, const SyncGenericInterface* storage,
208 const std::string &label, int mode)
209 {
210 return RunPermissionCheckInner(context, storage, label, nullptr, mode);
211 }
212
CheckPermitReceiveData(const SingleVerSyncTaskContext * context,const ICommunicator * communicator,const SyncGenericInterface * storage)213 bool SingleVerDataSyncUtils::CheckPermitReceiveData(const SingleVerSyncTaskContext *context,
214 const ICommunicator *communicator, const SyncGenericInterface *storage)
215 {
216 if (storage == nullptr) {
217 LOGE("[SingleVerDataSyncUtils] storage is nullptr when check receive data");
218 return false;
219 }
220 // check memory db here because remote maybe low version
221 // it will send option with not set rather than not support when remote is memory db
222 bool memory = storage->GetDbProperties().GetBoolProp(KvDBProperties::MEMORY_MODE, false);
223 if (memory) {
224 LOGI("[SingleVerDataSyncUtils] skip check receive data because local is memory db");
225 return true;
226 }
227 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
228 std::string localDeviceId;
229 if (communicator == nullptr || remoteSecOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
230 return true;
231 }
232 communicator->GetLocalIdentity(localDeviceId);
233 SecurityOption localOption;
234 int errCode = storage->GetSecurityOption(localOption);
235 if (errCode == -E_NOT_SUPPORT) {
236 LOGD("[SingleVerDataSyncUtils] local not support get security label");
237 return true;
238 }
239 if (errCode == E_OK && localOption.securityLabel == SecurityLabel::NOT_SET) {
240 LOGE("[SingleVerDataSyncUtils] local label is not set!");
241 return false;
242 }
243 if (remoteSecOption.securityLabel == SecurityLabel::S0 && localOption.securityLabel == SecurityLabel::S1) {
244 remoteSecOption.securityLabel = SecurityLabel::S1;
245 LOGI("[SingleVerDataSyncUtils] Transform Remote SecLabel From S0 To S1 When Receive Data");
246 }
247 if (remoteSecOption.securityLabel == FAILED_GET_SEC_CLASSIFICATION) {
248 LOGE("[SingleVerDataSyncUtils] remote label get failed!");
249 return false;
250 }
251 if (remoteSecOption.securityLabel != SecurityLabel::NOT_SET &&
252 remoteSecOption.securityLabel != localOption.securityLabel) {
253 LOGE("[SingleVerDataSyncUtils] label is not equal remote %d local %d", remoteSecOption.securityLabel,
254 localOption.securityLabel);
255 return false;
256 }
257 bool isPermitSync = SingleVerDataSyncUtils::IsPermitLocalDeviceRecvData(localDeviceId, remoteSecOption);
258 if (isPermitSync) {
259 return isPermitSync;
260 }
261 LOGE("[SingleVerDataSyncUtils][PermitReceiveData] check failed: permitReceive=%d, localDev=%s, seclabel=%d,"
262 " secflag=%d", isPermitSync, STR_MASK(localDeviceId), remoteSecOption.securityLabel,
263 remoteSecOption.securityFlag);
264 return isPermitSync;
265 }
266
SetPacketId(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t version)267 void SingleVerDataSyncUtils::SetPacketId(DataRequestPacket *packet, SingleVerSyncTaskContext *context, uint32_t version)
268 {
269 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
270 context->IncPacketId(); // begin from 1
271 std::vector<uint64_t> reserved {context->GetPacketId()};
272 packet->SetReserved(reserved);
273 }
274 }
275
GetMessageId(SyncType syncType)276 int SingleVerDataSyncUtils::GetMessageId(SyncType syncType)
277 {
278 if (syncType == SyncType::QUERY_SYNC_TYPE) {
279 return QUERY_SYNC_MESSAGE;
280 }
281 return DATA_SYNC_MESSAGE;
282 }
283
PushAndPullKeyRevokHandle(SingleVerSyncTaskContext * context)284 void SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(SingleVerSyncTaskContext *context)
285 {
286 // for push_and_pull mode it may be EKEYREVOKED error before receive watermarkexception
287 // should clear errCode and restart pushpull request.
288 int mode = SyncOperation::TransferSyncMode(context->GetMode());
289 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && mode == SyncModeType::PUSH_AND_PULL &&
290 context->GetTaskErrCode() == -E_EKEYREVOKED) {
291 context->SetTaskErrCode(E_OK);
292 }
293 }
294
GetReSendMode(int mode,uint32_t sequenceId,SyncType syncType)295 int SingleVerDataSyncUtils::GetReSendMode(int mode, uint32_t sequenceId, SyncType syncType)
296 {
297 int curMode = SyncOperation::TransferSyncMode(mode);
298 if (curMode == SyncModeType::PUSH || curMode == SyncModeType::PULL) {
299 return mode;
300 }
301 if (curMode == SyncModeType::RESPONSE_PULL) {
302 return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
303 }
304 // set push_and_pull mode when resend first sequenceId to inform remote to run RESPONSE_PULL task
305 // for sequenceId which is larger than first, only need to send data, means to set push or query_push mode
306 if (sequenceId == 1) {
307 return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH_PULL : SyncModeType::PUSH_AND_PULL;
308 }
309 return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
310 }
311
FillControlRequestPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)312 void SingleVerDataSyncUtils::FillControlRequestPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
313 {
314 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
315 uint32_t flag = 0;
316 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY && context->IsAutoSubscribe()) {
317 flag = SubscribeRequest::IS_AUTO_SUBSCRIBE;
318 }
319 packet->SetPacketHead(E_OK, version, GetControlCmdType(context->GetMode()), flag);
320 packet->SetQuery(context->GetQuery());
321 }
322
GetControlCmdType(int mode)323 ControlCmdType SingleVerDataSyncUtils::GetControlCmdType(int mode)
324 {
325 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
326 return ControlCmdType::SUBSCRIBE_QUERY_CMD;
327 } else if (mode == SyncModeType::UNSUBSCRIBE_QUERY) {
328 return ControlCmdType::UNSUBSCRIBE_QUERY_CMD;
329 }
330 return ControlCmdType::INVALID_CONTROL_CMD;
331 }
332
GetModeByControlCmdType(ControlCmdType controlCmd)333 int SingleVerDataSyncUtils::GetModeByControlCmdType(ControlCmdType controlCmd)
334 {
335 if (controlCmd == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
336 return SyncModeType::SUBSCRIBE_QUERY;
337 } else if (controlCmd == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
338 return SyncModeType::UNSUBSCRIBE_QUERY;
339 }
340 return SyncModeType::INVALID_MODE;
341 }
342
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)343 bool SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
344 {
345 if (inMsg == nullptr) {
346 return false;
347 }
348 if (inMsg->GetMessageId() != CONTROL_SYNC_MESSAGE || inMsg->GetMessageType() != TYPE_REQUEST) {
349 return false;
350 }
351 auto packet = inMsg->GetObject<SubscribeRequest>();
352 if (packet == nullptr) {
353 return false;
354 }
355 uint32_t controlCmdType = packet->GetcontrolCmdType();
356 if (controlCmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
357 query = packet->GetQuery();
358 LOGI("[SingleVerDataSync] receive sub scribe query cmd,begin to trigger query auto sync");
359 return true;
360 }
361 return false;
362 }
363
ControlAckErrorHandle(const SingleVerSyncTaskContext * context,const std::shared_ptr<SubscribeManager> & subManager)364 void SingleVerDataSyncUtils::ControlAckErrorHandle(const SingleVerSyncTaskContext *context,
365 const std::shared_ptr<SubscribeManager> &subManager)
366 {
367 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
368 // reserve before need clear
369 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
370 }
371 }
372
SetMessageHeadInfo(Message & message,uint16_t inMsgType,const std::string & inTarget,uint32_t inSequenceId,uint32_t inSessionId)373 void SingleVerDataSyncUtils::SetMessageHeadInfo(Message &message, uint16_t inMsgType, const std::string &inTarget,
374 uint32_t inSequenceId, uint32_t inSessionId)
375 {
376 message.SetMessageType(inMsgType);
377 message.SetTarget(inTarget);
378 message.SetSequenceId(inSequenceId);
379 message.SetSessionId(inSessionId);
380 }
381
IsGetDataSuccessfully(int errCode)382 bool SingleVerDataSyncUtils::IsGetDataSuccessfully(int errCode)
383 {
384 return (errCode == E_OK || errCode == -E_UNFINISHED);
385 }
386
GetMaxSendDataTime(const std::vector<SendDataItem> & inData)387 Timestamp SingleVerDataSyncUtils::GetMaxSendDataTime(const std::vector<SendDataItem> &inData)
388 {
389 Timestamp stamp = 0;
390 for (size_t i = 0; i < inData.size(); i++) {
391 if (inData[i] == nullptr) {
392 continue;
393 }
394 Timestamp tempStamp = inData[i]->GetTimestamp();
395 if (stamp < tempStamp) {
396 stamp = tempStamp;
397 }
398 }
399 return stamp;
400 }
401
GetFullSyncDataTimeRange(const std::vector<SendDataItem> & inData,WaterMark localMark,UpdateWaterMark & isUpdate)402 SyncTimeRange SingleVerDataSyncUtils::GetFullSyncDataTimeRange(const std::vector<SendDataItem> &inData,
403 WaterMark localMark, UpdateWaterMark &isUpdate)
404 {
405 Timestamp maxTimestamp = localMark;
406 Timestamp minTimestamp = localMark;
407 for (size_t i = 0; i < inData.size(); i++) {
408 if (inData[i] == nullptr) {
409 continue;
410 }
411 Timestamp tempStamp = inData[i]->GetTimestamp();
412 if (maxTimestamp < tempStamp) {
413 maxTimestamp = tempStamp;
414 }
415 if (minTimestamp > tempStamp) {
416 minTimestamp = tempStamp;
417 }
418 isUpdate.normalUpdateMark = true;
419 }
420 return {minTimestamp, 0, maxTimestamp, 0};
421 }
422
GetQuerySyncDataTimeRange(const std::vector<SendDataItem> & inData,WaterMark localMark,WaterMark deleteLocalMark,UpdateWaterMark & isUpdate)423 SyncTimeRange SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(const std::vector<SendDataItem> &inData,
424 WaterMark localMark, WaterMark deleteLocalMark, UpdateWaterMark &isUpdate)
425 {
426 SyncTimeRange dataTimeRange = {localMark, deleteLocalMark, localMark, deleteLocalMark};
427 for (size_t i = 0; i < inData.size(); i++) {
428 if (inData[i] == nullptr) {
429 continue;
430 }
431 Timestamp tempStamp = inData[i]->GetTimestamp();
432 if ((inData[i]->GetFlag() & DataItem::DELETE_FLAG) == 0) { // query data
433 if (dataTimeRange.endTime < tempStamp) {
434 dataTimeRange.endTime = tempStamp;
435 }
436 if (dataTimeRange.beginTime > tempStamp) {
437 dataTimeRange.beginTime = tempStamp;
438 }
439 isUpdate.normalUpdateMark = true;
440 }
441 if ((inData[i]->GetFlag() & DataItem::DELETE_FLAG) != 0) { // delete data
442 if (dataTimeRange.deleteEndTime < tempStamp) {
443 dataTimeRange.deleteEndTime = tempStamp;
444 }
445 if (dataTimeRange.deleteBeginTime > tempStamp) {
446 dataTimeRange.deleteBeginTime = tempStamp;
447 }
448 isUpdate.deleteUpdateMark = true;
449 }
450 }
451 return dataTimeRange;
452 }
453
ReviseLocalMark(SyncType syncType,const SyncTimeRange & dataTimeRange,UpdateWaterMark updateMark)454 SyncTimeRange SingleVerDataSyncUtils::ReviseLocalMark(SyncType syncType, const SyncTimeRange &dataTimeRange,
455 UpdateWaterMark updateMark)
456 {
457 SyncTimeRange tmpDataTime = dataTimeRange;
458 if (updateMark.deleteUpdateMark && syncType == SyncType::QUERY_SYNC_TYPE) {
459 tmpDataTime.deleteEndTime += 1;
460 }
461 if (updateMark.normalUpdateMark) {
462 tmpDataTime.endTime += 1;
463 }
464 return tmpDataTime;
465 }
466
GetRecvDataTimeRange(SyncType syncType,const std::vector<SendDataItem> & data,UpdateWaterMark & isUpdate)467 SyncTimeRange SingleVerDataSyncUtils::GetRecvDataTimeRange(SyncType syncType,
468 const std::vector<SendDataItem> &data, UpdateWaterMark &isUpdate)
469 {
470 if (syncType != SyncType::QUERY_SYNC_TYPE) {
471 return SingleVerDataSyncUtils::GetFullSyncDataTimeRange(data, 0, isUpdate);
472 }
473 return SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(data, 0, 0, isUpdate);
474 }
475
GetSyncDataTimeRange(SyncType syncType,WaterMark localMark,WaterMark deleteMark,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)476 SyncTimeRange SingleVerDataSyncUtils::GetSyncDataTimeRange(SyncType syncType, WaterMark localMark, WaterMark deleteMark,
477 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
478 {
479 if (syncType != SyncType::QUERY_SYNC_TYPE) {
480 return SingleVerDataSyncUtils::GetFullSyncDataTimeRange(inData, localMark, isUpdate);
481 }
482 return SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(inData, localMark, deleteMark, isUpdate);
483 }
484
RunPermissionCheckInner(const SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,const DataRequestPacket * packet,int mode)485 int SingleVerDataSyncUtils::RunPermissionCheckInner(const SingleVerSyncTaskContext *context,
486 const SyncGenericInterface* storage, const std::string &label, const DataRequestPacket *packet, int mode)
487 {
488 PermissionCheckParam param;
489 uint8_t flag = 0u;
490 FillPermissionCheckParam(storage, mode, param, flag);
491 param.deviceId = context->GetDeviceId();
492 if (packet != nullptr) {
493 param.extraConditions = packet->GetExtraConditions();
494 }
495 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
496 if (errCode != E_OK) {
497 LOGE("[DataSync][RunPermissionCheck] check failed flag=%" PRIu8 ",dev=%s", flag,
498 STR_MASK(context->GetDeviceId()));
499 }
500 return errCode;
501 }
502
GetTimeOffsetFromRequestMsg(const Message * message)503 std::pair<TimeOffset, TimeOffset> SingleVerDataSyncUtils::GetTimeOffsetFromRequestMsg(const Message *message)
504 {
505 std::pair<TimeOffset, TimeOffset> res;
506 auto &[systemOffset, senderLocalOffset] = res;
507 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
508 if (packet == nullptr) {
509 systemOffset = 0;
510 senderLocalOffset = 0;
511 return res;
512 }
513 systemOffset = packet->GetSystemTimeOffset();
514 senderLocalOffset = packet->GetSenderTimeOffset();
515 return res;
516 }
517
RecordClientId(const SingleVerSyncTaskContext & context,const SyncGenericInterface & storage,std::shared_ptr<Metadata> & metadata)518 void SingleVerDataSyncUtils::RecordClientId(const SingleVerSyncTaskContext &context,
519 const SyncGenericInterface &storage, std::shared_ptr<Metadata> &metadata)
520 {
521 StoreInfo info = {
522 storage.GetDbProperties().GetStringProp(DBProperties::USER_ID, ""),
523 storage.GetDbProperties().GetStringProp(DBProperties::APP_ID, ""),
524 storage.GetDbProperties().GetStringProp(DBProperties::STORE_ID, "")
525 };
526 std::string clientId;
527 if (RuntimeContext::GetInstance()->TranslateDeviceId(context.GetDeviceId(), info, clientId) == E_OK) {
528 int errCode = metadata->SaveClientId(context.GetDeviceId(), clientId);
529 if (errCode != E_OK) {
530 LOGW("[DataSync] record clientId failed %d", errCode);
531 }
532 }
533 }
534
SetDataRequestCommonInfo(const SingleVerSyncTaskContext & context,const SyncGenericInterface & storage,DataRequestPacket & packet,std::shared_ptr<Metadata> & metadata)535 void SingleVerDataSyncUtils::SetDataRequestCommonInfo(const SingleVerSyncTaskContext &context,
536 const SyncGenericInterface &storage, DataRequestPacket &packet, std::shared_ptr<Metadata> &metadata)
537 {
538 packet.SetSenderTimeOffset(metadata->GetLocalTimeOffset());
539 packet.SetSystemTimeOffset(metadata->GetSystemTimeOffset(context.GetDeviceId(), context.GetTargetUserId()));
540 if (context.GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_9_0) {
541 return;
542 }
543 auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
544 if (err != E_OK) {
545 LOGW("[DataSync] get local schema version failed:%d", err);
546 return;
547 }
548 packet.SetSchemaVersion(localSchemaVer);
549 SecurityOption localOption;
550 err = storage.GetSecurityOption(localOption);
551 if (err == -E_NOT_SUPPORT) {
552 LOGW("[DataSync] local not support sec classification");
553 localOption.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
554 } else if (err != E_OK) {
555 LOGE("[DataSync] get local security option errCode:%d", err);
556 localOption.securityLabel = FAILED_GET_SEC_CLASSIFICATION;
557 }
558 packet.SetSecurityOption(localOption);
559 }
560
SchemaVersionMatchCheck(const SingleVerSyncTaskContext & context,const DataRequestPacket & packet,std::shared_ptr<Metadata> & metadata)561 int SingleVerDataSyncUtils::SchemaVersionMatchCheck(const SingleVerSyncTaskContext &context,
562 const DataRequestPacket &packet, std::shared_ptr<Metadata> &metadata)
563 {
564 if (context.GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_9_0) {
565 return E_OK;
566 }
567 auto remoteSchemaVersion = metadata->GetRemoteSchemaVersion(context.GetDeviceId(), context.GetTargetUserId());
568 if (remoteSchemaVersion != packet.GetSchemaVersion()) {
569 LOGE("[DataSync] remote schema version misMatch, need ability sync again, packet %" PRIu64 " cache %" PRIu64,
570 packet.GetSchemaVersion(), remoteSchemaVersion);
571 return -E_NEED_ABILITY_SYNC;
572 }
573 return E_OK;
574 }
575
GetUnsyncTotal(const SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,uint32_t & total)576 int SingleVerDataSyncUtils::GetUnsyncTotal(const SingleVerSyncTaskContext *context, const SyncGenericInterface *storage,
577 uint32_t &total)
578 {
579 SyncTimeRange waterRange;
580 WaterMark startMark = context->GetInitWaterMark();
581 if (waterRange.endTime == 0 || startMark > waterRange.endTime) {
582 return E_OK;
583 }
584
585 waterRange.beginTime = startMark;
586 waterRange.deleteBeginTime = context->GetInitDeletedMark();
587 return GetUnsyncTotal(context, storage, waterRange, total);
588 }
589
GetUnsyncTotal(const SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,SyncTimeRange & waterMarkInfo,uint32_t & total)590 int SingleVerDataSyncUtils::GetUnsyncTotal(const SingleVerSyncTaskContext *context, const SyncGenericInterface *storage,
591 SyncTimeRange &waterMarkInfo, uint32_t &total)
592 {
593 int errCode = E_OK;
594 SyncType curType = (context->IsQuerySync() ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE);
595 if (curType != SyncType::QUERY_SYNC_TYPE) {
596 errCode = storage->GetUnSyncTotal(waterMarkInfo.beginTime, waterMarkInfo.endTime, total);
597 } else {
598 QuerySyncObject queryObj = context->GetQuery();
599 errCode = storage->GetUnSyncTotal(queryObj, waterMarkInfo, total);
600 }
601 if (errCode != E_OK) {
602 LOGE("[DataSync][GetUnsyncTotal] Get unsync data num failed, errCode=%d", errCode);
603 }
604 return errCode;
605 }
606
IsSupportRequestTotal(uint32_t version)607 bool SingleVerDataSyncUtils::IsSupportRequestTotal(uint32_t version)
608 {
609 return version >= SOFTWARE_VERSION_RELEASE_10_0;
610 }
611
UpdateSyncProcess(SingleVerSyncTaskContext * context,const DataRequestPacket * packet)612 void SingleVerDataSyncUtils::UpdateSyncProcess(SingleVerSyncTaskContext *context, const DataRequestPacket *packet)
613 {
614 const std::vector<SendDataItem> &data = packet->GetData();
615 int32_t dataSize = std::count_if(data.begin(), data.end(), [](SendDataItem item) {
616 return (item->GetFlag() & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0;
617 });
618 if (dataSize < 0) {
619 return;
620 }
621
622 LOGD("[DataSync][UpdateSyncProcess] mode=%d, total=%" PRIu64 ", size=%d", packet->GetMode(),
623 packet->GetTotalDataCount(), dataSize);
624 if (packet->GetMode() == SyncModeType::PUSH || packet->GetMode() == SyncModeType::QUERY_PUSH) {
625 // save total count to sync process
626 if (packet->GetTotalDataCount() > 0) {
627 context->SetOperationSyncProcessTotal(context->GetDeviceId(), packet->GetTotalDataCount());
628 }
629 context->UpdateOperationFinishedCount(context->GetDeviceId(), static_cast<uint32_t>(dataSize));
630 }
631 }
632
CacheInitWaterMark(SingleVerSyncTaskContext * context,SingleVerDataSync * dataSync)633 void SingleVerDataSyncUtils::CacheInitWaterMark(SingleVerSyncTaskContext *context, SingleVerDataSync *dataSync)
634 {
635 SyncType curType = (context->IsQuerySync() ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE);
636 WaterMark startMark = 0;
637 dataSync->GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
638 context->SetInitWaterMark(startMark);
639
640 WaterMark deletedMark = 0;
641 dataSync->GetLocalDeleteSyncWaterMark(context, deletedMark);
642 context->SetInitDeletedMark(deletedMark);
643 LOGI("[SingleVerDataSync][CacheInitWaterMark] startMark %" PRIu64 " deletedMark %" PRIu64, startMark, deletedMark);
644 }
645
GetQueryFromDataRequest(const DataRequestPacket & packet,const SingleVerSyncTaskContext & context,uint32_t sessionId)646 QuerySyncObject SingleVerDataSyncUtils::GetQueryFromDataRequest(const DataRequestPacket &packet,
647 const SingleVerSyncTaskContext &context, uint32_t sessionId)
648 {
649 auto query = packet.GetQuery();
650 query.SetRemoteDev(context.GetDeviceId());
651 query.SetUseLocalSchema(sessionId == context.GetRequestSessionId());
652 return query;
653 }
654 }