• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud/cloud_sync_utils.h"
16 
17 #include "cloud/asset_operation_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_storage_utils.h"
20 #include "db_common.h"
21 #include "db_dfx_adapter.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 
25 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)26 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
27     std::vector<Type> &cloudPkVals)
28 {
29     if (!cloudPkVals.empty()) {
30         LOGE("[CloudSyncer] Output parameter should be empty");
31         return -E_INVALID_ARGS;
32     }
33     for (const auto &pkColName : pkColNames) {
34         // If data is primary key or is a composite primary key, then use rowID as value
35         // The single primary key table, does not contain rowid.
36         if (pkColName == DBConstant::ROWID) {
37             cloudPkVals.emplace_back(dataKey);
38             continue;
39         }
40         Type type;
41         bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
42         if (!isExisted) {
43             LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
44             return -E_CLOUD_ERROR;
45         }
46         cloudPkVals.push_back(type);
47     }
48     return E_OK;
49 }
50 
OpTypeToChangeType(OpType strategy)51 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
52 {
53     switch (strategy) {
54         case OpType::INSERT:
55             return OP_INSERT;
56         case OpType::DELETE:
57             return OP_DELETE;
58         case OpType::UPDATE:
59             return OP_UPDATE;
60         default:
61             return OP_BUTT;
62     }
63 }
64 
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)65 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
66 {
67     return pkColNames.size() == 1 && pkColNames[0] != DBConstant::ROWID;
68 }
69 
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)70 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
71 {
72     for (auto item = datum.begin(); item != datum.end();) {
73         const auto &key = item->first;
74         if (key != CloudDbConstant::GID_FIELD &&
75             key != CloudDbConstant::CREATE_FIELD &&
76             key != CloudDbConstant::MODIFY_FIELD &&
77             key != CloudDbConstant::DELETE_FIELD &&
78             key != CloudDbConstant::CURSOR_FIELD &&
79             key != CloudDbConstant::VERSION_FIELD &&
80             key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
81             (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
82                 item = datum.erase(item);
83             } else {
84                 item++;
85             }
86     }
87 }
88 
StatusToFlag(AssetStatus status)89 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
90 {
91     auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
92     switch (tmpStatus) {
93         case AssetStatus::INSERT:
94             return AssetOpType::INSERT;
95         case AssetStatus::DELETE:
96             return AssetOpType::DELETE;
97         case AssetStatus::UPDATE:
98             return AssetOpType::UPDATE;
99         case AssetStatus::NORMAL:
100             return AssetOpType::NO_CHANGE;
101         default:
102             LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
103                 ", Caller should ensure that current situation won't occur");
104             return AssetOpType::NO_CHANGE;
105     }
106 }
107 
StatusToFlagForAsset(Asset & asset)108 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
109 {
110     asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
111     asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
112 }
113 
StatusToFlagForAssets(Assets & assets)114 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
115 {
116     for (Asset &asset : assets) {
117         StatusToFlagForAsset(asset);
118     }
119 }
120 
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)121 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
122 {
123     for (const Field &field : fields) {
124         if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
125             StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
126         } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
127             StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
128         }
129     }
130 }
131 
IsChangeDataEmpty(const ChangedData & changedData)132 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
133 {
134     return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
135            changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
136            changedData.primaryData[ChangeType::OP_DELETE].empty();
137 }
138 
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)139 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
140 {
141     return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
142 }
143 
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)144 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
145 {
146     // If timeStamp, write timestamp, cloudGid are all the same,
147     // We thought that the datum is mostly be the same between cloud and local
148     // However, there are still slightly possibility that it may be created from different device,
149     // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
150     // But we won't notify the datum
151     bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
152         EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
153         localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
154         localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
155         localLogInfo.version == cloudLogInfo.version &&
156         (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0 &&
157         !localLogInfo.isNeedUpdateAsset;
158     return !isSame;
159 }
160 
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)161 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
162 {
163     if (devices.size() != 1) {
164         LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
165         return -E_INVALID_ARGS;
166     }
167     for (const auto &dev: devices) {
168         if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
169             LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
170             return -E_INVALID_ARGS;
171         }
172     }
173     if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
174         LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
175         return -E_NOT_SUPPORT;
176     }
177     if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
178         LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
179         return -E_INVALID_ARGS;
180     }
181     return E_OK;
182 }
183 
GetCloudLogInfo(DistributedDB::VBucket & datum)184 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
185 {
186     LogInfo cloudLogInfo;
187     cloudLogInfo.dataKey = 0;
188     cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
189     cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
190     cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
191     cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
192     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
193     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
194         datum, cloudLogInfo.sharingResource);
195     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
196         datum, cloudLogInfo.version);
197     return cloudLogInfo;
198 }
199 
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)200 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
201     const DataInfoWithLog &localInfo, ChangeType type)
202 {
203     int ret = E_OK;
204     std::vector<Type> cloudPkVals;
205     if (type == ChangeType::OP_DELETE) {
206         ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
207             cloudPkVals);
208     } else {
209         ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
210     }
211     if (ret != E_OK) {
212         return ret;
213     }
214     InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
215     return E_OK;
216 }
217 
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)218 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
219     int64_t count)
220 {
221     size_t insRecordLen = uploadData.insData.record.size();
222     size_t insExtendLen = uploadData.insData.extend.size();
223     size_t updRecordLen = uploadData.updData.record.size();
224     size_t updExtendLen = uploadData.updData.extend.size();
225     size_t delRecordLen = uploadData.delData.record.size();
226     size_t delExtendLen = uploadData.delData.extend.size();
227 
228     bool syncDataValid = (uploadData.tableName == tableName) &&
229         ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
230         (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
231         (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
232         (uploadData.lockData.extend.size() > 0));
233     if (!syncDataValid) {
234         LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
235             " is not the same as table name of sync data.");
236         return -E_INTERNAL_ERROR;
237     }
238     int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
239         static_cast<int64_t>(delRecordLen);
240     if (syncDataCount > count) {
241         LOGW("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. insRecordLen:%zu, "
242             "updRecordLen:%zu, delRecordLen:%zu, count %d", insRecordLen, updRecordLen, delRecordLen, count);
243     }
244     return E_OK;
245 }
246 
ClearCloudSyncData(CloudSyncData & uploadData)247 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
248 {
249     std::vector<VBucket>().swap(uploadData.insData.record);
250     std::vector<VBucket>().swap(uploadData.insData.extend);
251     std::vector<int64_t>().swap(uploadData.insData.rowid);
252     std::vector<VBucket>().swap(uploadData.updData.record);
253     std::vector<VBucket>().swap(uploadData.updData.extend);
254     std::vector<VBucket>().swap(uploadData.delData.record);
255     std::vector<VBucket>().swap(uploadData.delData.extend);
256 }
257 
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)258 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
259 {
260     for (auto &extendData: extend) {
261         if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
262             LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
263             return -E_INTERNAL_ERROR;
264         }
265         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
266             LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
267             return -E_INTERNAL_ERROR;
268         }
269         if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
270             LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
271             return -E_INTERNAL_ERROR;
272         }
273         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
274             LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
275             return -E_INTERNAL_ERROR;
276         }
277         waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
278         int64_t modifyTime =
279             std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
280         int64_t createTime =
281             std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
282         extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
283         extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
284     }
285     return E_OK;
286 }
287 
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)288 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
289 {
290     return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
291         uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
292         uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
293         uploadData.lockData.rowid.empty();
294 }
295 
ModifyCloudDataTime(DistributedDB::VBucket & data)296 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
297 {
298     // data already check field modify_field and create_field
299     int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
300     int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
301     data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
302     data[CloudDbConstant::CREATE_FIELD] = createTime;
303 }
304 
305 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)306 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
307     Timestamp &waterMark)
308 {
309     int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
310     if (ret != E_OK) {
311         LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
312         return ret;
313     }
314     if (!uploadData.insData.extend.empty()) {
315         if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
316             LOGE("[CloudSyncer] Inconsistent size of inserted data.");
317             return -E_INTERNAL_ERROR;
318         }
319         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
320         if (ret != E_OK) {
321             return ret;
322         }
323     }
324 
325     if (!uploadData.updData.extend.empty()) {
326         if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
327             LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
328             return -E_INTERNAL_ERROR;
329         }
330         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
331         if (ret != E_OK) {
332             return ret;
333         }
334     }
335 
336     if (!uploadData.delData.extend.empty()) {
337         if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
338             LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
339             return -E_INTERNAL_ERROR;
340         }
341         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
342         if (ret != E_OK) {
343             return ret;
344         }
345     }
346     return E_OK;
347 }
348 
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)349 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
350     std::map<std::string, LogInfo> &localLogInfoCache)
351 {
352     LogInfo updateLogInfo;
353     std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
354     bool updateCache = true;
355     switch (opType) {
356         case OpType::INSERT :
357         case OpType::UPDATE :
358         case OpType::DELETE: {
359             updateLogInfo = cloudInfo;
360             updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
361             updateLogInfo.hashKey = localInfo.hashKey;
362             if (opType == OpType::DELETE) {
363                 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
364             } else if (opType == OpType::INSERT) {
365                 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
366             }
367             break;
368         }
369         case OpType::CLEAR_GID:
370         case OpType::UPDATE_TIMESTAMP: {
371             updateLogInfo = localInfo;
372             updateLogInfo.cloudGid.clear();
373             updateLogInfo.sharingResource.clear();
374             break;
375         }
376         default:
377             updateCache = false;
378             break;
379     }
380     if (updateCache) {
381         localLogInfoCache[hashKey] = updateLogInfo;
382     }
383 }
384 
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)385 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam &param, size_t dataIndex,
386     const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
387 {
388     OpType opType = CalOpType(param, dataIndex);
389     Key hashKey = dataInfo.localInfo.logInfo.hashKey;
390     if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
391         if (opType == OpType::INSERT) {
392             (void)param.dupHashKeySet.insert(hashKey);
393             opType = OpType::UPDATE;
394             // only composite primary key needs to be processed.
395             if (!param.isSinglePrimaryKey) {
396                 param.withoutRowIdData.updateData.emplace_back(dataIndex,
397                     param.changedData.primaryData[ChangeType::OP_UPDATE].size());
398             }
399         }
400     }
401     // INSERT: for no primary key or composite primary key situation
402     if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
403         param.info.downLoadInfo.insertCount++;
404         param.withoutRowIdData.insertData.push_back(dataIndex);
405         return E_OK;
406     }
407     switch (opType) {
408         // INSERT: only for single primary key situation
409         case OpType::INSERT:
410             param.info.downLoadInfo.insertCount++;
411             param.info.retryInfo.downloadBatchOpCount++;
412             return CloudSyncUtils::SaveChangedDataByType(
413                 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
414         case OpType::UPDATE:
415             param.info.downLoadInfo.updateCount++;
416             param.info.retryInfo.downloadBatchOpCount++;
417             if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
418                 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
419                     dataInfo.localInfo, ChangeType::OP_UPDATE);
420             }
421             return E_OK;
422         case OpType::DELETE:
423             param.info.downLoadInfo.deleteCount++;
424             param.info.retryInfo.downloadBatchOpCount++;
425             return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
426                 dataInfo.localInfo, ChangeType::OP_DELETE);
427         case OpType::UPDATE_TIMESTAMP:
428             param.info.retryInfo.downloadBatchOpCount++;
429             return E_OK;
430         default:
431             return E_OK;
432     }
433 }
434 
ClearWithoutData(ICloudSyncer::SyncParam & param)435 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam &param)
436 {
437     param.withoutRowIdData.insertData.clear();
438     param.withoutRowIdData.updateData.clear();
439     param.withoutRowIdData.assetInsertData.clear();
440 }
441 
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)442 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
443 {
444     if (extend.empty()) {
445         return false;
446     }
447     for (size_t i = 0; i < extend.size(); ++i) {
448         if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
449             return false;
450         }
451     }
452     return true;
453 }
454 
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)455 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
456 {
457     if (data.extend.size() != data.assets.size()) {
458         LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
459         return -E_CLOUD_ERROR;
460     }
461     int errCode = E_OK;
462     for (size_t i = 0; i < data.assets.size(); i++) {
463         if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
464             (errorCode != E_OK &&
465                 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
466             DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
467             if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
468                 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
469             }
470             continue;
471         }
472         for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
473             auto &[col, value] = *it;
474             if (!CheckIfContainsInsertAssets(value)) {
475                 ++it;
476                 continue;
477             }
478             auto extendIt = data.extend[i].find(col);
479             if (extendIt == data.extend[i].end()) {
480                 LOGI("[CloudSyncUtils] Asset field name can not find in extend.");
481                 it = data.assets[i].erase(it);
482                 continue;
483             }
484             if (extendIt->second.index() != value.index()) {
485                 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
486                     extendIt->second.index(), value.index());
487                 errCode = -E_CLOUD_ERROR;
488                 ++it;
489                 continue;
490             }
491             int ret = FillAssetIdToAssetData(extendIt->second, value);
492             if (ret != E_OK) {
493                 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
494                 errCode = -E_CLOUD_ERROR;
495             }
496             ++it;
497         }
498     }
499     return errCode;
500 }
501 
FillAssetIdToAssetData(const Type & extend,Type & assetData)502 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
503 {
504     if (extend.index() == TYPE_INDEX<Asset>) {
505         if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
506             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
507             return -E_CLOUD_ERROR;
508         }
509         if (std::get<Asset>(extend).assetId.empty()) {
510             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
511             return -E_CLOUD_ERROR;
512         }
513         std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
514     }
515     if (extend.index() == TYPE_INDEX<Assets>) {
516         FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
517     }
518     return E_OK;
519 }
520 
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)521 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
522 {
523     for (auto it = assets.begin(); it != assets.end();) {
524         auto &asset = *it;
525         if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
526             ++it;
527             continue;
528         }
529         auto extendAssets = extend;
530         bool isAssetExisted = false;
531         for (const auto &extendAsset : extendAssets) {
532             if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
533                 asset.assetId = extendAsset.assetId;
534                 isAssetExisted = true;
535                 break;
536             }
537         }
538         if (!isAssetExisted) {
539             LOGI("Unable to sync local asset, skip fill assetId.");
540             it = assets.erase(it);
541         } else {
542             ++it;
543         }
544     }
545 }
546 
CheckIfContainsInsertAssets(const Type & assetData)547 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
548 {
549     if (assetData.index() == TYPE_INDEX<Asset>) {
550         if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
551             return false;
552         }
553     } else if (assetData.index() == TYPE_INDEX<Assets>) {
554         bool hasInsertAsset = false;
555         for (const auto &asset : std::get<Assets>(assetData)) {
556             if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
557                 hasInsertAsset = true;
558                 break;
559             }
560         }
561         if (!hasInsertAsset) {
562             return false;
563         }
564     }
565     return true;
566 }
567 
UpdateAssetsFlag(CloudSyncData & uploadData)568 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
569 {
570     AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
571     AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
572     AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
573 }
574 
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)575 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
576     ChangedData &changedData)
577 {
578     // erase old changedData if exist
579     for (auto &changePkValList : changedData.primaryData) {
580         changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
581             [&pkVal](const std::vector<Type> &existPkVal) {
582             return existPkVal == pkVal;
583             }), changePkValList.end());
584     }
585     // insert new changeData
586     changedData.primaryData[type].emplace_back(std::move(pkVal));
587 }
588 
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)589 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam &param, size_t dataIndex)
590 {
591     OpType opType = param.downloadData.opType[dataIndex];
592     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
593         return opType;
594     }
595 
596     std::vector<Type> cloudPkVal;
597     // use dataIndex as dataKey avoid get same pk with no pk schema
598     int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
599         cloudPkVal);
600     if (errCode != E_OK) {
601         LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
602         // use origin opType
603         return opType;
604     }
605     auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
606         return item == cloudPkVal;
607     });
608     if (opType == OpType::INSERT) {
609         // record all insert pk in one batch
610         if (iter == param.insertPk.end()) {
611             param.insertPk.push_back(cloudPkVal);
612         }
613         return OpType::INSERT;
614     }
615     // notify with insert because this data not exist in local before query
616     return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
617 }
618 
InitCompensatedSyncTaskInfo()619 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
620 {
621     CloudSyncer::CloudTaskInfo taskInfo;
622     taskInfo.priorityTask = true;
623     taskInfo.priorityLevel = CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL;
624     taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
625     taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
626     taskInfo.callback = nullptr;
627     taskInfo.compensatedTask = true;
628     return taskInfo;
629 }
630 
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)631 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
632     const SyncProcessCallback &onProcess)
633 {
634     CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
635     taskInfo.callback = onProcess;
636     taskInfo.devices = option.devices;
637     taskInfo.prepareTraceId = option.prepareTraceId;
638     if (option.users.empty()) {
639         taskInfo.users.push_back("");
640     } else {
641         taskInfo.users = option.users;
642     }
643     taskInfo.lockAction = option.lockAction;
644     return taskInfo;
645 }
646 
// Creates a compensated sync task derived from an existing task: on top of the base settings it copies
// lockAction, users, devices, storeId and prepareTraceId of the original task.
CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
{
    CloudSyncer::CloudTaskInfo compensatedTask = InitCompensatedSyncTaskInfo();
    compensatedTask.storeId = oriTaskInfo.storeId;
    compensatedTask.devices = oriTaskInfo.devices;
    compensatedTask.users = oriTaskInfo.users;
    compensatedTask.lockAction = oriTaskInfo.lockAction;
    compensatedTask.prepareTraceId = oriTaskInfo.prepareTraceId;
    return compensatedTask;
}
657 
CheckQueryCloudData(std::string & traceId,DownloadData & downloadData,std::vector<std::string> & pkColNames)658 void CloudSyncUtils::CheckQueryCloudData(std::string &traceId, DownloadData &downloadData,
659     std::vector<std::string> &pkColNames)
660 {
661     for (auto &data : downloadData.data) {
662         bool isVersionExist = data.count(CloudDbConstant::VERSION_FIELD) != 0;
663         bool isContainAllPk = true;
664         for (auto &pkColName : pkColNames) {
665             if (data.count(pkColName) == 0) {
666                 isContainAllPk = false;
667                 break;
668             }
669         }
670         std::string gid;
671         (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
672         bool isDelete = true;
673         (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::DELETE_FIELD, data, isDelete);
674         if (!isDelete && (!isVersionExist || !isContainAllPk)) {
675             LOGE("[CloudSyncer] Invalid data from cloud, no version[%d], lost primary key[%d], gid[%s], traceId[%s]",
676                 static_cast<int>(!isVersionExist), static_cast<int>(!isContainAllPk), gid.c_str(), traceId.c_str());
677         }
678     }
679 }
680 
IsNeedUpdateAsset(const VBucket & data)681 bool CloudSyncUtils::IsNeedUpdateAsset(const VBucket &data)
682 {
683     for (const auto &item : data) {
684         const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
685         if (asset != nullptr) {
686             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
687             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
688                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
689                 return true;
690             }
691             continue;
692         }
693         const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
694         if (assets == nullptr) {
695             continue;
696         }
697         for (const auto &oneAsset : *assets) {
698             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
699             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
700                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
701                 return true;
702             }
703         }
704     }
705     return false;
706 }
707 
GetDownloadListByGid(const std::shared_ptr<StorageProxy> & proxy,const std::vector<std::string> & data,const std::string & table)708 std::tuple<int, DownloadList, ChangedData> CloudSyncUtils::GetDownloadListByGid(
709     const std::shared_ptr<StorageProxy> &proxy, const std::vector<std::string> &data, const std::string &table)
710 {
711     std::tuple<int, DownloadList, ChangedData> res;
712     std::vector<std::string> pkColNames;
713     std::vector<Field> assetFields;
714     auto &[errCode, downloadList, changeData] = res;
715     errCode = proxy->GetPrimaryColNamesWithAssetsFields(table, pkColNames, assetFields);
716     if (errCode != E_OK) {
717         LOGE("[CloudSyncUtils] Get %s pk names by failed %d", DBCommon::StringMiddleMasking(table).c_str(), errCode);
718         return res;
719     }
720     changeData.tableName = table;
721     changeData.type = ChangedDataType::ASSET;
722     changeData.field = pkColNames;
723     for (const auto &gid : data) {
724         VBucket assetInfo;
725         VBucket record;
726         record[CloudDbConstant::GID_FIELD] = gid;
727         DataInfoWithLog dataInfo;
728         errCode = proxy->GetInfoByPrimaryKeyOrGid(table, record, false, dataInfo, assetInfo);
729         if (errCode != E_OK) {
730             LOGE("[CloudSyncUtils] Get download list by gid failed %s %d", gid.c_str(), errCode);
731             break;
732         }
733         Type prefix;
734         std::vector<Type> pkVal;
735         OpType strategy;
736         if ((dataInfo.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) ==
737             static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) {
738             strategy = OpType::UPDATE;
739         } else if ((dataInfo.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) ==
740                    static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) {
741             strategy = OpType::DELETE;
742         } else {
743             strategy = OpType::INSERT;
744         }
745         errCode = CloudSyncUtils::GetCloudPkVals(dataInfo.primaryKeys, pkColNames, dataInfo.logInfo.dataKey, pkVal);
746         if (errCode != E_OK) {
747             LOGE("[CloudSyncUtils] HandleTagAssets cannot get primary key value list. %d", errCode);
748             break;
749         }
750         if (IsSinglePrimaryKey(pkColNames) && !pkVal.empty()) {
751             prefix = pkVal[0];
752         }
753         auto assetsMap = AssetOperationUtils::FilterNeedDownloadAsset(assetInfo);
754         downloadList.push_back(
755             std::make_tuple(dataInfo.logInfo.cloudGid, prefix, strategy, assetsMap, dataInfo.logInfo.hashKey,
756                 pkVal, dataInfo.logInfo.timestamp));
757     }
758     return res;
759 }
760 
UpdateMaxTimeWithDownloadList(const DownloadList & downloadList,const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)761 void CloudSyncUtils::UpdateMaxTimeWithDownloadList(const DownloadList &downloadList, const std::string &table,
762     std::map<std::string, int64_t> &downloadBeginTime)
763 {
764     auto origin = downloadBeginTime[table];
765     for (const auto &item : downloadList) {
766         auto timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(item);
767         downloadBeginTime[table] = std::max(static_cast<int64_t>(timestamp), downloadBeginTime[table]);
768     }
769     if (downloadBeginTime[table] == origin) {
770         downloadBeginTime[table]++;
771     }
772 }
773 
IsContainDownloading(const DownloadAssetUnit & downloadAssetUnit)774 bool CloudSyncUtils::IsContainDownloading(const DownloadAssetUnit &downloadAssetUnit)
775 {
776     auto &assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadAssetUnit);
777     for (const auto &item : assets) {
778         for (const auto &asset : item.second) {
779             if ((AssetOperationUtils::EraseBitMask(asset.status) & static_cast<uint32_t>(AssetStatus::DOWNLOADING))
780                 != 0) {
781                 return true;
782             }
783         }
784     }
785     return false;
786 }
787 
GetDownloadAssetsOnlyMapFromDownLoadData(size_t idx,ICloudSyncer::SyncParam & param,std::map<std::string,Assets> & downloadAssetsMap)788 int CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(
789     size_t idx, ICloudSyncer::SyncParam &param, std::map<std::string, Assets> &downloadAssetsMap)
790 {
791     std::string gid;
792     int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(
793         CloudDbConstant::GID_FIELD, param.downloadData.data[idx], gid);
794     if (errCode != E_OK) {
795         LOGE("Get gid from bucket fail when get download assets only map from download data, error code %d", errCode);
796         return errCode;
797     }
798 
799     auto assetsMap = param.gidAssetsMap[gid];
800     for (auto &item : param.downloadData.data[idx]) {
801         auto findAssetList = assetsMap.find(item.first);
802         if (findAssetList == assetsMap.end()) {
803             continue;
804         }
805         Asset *asset = std::get_if<Asset>(&item.second);
806         if (asset != nullptr) {
807             auto matchName = std::find_if(findAssetList->second.begin(),
808                 findAssetList->second.end(),
809                 [&asset](const std::string &a) { return a == asset->name; });
810             if (matchName != findAssetList->second.end()) {
811                 Asset tmpAsset = *asset;
812                 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
813                 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
814                 downloadAssetsMap[item.first].push_back(tmpAsset);
815             }
816             continue;
817         }
818         Assets *assets = std::get_if<Assets>(&item.second);
819         if (assets == nullptr) {
820             continue;
821         }
822         for (const auto &assetItem : (*assets)) {
823             auto matchName = std::find_if(findAssetList->second.begin(),
824                 findAssetList->second.end(),
825                 [&assetItem](const std::string &a) { return a == assetItem.name; });
826             if (matchName != findAssetList->second.end()) {
827                 Asset tmpAsset = assetItem;
828                 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
829                 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
830                 downloadAssetsMap[item.first].push_back(tmpAsset);
831             }
832         }
833     }
834     return E_OK;
835 }
836 
NotifyChangeData(const std::string & dev,const std::shared_ptr<StorageProxy> & proxy,ChangedData && changedData)837 int CloudSyncUtils::NotifyChangeData(const std::string &dev, const std::shared_ptr<StorageProxy> &proxy,
838     ChangedData &&changedData)
839 {
840     int ret = proxy->NotifyChangedData(dev, std::move(changedData));
841     if (ret != E_OK) {
842         DBDfxAdapter::ReportBehavior(
843             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
844         LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
845     } else {
846         DBDfxAdapter::ReportBehavior(
847             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::SUCC, ret});
848     }
849     return ret;
850 }
851 
GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,std::shared_ptr<StorageProxy> & storageProxy,std::vector<std::string> & users,std::vector<QuerySyncObject> & syncQuery)852 int CloudSyncUtils::GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,
853     std::shared_ptr<StorageProxy> &storageProxy, std::vector<std::string> &users,
854     std::vector<QuerySyncObject> &syncQuery)
855 {
856     int errCode = storageProxy->GetCompensatedSyncQuery(syncQuery, users, isQueryDownloadRecords);
857     if (errCode != E_OK) {
858         LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
859         return errCode;
860     }
861     if (syncQuery.empty()) {
862         LOGD("[CloudSyncer] Not need generate compensated sync");
863     }
864     return E_OK;
865 }
866 
GetUserListForCompensatedSync(CloudDBProxy & cloudDB,const std::vector<std::string> & users,std::vector<std::string> & userList)867 void CloudSyncUtils::GetUserListForCompensatedSync(
868     CloudDBProxy &cloudDB, const std::vector<std::string> &users, std::vector<std::string> &userList)
869 {
870     auto cloudDBs = cloudDB.GetCloudDB();
871     if (cloudDBs.empty()) {
872         LOGW("[CloudSyncer][GetUserListForCompensatedSync] not set cloud db");
873         return;
874     }
875     for (auto &[user, cloudDb] : cloudDBs) {
876         auto it = std::find(users.begin(), users.end(), user);
877         if (it != users.end()) {
878             userList.push_back(user);
879         }
880     }
881 }
882 
SetAssetsMapByCloudGid(std::vector<std::string> & cloudGid,const AssetsMap & groupAssetsMap,std::map<std::string,AssetsMap> & gidAssetsMap)883 bool CloudSyncUtils::SetAssetsMapByCloudGid(
884     std::vector<std::string> &cloudGid, const AssetsMap &groupAssetsMap, std::map<std::string, AssetsMap> &gidAssetsMap)
885 {
886     bool isFindOneRecord = false;
887     for (auto &iter : cloudGid) {
888         auto gidIter = gidAssetsMap.find(iter);
889         if (gidIter == gidAssetsMap.end()) {
890             continue;
891         }
892         for (const auto &pair : groupAssetsMap) {
893             if (gidIter->second.find(pair.first) == gidIter->second.end()) {
894                 gidIter->second[pair.first] = pair.second;
895             } else {
896                 // merge assets
897                 gidIter->second[pair.first].insert(pair.second.begin(), pair.second.end());
898             }
899         }
900         isFindOneRecord = true;
901     }
902     return isFindOneRecord;
903 }
904 
CheckAssetsOnlyIsEmptyInGroup(const std::map<std::string,AssetsMap> & gidAssetsMap,const AssetsMap & assetsMap)905 bool CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(
906     const std::map<std::string, AssetsMap> &gidAssetsMap, const AssetsMap &assetsMap)
907 {
908     if (gidAssetsMap.empty()) {
909         return true;
910     }
911     for (const auto &item : gidAssetsMap) {
912         const auto &gidAssets = item.second;
913         if (gidAssets.empty()) {
914             return true;
915         }
916         bool isMatch = true;
917         for (const auto &assets : assetsMap) {
918             auto iter = gidAssets.find(assets.first);
919             if (iter == gidAssets.end()) {
920                 isMatch = false;
921                 break;
922             }
923             if (!std::includes(iter->second.begin(), iter->second.end(), assets.second.begin(), assets.second.end())) {
924                 isMatch = false;
925                 break;
926             }
927         }
928         if (isMatch) {
929             // find one match, so group is not empty.
930             return false;
931         }
932     }
933     return true;
934 }
935 
IsAssetOnlyData(VBucket & queryData,AssetsMap & assetsMap,bool isDownloading)936 bool CloudSyncUtils::IsAssetOnlyData(VBucket &queryData, AssetsMap &assetsMap, bool isDownloading)
937 {
938     if (assetsMap.empty()) {
939         return false;
940     }
941     for (auto &item : assetsMap) {
942         auto &assetNameList = item.second;
943         auto findAssetField = queryData.find(item.first);
944         if (findAssetField == queryData.end() || assetNameList.empty()) {
945             // if not find asset field or assetNameList is empty, mean this is not asset only data.
946             return false;
947         }
948 
949         Asset *asset = std::get_if<Asset>(&(findAssetField->second));
950         if (asset != nullptr) {
951             // if is Asset type, assetNameList size must be 1.
952             if (assetNameList.size() != 1u || *(assetNameList.begin()) != asset->name ||
953                 asset->status == AssetStatus::DELETE) {
954                 // if data is delele, also not asset only data.
955                 return false;
956             }
957             if (isDownloading) {
958                 asset->status = static_cast<uint32_t>(AssetStatus::DOWNLOADING);
959             }
960             continue;
961         }
962 
963         Assets *assets = std::get_if<Assets>(&(findAssetField->second));
964         if (assets == nullptr) {
965             return false;
966         }
967         for (auto &assetName : assetNameList) {
968             auto findAsset = std::find_if(
969                 assets->begin(), assets->end(), [&assetName](const Asset &a) { return a.name == assetName; });
970             if (findAsset == assets->end() || (*findAsset).status == AssetStatus::DELETE) {
971                 // if data is delele, also not asset only data.
972                 return false;
973             }
974             if (isDownloading) {
975                 (*findAsset).status = AssetStatus::DOWNLOADING;
976             }
977         }
978     }
979     return true;
980 }
981 
ClearCloudWatermark(const std::vector<std::string> & tableNameList,std::shared_ptr<StorageProxy> & storageProxy)982 int CloudSyncUtils::ClearCloudWatermark(const std::vector<std::string> &tableNameList,
983     std::shared_ptr<StorageProxy> &storageProxy)
984 {
985     for (const auto &tableName: tableNameList) {
986         LOGD("[CloudSyncUtils] Start clear cloud watermark.");
987         int ret = storageProxy->CleanWaterMark(tableName);
988         if (ret != E_OK) {
989             std::string maskedName = DBCommon::StringMiddleMasking(tableName);
990             LOGE("[CloudSyncUtils] failed to clear watermark. err: %d. table: %s, name length: %zu",
991                 ret, maskedName.c_str(), maskedName.length());
992             return ret;
993         }
994     }
995     int errCode = storageProxy->StartTransaction(TransactType::IMMEDIATE);
996     if (errCode != E_OK) {
997         LOGE("[CloudSyncUtils] failed to start Transaction before clear cloud log version, %d", errCode);
998         return errCode;
999     }
1000 
1001     errCode = storageProxy->ClearCloudLogVersion(tableNameList);
1002     if (errCode != E_OK) {
1003         LOGE("[CloudSyncUtils] failed to clear log version, %d.", errCode);
1004         storageProxy->Rollback();
1005         return errCode;
1006     }
1007 
1008     return storageProxy->Commit();
1009 }
1010 
HaveReferenceOrReferenceByTable(const CloudSyncer::CloudTaskInfo & taskInfo,std::shared_ptr<StorageProxy> & storageProxy)1011 bool CloudSyncUtils::HaveReferenceOrReferenceByTable(
1012     const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy)
1013 {
1014     for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
1015         if (storageProxy->IsTableExistReferenceOrReferenceBy(taskInfo.table[i])) {
1016             return true;
1017         }
1018     }
1019     return false;
1020 }
1021 
StartTransactionIfNeed(const CloudSyncer::CloudTaskInfo & taskInfo,std::shared_ptr<StorageProxy> & storageProxy)1022 int CloudSyncUtils::StartTransactionIfNeed(
1023     const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy)
1024 {
1025     bool isStartTransaction = true;
1026     if (taskInfo.table.size() <= 1u || !HaveReferenceOrReferenceByTable(taskInfo, storageProxy)) {
1027         // only one table or no reference table, no need to start transaction.
1028         isStartTransaction = false;
1029     }
1030     return isStartTransaction ? storageProxy->StartTransaction() : E_OK;
1031 }
1032 
EndTransactionIfNeed(const int & errCode,const CloudSyncer::CloudTaskInfo & taskInfo,std::shared_ptr<StorageProxy> & storageProxy)1033 void CloudSyncUtils::EndTransactionIfNeed(
1034     const int &errCode, const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy)
1035 {
1036     if (!storageProxy->GetTransactionExeFlag()) {
1037         // no need to end transaction.
1038         return;
1039     }
1040     if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
1041         int commitErrorCode = storageProxy->Commit();
1042         if (commitErrorCode != E_OK) {
1043             LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
1044         }
1045     } else {
1046         int rollBackErrorCode = storageProxy->Rollback();
1047         if (rollBackErrorCode != E_OK) {
1048             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1049         }
1050     }
1051 }
1052 
CanStartAsyncDownload(int scheduleCount)1053 bool CloudSyncUtils::CanStartAsyncDownload(int scheduleCount)
1054 {
1055     if (!RuntimeContext::GetInstance()->GetAssetsDownloadManager()->CanStartNewTask()) {
1056         LOGW("[CloudSyncer] Too many download tasks");
1057         return false;
1058     }
1059     return scheduleCount <= 0;
1060 }
1061 }