• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud/asset_operation_utils.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "cloud/cloud_sync_utils.h"
22 
23 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)24 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
25     std::vector<Type> &cloudPkVals)
26 {
27     if (!cloudPkVals.empty()) {
28         LOGE("[CloudSyncer] Output parameter should be empty");
29         return -E_INVALID_ARGS;
30     }
31     for (const auto &pkColName : pkColNames) {
32         // If data is primary key or is a composite primary key, then use rowID as value
33         // The single primary key table, does not contain rowid.
34         if (pkColName == CloudDbConstant::ROW_ID_FIELD_NAME) {
35             cloudPkVals.emplace_back(dataKey);
36             continue;
37         }
38         Type type;
39         bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
40         if (!isExisted) {
41             LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
42             return -E_CLOUD_ERROR;
43         }
44         cloudPkVals.push_back(type);
45     }
46     return E_OK;
47 }
48 
OpTypeToChangeType(OpType strategy)49 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
50 {
51     switch (strategy) {
52         case OpType::INSERT:
53             return OP_INSERT;
54         case OpType::DELETE:
55             return OP_DELETE;
56         case OpType::UPDATE:
57             return OP_UPDATE;
58         default:
59             return OP_BUTT;
60     }
61 }
62 
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)63 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
64 {
65     return pkColNames.size() == 1 && pkColNames[0] != CloudDbConstant::ROW_ID_FIELD_NAME;
66 }
67 
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)68 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
69 {
70     for (auto item = datum.begin(); item != datum.end();) {
71         const auto &key = item->first;
72         if (key != CloudDbConstant::GID_FIELD &&
73             key != CloudDbConstant::CREATE_FIELD &&
74             key != CloudDbConstant::MODIFY_FIELD &&
75             key != CloudDbConstant::DELETE_FIELD &&
76             key != CloudDbConstant::CURSOR_FIELD &&
77             key != CloudDbConstant::VERSION_FIELD &&
78             key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
79             (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
80                 item = datum.erase(item);
81             } else {
82                 item++;
83             }
84     }
85 }
86 
StatusToFlag(AssetStatus status)87 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
88 {
89     auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
90     switch (tmpStatus) {
91         case AssetStatus::INSERT:
92             return AssetOpType::INSERT;
93         case AssetStatus::DELETE:
94             return AssetOpType::DELETE;
95         case AssetStatus::UPDATE:
96             return AssetOpType::UPDATE;
97         case AssetStatus::NORMAL:
98             return AssetOpType::NO_CHANGE;
99         default:
100             LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
101                 ", Caller should ensure that current situation won't occur");
102             return AssetOpType::NO_CHANGE;
103     }
104 }
105 
StatusToFlagForAsset(Asset & asset)106 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
107 {
108     asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
109     asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
110 }
111 
StatusToFlagForAssets(Assets & assets)112 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
113 {
114     for (Asset &asset : assets) {
115         StatusToFlagForAsset(asset);
116     }
117 }
118 
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)119 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
120 {
121     for (const Field &field : fields) {
122         if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
123             StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
124         } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
125             StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
126         }
127     }
128 }
129 
IsChangeDataEmpty(const ChangedData & changedData)130 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
131 {
132     return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
133            changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
134            changedData.primaryData[ChangeType::OP_DELETE].empty();
135 }
136 
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)137 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
138 {
139     return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
140 }
141 
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)142 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
143 {
144     // If timeStamp, write timestamp, cloudGid are all the same,
145     // We thought that the datum is mostly be the same between cloud and local
146     // However, there are still slightly possibility that it may be created from different device,
147     // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
148     // But we won't notify the datum
149     bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
150         EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
151         localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
152         localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
153         localLogInfo.version == cloudLogInfo.version &&
154         (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0;
155     return !isSame;
156 }
157 
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)158 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
159 {
160     if (devices.size() != 1) {
161         LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
162         return -E_INVALID_ARGS;
163     }
164     for (const auto &dev: devices) {
165         if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
166             LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
167             return -E_INVALID_ARGS;
168         }
169     }
170     if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
171         LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
172         return -E_NOT_SUPPORT;
173     }
174     if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
175         LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
176         return -E_INVALID_ARGS;
177     }
178     return E_OK;
179 }
180 
GetCloudLogInfo(DistributedDB::VBucket & datum)181 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
182 {
183     LogInfo cloudLogInfo;
184     cloudLogInfo.dataKey = 0;
185     cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
186     cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
187     cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
188     cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
189     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
190     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
191         datum, cloudLogInfo.sharingResource);
192     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
193         datum, cloudLogInfo.version);
194     return cloudLogInfo;
195 }
196 
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)197 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
198     const DataInfoWithLog &localInfo, ChangeType type)
199 {
200     int ret = E_OK;
201     std::vector<Type> cloudPkVals;
202     if (type == ChangeType::OP_DELETE) {
203         ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
204             cloudPkVals);
205     } else {
206         ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
207     }
208     if (ret != E_OK) {
209         return ret;
210     }
211     InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
212     return E_OK;
213 }
214 
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)215 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
216     int64_t count)
217 {
218     size_t insRecordLen = uploadData.insData.record.size();
219     size_t insExtendLen = uploadData.insData.extend.size();
220     size_t updRecordLen = uploadData.updData.record.size();
221     size_t updExtendLen = uploadData.updData.extend.size();
222     size_t delRecordLen = uploadData.delData.record.size();
223     size_t delExtendLen = uploadData.delData.extend.size();
224 
225     bool syncDataValid = (uploadData.tableName == tableName) &&
226         ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
227         (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
228         (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
229         (uploadData.lockData.extend.size() > 0));
230     if (!syncDataValid) {
231         LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
232             " is not the same as table name of sync data.");
233         return -E_INTERNAL_ERROR;
234     }
235     int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
236         static_cast<int64_t>(delRecordLen);
237     if (syncDataCount > count) {
238         LOGE("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. count %d", count);
239         return -E_INTERNAL_ERROR;
240     }
241     return E_OK;
242 }
243 
ClearCloudSyncData(CloudSyncData & uploadData)244 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
245 {
246     std::vector<VBucket>().swap(uploadData.insData.record);
247     std::vector<VBucket>().swap(uploadData.insData.extend);
248     std::vector<int64_t>().swap(uploadData.insData.rowid);
249     std::vector<VBucket>().swap(uploadData.updData.record);
250     std::vector<VBucket>().swap(uploadData.updData.extend);
251     std::vector<VBucket>().swap(uploadData.delData.record);
252     std::vector<VBucket>().swap(uploadData.delData.extend);
253 }
254 
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)255 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
256 {
257     for (auto &extendData: extend) {
258         if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
259             LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
260             return -E_INTERNAL_ERROR;
261         }
262         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
263             LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
264             return -E_INTERNAL_ERROR;
265         }
266         if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
267             LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
268             return -E_INTERNAL_ERROR;
269         }
270         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
271             LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
272             return -E_INTERNAL_ERROR;
273         }
274         waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
275         int64_t modifyTime =
276             std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
277         int64_t createTime =
278             std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
279         extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
280         extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
281     }
282     return E_OK;
283 }
284 
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)285 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
286 {
287     return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
288         uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
289         uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
290         uploadData.lockData.rowid.empty();
291 }
292 
ModifyCloudDataTime(DistributedDB::VBucket & data)293 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
294 {
295     // data already check field modify_field and create_field
296     int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
297     int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
298     data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
299     data[CloudDbConstant::CREATE_FIELD] = createTime;
300 }
301 
302 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)303 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
304     Timestamp &waterMark)
305 {
306     int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
307     if (ret != E_OK) {
308         LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
309         return ret;
310     }
311     if (!uploadData.insData.extend.empty()) {
312         if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
313             LOGE("[CloudSyncer] Inconsistent size of inserted data.");
314             return -E_INTERNAL_ERROR;
315         }
316         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
317         if (ret != E_OK) {
318             return ret;
319         }
320     }
321 
322     if (!uploadData.updData.extend.empty()) {
323         if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
324             LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
325             return -E_INTERNAL_ERROR;
326         }
327         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
328         if (ret != E_OK) {
329             return ret;
330         }
331     }
332 
333     if (!uploadData.delData.extend.empty()) {
334         if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
335             LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
336             return -E_INTERNAL_ERROR;
337         }
338         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
339         if (ret != E_OK) {
340             return ret;
341         }
342     }
343     return E_OK;
344 }
345 
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)346 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
347     std::map<std::string, LogInfo> &localLogInfoCache)
348 {
349     LogInfo updateLogInfo;
350     std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
351     bool updateCache = true;
352     switch (opType) {
353         case OpType::INSERT :
354         case OpType::UPDATE :
355         case OpType::DELETE: {
356             updateLogInfo = cloudInfo;
357             updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
358             updateLogInfo.hashKey = localInfo.hashKey;
359             if (opType == OpType::DELETE) {
360                 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
361             } else if (opType == OpType::INSERT) {
362                 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
363             }
364             break;
365         }
366         case OpType::CLEAR_GID:
367         case OpType::UPDATE_TIMESTAMP: {
368             updateLogInfo = localInfo;
369             updateLogInfo.cloudGid.clear();
370             updateLogInfo.sharingResource.clear();
371             break;
372         }
373         default:
374             updateCache = false;
375             break;
376     }
377     if (updateCache) {
378         localLogInfoCache[hashKey] = updateLogInfo;
379     }
380 }
381 
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)382 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam &param, size_t dataIndex,
383     const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
384 {
385     OpType opType = CalOpType(param, dataIndex);
386     Key hashKey = dataInfo.localInfo.logInfo.hashKey;
387     if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
388         if (opType == OpType::INSERT) {
389             (void)param.dupHashKeySet.insert(hashKey);
390             opType = OpType::UPDATE;
391             // only composite primary key needs to be processed.
392             if (!param.isSinglePrimaryKey) {
393                 param.withoutRowIdData.updateData.emplace_back(dataIndex,
394                     param.changedData.primaryData[ChangeType::OP_UPDATE].size());
395             }
396         }
397     }
398     // INSERT: for no primary key or composite primary key situation
399     if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
400         param.withoutRowIdData.insertData.push_back(dataIndex);
401         return E_OK;
402     }
403     switch (opType) {
404         // INSERT: only for single primary key situation
405         case OpType::INSERT:
406             param.info.downLoadInfo.insertCount++;
407             return CloudSyncUtils::SaveChangedDataByType(
408                 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
409         case OpType::UPDATE:
410             param.info.downLoadInfo.updateCount++;
411             if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
412                 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
413                     dataInfo.localInfo, ChangeType::OP_UPDATE);
414             }
415             return E_OK;
416         case OpType::DELETE:
417             param.info.downLoadInfo.deleteCount++;
418             return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
419                 dataInfo.localInfo, ChangeType::OP_DELETE);
420         default:
421             return E_OK;
422     }
423 }
424 
ClearWithoutData(ICloudSyncer::SyncParam & param)425 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam &param)
426 {
427     param.withoutRowIdData.insertData.clear();
428     param.withoutRowIdData.updateData.clear();
429     param.withoutRowIdData.assetInsertData.clear();
430 }
431 
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)432 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
433 {
434     if (extend.empty()) {
435         return false;
436     }
437     for (size_t i = 0; i < extend.size(); ++i) {
438         if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
439             return false;
440         }
441     }
442     return true;
443 }
444 
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)445 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
446 {
447     if (data.extend.size() != data.assets.size()) {
448         LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
449         return -E_CLOUD_ERROR;
450     }
451     int errCode = E_OK;
452     for (size_t i = 0; i < data.assets.size(); i++) {
453         if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
454             (errorCode != E_OK &&
455                 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
456             DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
457             if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
458                 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
459             }
460             continue;
461         }
462         for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
463             auto &[col, value] = *it;
464             if (!CheckIfContainsInsertAssets(value)) {
465                 ++it;
466                 continue;
467             }
468             auto extendIt = data.extend[i].find(col);
469             if (extendIt == data.extend[i].end()) {
470                 LOGI("[CloudSyncUtils] Asset field name can not find in extend. key is:%s.", col.c_str());
471                 it = data.assets[i].erase(it);
472                 continue;
473             }
474             if (extendIt->second.index() != value.index()) {
475                 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
476                     extendIt->second.index(), value.index());
477                 errCode = -E_CLOUD_ERROR;
478                 ++it;
479                 continue;
480             }
481             int ret = FillAssetIdToAssetData(extendIt->second, value);
482             if (ret != E_OK) {
483                 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
484                 errCode = -E_CLOUD_ERROR;
485             }
486             ++it;
487         }
488     }
489     return errCode;
490 }
491 
FillAssetIdToAssetData(const Type & extend,Type & assetData)492 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
493 {
494     if (extend.index() == TYPE_INDEX<Asset>) {
495         if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
496             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
497             return -E_CLOUD_ERROR;
498         }
499         if (std::get<Asset>(extend).assetId.empty()) {
500             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
501             return -E_CLOUD_ERROR;
502         }
503         std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
504     }
505     if (extend.index() == TYPE_INDEX<Assets>) {
506         FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
507     }
508     return E_OK;
509 }
510 
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)511 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
512 {
513     for (auto it = assets.begin(); it != assets.end();) {
514         auto &asset = *it;
515         if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
516             ++it;
517             continue;
518         }
519         auto extendAssets = extend;
520         bool isAssetExisted = false;
521         for (const auto &extendAsset : extendAssets) {
522             if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
523                 asset.assetId = extendAsset.assetId;
524                 isAssetExisted = true;
525                 break;
526             }
527         }
528         if (!isAssetExisted) {
529             LOGI("Unable to sync local asset, skip fill assetId.");
530             it = assets.erase(it);
531         } else {
532             ++it;
533         }
534     }
535 }
536 
CheckIfContainsInsertAssets(const Type & assetData)537 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
538 {
539     if (assetData.index() == TYPE_INDEX<Asset>) {
540         if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
541             return false;
542         }
543     } else if (assetData.index() == TYPE_INDEX<Assets>) {
544         bool hasInsertAsset = false;
545         for (const auto &asset : std::get<Assets>(assetData)) {
546             if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
547                 hasInsertAsset = true;
548                 break;
549             }
550         }
551         if (!hasInsertAsset) {
552             return false;
553         }
554     }
555     return true;
556 }
557 
UpdateAssetsFlag(CloudSyncData & uploadData)558 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
559 {
560     AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
561     AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
562     AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
563 }
564 
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)565 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
566     ChangedData &changedData)
567 {
568     // erase old changedData if exist
569     for (auto &changePkValList : changedData.primaryData) {
570         changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
571             [&pkVal](const std::vector<Type> &existPkVal) {
572             return existPkVal == pkVal;
573             }), changePkValList.end());
574     }
575     // insert new changeData
576     changedData.primaryData[type].emplace_back(std::move(pkVal));
577 }
578 
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)579 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam &param, size_t dataIndex)
580 {
581     OpType opType = param.downloadData.opType[dataIndex];
582     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
583         return opType;
584     }
585 
586     std::vector<Type> cloudPkVal;
587     // use dataIndex as dataKey avoid get same pk with no pk schema
588     int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
589         cloudPkVal);
590     if (errCode != E_OK) {
591         LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
592         // use origin opType
593         return opType;
594     }
595     auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
596         return item == cloudPkVal;
597     });
598     if (opType == OpType::INSERT) {
599         // record all insert pk in one batch
600         if (iter == param.insertPk.end()) {
601             param.insertPk.push_back(cloudPkVal);
602         }
603         return OpType::INSERT;
604     }
605     // notify with insert because this data not exist in local before query
606     return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
607 }
608 
InitCompensatedSyncTaskInfo()609 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
610 {
611     CloudSyncer::CloudTaskInfo taskInfo;
612     taskInfo.priorityTask = true;
613     taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
614     taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
615     taskInfo.callback = nullptr;
616     taskInfo.compensatedTask = true;
617     return taskInfo;
618 }
619 
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)620 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
621     const SyncProcessCallback &onProcess)
622 {
623     CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
624     taskInfo.callback = onProcess;
625     taskInfo.devices = option.devices;
626     taskInfo.prepareTraceId = option.prepareTraceId;
627     if (option.users.empty()) {
628         taskInfo.users.push_back("");
629     } else {
630         taskInfo.users = option.users;
631     }
632     taskInfo.lockAction = option.lockAction;
633     return taskInfo;
634 }
635 
InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo & oriTaskInfo)636 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
637 {
638     CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
639     taskInfo.lockAction = oriTaskInfo.lockAction;
640     taskInfo.users = oriTaskInfo.users;
641     taskInfo.devices = oriTaskInfo.devices;
642     taskInfo.storeId = oriTaskInfo.storeId;
643     taskInfo.prepareTraceId = oriTaskInfo.prepareTraceId;
644     return taskInfo;
645 }
646 }