• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_data_sync_utils.h"
16 
17 #include <mutex>
18 #include "db_common.h"
19 #include "version.h"
20 #include "log_print.h"
21 #include "message.h"
22 namespace DistributedDB {
QuerySyncCheck(const SingleVerSyncTaskContext * context)23 bool SingleVerDataSyncUtils::QuerySyncCheck(const SingleVerSyncTaskContext *context)
24 {
25     if (!context->IsQuerySync()) {
26         return true;
27     }
28     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
29     // for 101 version, no need to do abilitySync, just send request to remote
30     if (version <= SOFTWARE_VERSION_RELEASE_1_0) {
31         return true;
32     }
33     if (version < SOFTWARE_VERSION_RELEASE_4_0) {
34         LOGE("[SingleVerDataSync] not support query sync when remote ver lower than 104");
35         return false;
36     }
37     if (version < SOFTWARE_VERSION_RELEASE_5_0 && !(context->GetQuery().IsQueryOnlyByKey())) {
38         LOGE("[SingleVerDataSync] remote version only support prefix key");
39         return false;
40     }
41     if (context->GetQuery().HasInKeys() &&
42         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
43         return false;
44     }
45     return true;
46 }
47 
AckMsgErrnoCheck(const SingleVerSyncTaskContext * context,const Message * message)48 int SingleVerDataSyncUtils::AckMsgErrnoCheck(const SingleVerSyncTaskContext *context, const Message *message)
49 {
50     if (context == nullptr || message == nullptr) {
51         return -E_INVALID_ARGS;
52     }
53     if (message->IsFeedbackError()) {
54         LOGE("[DataSync][AckMsgErrnoCheck] message errNo=%d", message->GetErrorNo());
55         return -static_cast<int>(message->GetErrorNo());
56     }
57     return E_OK;
58 }
59 
RequestQueryCheck(const DataRequestPacket * packet,SyncGenericInterface * storage)60 int SingleVerDataSyncUtils::RequestQueryCheck(const DataRequestPacket *packet, SyncGenericInterface *storage)
61 {
62     if (storage == nullptr || packet == nullptr) {
63         return -E_INVALID_ARGS;
64     }
65     if (SyncOperation::GetSyncType(packet->GetMode()) != SyncType::QUERY_SYNC_TYPE) {
66         return E_OK;
67     }
68     QuerySyncObject syncQuery = packet->GetQuery();
69     int errCode = storage->CheckAndInitQueryCondition(syncQuery);
70     if (errCode != E_OK) {
71         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
72         return errCode;
73     }
74     return E_OK;
75 }
76 
IsPermitLocalDeviceRecvData(const std::string & deviceId,const SecurityOption & remoteSecOption)77 bool SingleVerDataSyncUtils::IsPermitLocalDeviceRecvData(const std::string &deviceId,
78     const SecurityOption &remoteSecOption)
79 {
80     return RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(deviceId, remoteSecOption);
81 }
82 
IsPermitRemoteDeviceRecvData(const std::string & deviceId,const SecurityOption & remoteSecOption,SyncGenericInterface * storage)83 bool SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(const std::string &deviceId,
84     const SecurityOption &remoteSecOption, SyncGenericInterface *storage)
85 {
86     if (storage == nullptr) {
87         return -E_INVALID_ARGS;
88     }
89     SecurityOption localSecOption;
90     if (remoteSecOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION) {
91         return true;
92     }
93     int errCode = storage->GetSecurityOption(localSecOption);
94     if (errCode == -E_NOT_SUPPORT) {
95         return true;
96     }
97     return RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(deviceId, localSecOption);
98 }
99 
TransDbDataItemToSendDataItem(const std::string & localHashName,std::vector<SendDataItem> & outData)100 void SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(const std::string &localHashName,
101     std::vector<SendDataItem> &outData)
102 {
103     for (size_t i = 0; i < outData.size(); i++) {
104         if (outData[i] == nullptr) {
105             continue;
106         }
107         outData[i]->SetOrigDevice(outData[i]->GetOrigDevice().empty() ? localHashName : outData[i]->GetOrigDevice());
108         if (i == 0 || i == (outData.size() - 1)) {
109             LOGD("[DataSync][TransToSendItem] printData packet=%zu,timestamp=%" PRIu64 ",flag=%" PRIu64, i,
110                 outData[i]->GetTimestamp(), outData[i]->GetFlag());
111         }
112     }
113 }
114 
TransferForeignOrigDevName(const std::string & deviceName,const std::string & localHashName)115 std::string SingleVerDataSyncUtils::TransferForeignOrigDevName(const std::string &deviceName,
116     const std::string &localHashName)
117 {
118     if (localHashName == deviceName) {
119         return "";
120     }
121     return deviceName;
122 }
123 
TransSendDataItemToLocal(const SingleVerSyncTaskContext * context,const std::string & localHashName,const std::vector<SendDataItem> & data)124 void SingleVerDataSyncUtils::TransSendDataItemToLocal(const SingleVerSyncTaskContext *context,
125     const std::string &localHashName, const std::vector<SendDataItem> &data)
126 {
127     TimeOffset offset = context->GetTimeOffset();
128     Timestamp currentLocalTime = context->GetCurrentLocalTime();
129     for (auto &item : data) {
130         if (item == nullptr) {
131             continue;
132         }
133         item->SetOrigDevice(TransferForeignOrigDevName(item->GetOrigDevice(), localHashName));
134         Timestamp tempTimestamp = item->GetTimestamp();
135         Timestamp tempWriteTimestamp = item->GetWriteTimestamp();
136         item->SetTimestamp(tempTimestamp - static_cast<Timestamp>(offset));
137         if (tempWriteTimestamp != 0) {
138             item->SetWriteTimestamp(tempWriteTimestamp - static_cast<Timestamp>(offset));
139         }
140 
141         if (item->GetTimestamp() > currentLocalTime) {
142             item->SetTimestamp(currentLocalTime);
143         }
144         if (item->GetWriteTimestamp() > currentLocalTime) {
145             item->SetWriteTimestamp(currentLocalTime);
146         }
147     }
148 }
149 
TranslateErrCodeIfNeed(int mode,uint32_t version,int & errCode)150 void SingleVerDataSyncUtils::TranslateErrCodeIfNeed(int mode, uint32_t version, int &errCode)
151 {
152     // once get data occur E_EKEYREVOKED error, should also send request to remote dev to pull data.
153     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL &&
154         version > SOFTWARE_VERSION_RELEASE_2_0 && errCode == -E_EKEYREVOKED) {
155         errCode = E_OK;
156     }
157 }
158 
RunPermissionCheck(SingleVerSyncTaskContext * context,const SyncGenericInterface * storage,const std::string & label,const DataRequestPacket * packet)159 int SingleVerDataSyncUtils::RunPermissionCheck(SingleVerSyncTaskContext *context, const SyncGenericInterface* storage,
160     const std::string &label, const DataRequestPacket *packet)
161 {
162     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
163     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
164     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
165     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
166     int32_t instanceId = storage->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
167     uint8_t flag;
168     switch (mode) {
169         case SyncModeType::PUSH:
170             flag = CHECK_FLAG_RECEIVE;
171             break;
172         case SyncModeType::PULL:
173             flag = CHECK_FLAG_SEND;
174             break;
175         case SyncModeType::PUSH_AND_PULL:
176             flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE;
177             break;
178         default:
179             flag = CHECK_FLAG_RECEIVE;
180             break;
181     }
182     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
183         { userId, appId, storeId, context->GetDeviceId(), instanceId, packet->GetExtraConditions() },
184         flag);
185     if (errCode != E_OK) {
186         LOGE("[DataSync][RunPermissionCheck] check failed flag=%" PRIu8 ",Label=%s,dev=%s", flag, label.c_str(),
187             STR_MASK(context->GetDeviceId()));
188     }
189     return errCode;
190 }
191 
CheckPermitReceiveData(const SingleVerSyncTaskContext * context,const ICommunicator * communicator)192 bool SingleVerDataSyncUtils::CheckPermitReceiveData(const SingleVerSyncTaskContext *context,
193     const ICommunicator *communicator)
194 {
195     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
196     std::string localDeviceId;
197     if (communicator == nullptr || remoteSecOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION) {
198         return true;
199     }
200     communicator->GetLocalIdentity(localDeviceId);
201     bool isPermitSync = SingleVerDataSyncUtils::IsPermitLocalDeviceRecvData(localDeviceId, remoteSecOption);
202     if (isPermitSync) {
203         return isPermitSync;
204     }
205     LOGE("[DataSync][PermitReceiveData] check failed: permitReceive=%d, localDev=%s, seclabel=%d, secflag=%d",
206         isPermitSync, STR_MASK(localDeviceId), remoteSecOption.securityLabel, remoteSecOption.securityFlag);
207     return isPermitSync;
208 }
209 
SetPacketId(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t version)210 void SingleVerDataSyncUtils::SetPacketId(DataRequestPacket *packet, SingleVerSyncTaskContext *context, uint32_t version)
211 {
212     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
213         context->IncPacketId(); // begin from 1
214         std::vector<uint64_t> reserved {context->GetPacketId()};
215         packet->SetReserved(reserved);
216     }
217 }
218 
GetMessageId(SyncType syncType)219 int SingleVerDataSyncUtils::GetMessageId(SyncType syncType)
220 {
221     if (syncType == SyncType::QUERY_SYNC_TYPE) {
222         return QUERY_SYNC_MESSAGE;
223     }
224     return DATA_SYNC_MESSAGE;
225 }
226 
PushAndPullKeyRevokHandle(SingleVerSyncTaskContext * context)227 void SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(SingleVerSyncTaskContext *context)
228 {
229     // for push_and_pull mode it may be EKEYREVOKED error before receive watermarkexception
230     // should clear errCode and restart pushpull request.
231     int mode = SyncOperation::TransferSyncMode(context->GetMode());
232     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0 && mode == SyncModeType::PUSH_AND_PULL &&
233         context->GetTaskErrCode() == -E_EKEYREVOKED) {
234         context->SetTaskErrCode(E_OK);
235     }
236 }
237 
GetReSendMode(int mode,uint32_t sequenceId,SyncType syncType)238 int SingleVerDataSyncUtils::GetReSendMode(int mode, uint32_t sequenceId, SyncType syncType)
239 {
240     int curMode = SyncOperation::TransferSyncMode(mode);
241     if (curMode == SyncModeType::PUSH || curMode == SyncModeType::PULL) {
242         return mode;
243     }
244     if (curMode == SyncModeType::RESPONSE_PULL) {
245         return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
246     }
247     // set push_and_pull mode when resend first sequenceId to inform remote to run RESPONSE_PULL task
248     // for sequenceId which is larger than first, only need to send data, means to set push or query_push mode
249     if (sequenceId == 1) {
250         return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH_PULL : SyncModeType::PUSH_AND_PULL;
251     }
252     return (syncType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
253 }
254 
FillControlRequestPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)255 void SingleVerDataSyncUtils::FillControlRequestPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
256 {
257     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
258     uint32_t flag = 0;
259     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY && context->IsAutoSubscribe()) {
260         flag = SubscribeRequest::IS_AUTO_SUBSCRIBE;
261     }
262     packet->SetPacketHead(E_OK, version, GetControlCmdType(context->GetMode()), flag);
263     packet->SetQuery(context->GetQuery());
264 }
265 
GetControlCmdType(int mode)266 ControlCmdType SingleVerDataSyncUtils::GetControlCmdType(int mode)
267 {
268     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
269         return ControlCmdType::SUBSCRIBE_QUERY_CMD;
270     } else if  (mode == SyncModeType::UNSUBSCRIBE_QUERY) {
271         return ControlCmdType::UNSUBSCRIBE_QUERY_CMD;
272     }
273     return ControlCmdType::INVALID_CONTROL_CMD;
274 }
275 
GetModeByControlCmdType(ControlCmdType controlCmd)276 int SingleVerDataSyncUtils::GetModeByControlCmdType(ControlCmdType controlCmd)
277 {
278     if (controlCmd == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
279         return SyncModeType::SUBSCRIBE_QUERY;
280     } else if  (controlCmd == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
281         return SyncModeType::UNSUBSCRIBE_QUERY;
282     }
283     return SyncModeType::INVALID_MODE;
284 }
285 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)286 bool SingleVerDataSyncUtils::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
287 {
288     if (inMsg == nullptr) {
289         return false;
290     }
291     if (inMsg->GetMessageId() != CONTROL_SYNC_MESSAGE) {
292         return false;
293     }
294     const ControlRequestPacket *packet = inMsg->GetObject<ControlRequestPacket>();
295     if (packet == nullptr) {
296         return false;
297     }
298     uint32_t controlCmdType = packet->GetcontrolCmdType();
299     if (controlCmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD && inMsg->GetMessageType() == TYPE_REQUEST) {
300         const SubscribeRequest *subPacket = inMsg->GetObject<SubscribeRequest>();
301         if (subPacket == nullptr) {
302             return false;
303         }
304         query = subPacket->GetQuery();
305         LOGI("[SingleVerDataSync] receive sub scribe query cmd,begin to trigger query auto sync");
306         return true;
307     }
308     return false;
309 }
310 
ControlAckErrorHandle(const SingleVerSyncTaskContext * context,const std::shared_ptr<SubscribeManager> & subManager)311 void SingleVerDataSyncUtils::ControlAckErrorHandle(const SingleVerSyncTaskContext *context,
312     const std::shared_ptr<SubscribeManager> &subManager)
313 {
314     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
315         // reserve before need clear
316         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
317     }
318 }
319 
SetMessageHeadInfo(Message & message,uint16_t inMsgType,const std::string & inTarget,uint32_t inSequenceId,uint32_t inSessionId)320 void SingleVerDataSyncUtils::SetMessageHeadInfo(Message &message, uint16_t inMsgType, const std::string &inTarget,
321     uint32_t inSequenceId, uint32_t inSessionId)
322 {
323     message.SetMessageType(inMsgType);
324     message.SetTarget(inTarget);
325     message.SetSequenceId(inSequenceId);
326     message.SetSessionId(inSessionId);
327 }
328 
IsGetDataSuccessfully(int errCode)329 bool SingleVerDataSyncUtils::IsGetDataSuccessfully(int errCode)
330 {
331     return (errCode == E_OK || errCode == -E_UNFINISHED);
332 }
333 
GetMaxSendDataTime(const std::vector<SendDataItem> & inData)334 Timestamp SingleVerDataSyncUtils::GetMaxSendDataTime(const std::vector<SendDataItem> &inData)
335 {
336     Timestamp stamp = 0;
337     for (size_t i = 0; i < inData.size(); i++) {
338         if (inData[i] == nullptr) {
339             continue;
340         }
341         Timestamp tempStamp = inData[i]->GetTimestamp();
342         if (stamp < tempStamp) {
343             stamp = tempStamp;
344         }
345     }
346     return stamp;
347 }
348 
GetFullSyncDataTimeRange(const std::vector<SendDataItem> & inData,WaterMark localMark,UpdateWaterMark & isUpdate)349 SyncTimeRange SingleVerDataSyncUtils::GetFullSyncDataTimeRange(const std::vector<SendDataItem> &inData,
350     WaterMark localMark, UpdateWaterMark &isUpdate)
351 {
352     Timestamp maxTimestamp = localMark;
353     Timestamp minTimestamp = localMark;
354     for (size_t i = 0; i < inData.size(); i++) {
355         if (inData[i] == nullptr) {
356             continue;
357         }
358         Timestamp tempStamp = inData[i]->GetTimestamp();
359         if (maxTimestamp < tempStamp) {
360             maxTimestamp = tempStamp;
361         }
362         if (minTimestamp > tempStamp) {
363             minTimestamp = tempStamp;
364         }
365         isUpdate.normalUpdateMark = true;
366     }
367     return {minTimestamp, 0, maxTimestamp, 0};
368 }
369 
GetQuerySyncDataTimeRange(const std::vector<SendDataItem> & inData,WaterMark localMark,WaterMark deleteLocalMark,UpdateWaterMark & isUpdate)370 SyncTimeRange SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(const std::vector<SendDataItem> &inData,
371     WaterMark localMark, WaterMark deleteLocalMark, UpdateWaterMark &isUpdate)
372 {
373     SyncTimeRange dataTimeRange = {localMark, deleteLocalMark, localMark, deleteLocalMark};
374     for (size_t i = 0; i < inData.size(); i++) {
375         if (inData[i] == nullptr) {
376             continue;
377         }
378         Timestamp tempStamp = inData[i]->GetTimestamp();
379         if ((inData[i]->GetFlag() & DataItem::DELETE_FLAG) == 0) { // query data
380             if (dataTimeRange.endTime < tempStamp) {
381                 dataTimeRange.endTime = tempStamp;
382             }
383             if (dataTimeRange.beginTime > tempStamp) {
384                 dataTimeRange.beginTime = tempStamp;
385             }
386             isUpdate.normalUpdateMark = true;
387         }
388         if ((inData[i]->GetFlag() & DataItem::DELETE_FLAG) != 0) { // delete data
389             if (dataTimeRange.deleteEndTime < tempStamp) {
390                 dataTimeRange.deleteEndTime = tempStamp;
391             }
392             if (dataTimeRange.deleteBeginTime > tempStamp) {
393                 dataTimeRange.deleteBeginTime = tempStamp;
394             }
395             isUpdate.deleteUpdateMark = true;
396         }
397     }
398     return dataTimeRange;
399 }
400 
ReviseLocalMark(SyncType syncType,const SyncTimeRange & dataTimeRange,UpdateWaterMark updateMark)401 SyncTimeRange SingleVerDataSyncUtils::ReviseLocalMark(SyncType syncType, const SyncTimeRange &dataTimeRange,
402     UpdateWaterMark updateMark)
403 {
404     SyncTimeRange tmpDataTime = dataTimeRange;
405     if (updateMark.deleteUpdateMark && syncType == SyncType::QUERY_SYNC_TYPE) {
406         tmpDataTime.deleteEndTime += 1;
407     }
408     if (updateMark.normalUpdateMark) {
409         tmpDataTime.endTime += 1;
410     }
411     return tmpDataTime;
412 }
413 
GetRecvDataTimeRange(SyncType syncType,const std::vector<SendDataItem> & data,UpdateWaterMark & isUpdate)414 SyncTimeRange SingleVerDataSyncUtils::GetRecvDataTimeRange(SyncType syncType,
415     const std::vector<SendDataItem> &data, UpdateWaterMark &isUpdate)
416 {
417     if (syncType != SyncType::QUERY_SYNC_TYPE) {
418         return SingleVerDataSyncUtils::GetFullSyncDataTimeRange(data, 0, isUpdate);
419     }
420     return SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(data, 0, 0, isUpdate);
421 }
422 
GetSyncDataTimeRange(SyncType syncType,WaterMark localMark,WaterMark deleteMark,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)423 SyncTimeRange SingleVerDataSyncUtils::GetSyncDataTimeRange(SyncType syncType, WaterMark localMark, WaterMark deleteMark,
424     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
425 {
426     if (syncType != SyncType::QUERY_SYNC_TYPE) {
427         return SingleVerDataSyncUtils::GetFullSyncDataTimeRange(inData, localMark, isUpdate);
428     }
429     return SingleVerDataSyncUtils::GetQuerySyncDataTimeRange(inData, localMark, deleteMark, isUpdate);
430 }
431 }