• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud/cloud_sync_utils.h"
16 
17 #include "cloud/asset_operation_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_storage_utils.h"
20 #include "db_common.h"
21 #include "db_dfx_adapter.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 
25 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)26 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
27     std::vector<Type> &cloudPkVals)
28 {
29     if (!cloudPkVals.empty()) {
30         LOGE("[CloudSyncer] Output parameter should be empty");
31         return -E_INVALID_ARGS;
32     }
33     for (const auto &pkColName : pkColNames) {
34         // If data is primary key or is a composite primary key, then use rowID as value
35         // The single primary key table, does not contain rowid.
36         if (pkColName == DBConstant::ROWID) {
37             cloudPkVals.emplace_back(dataKey);
38             continue;
39         }
40         Type type;
41         bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
42         if (!isExisted) {
43             LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
44             return -E_CLOUD_ERROR;
45         }
46         cloudPkVals.push_back(type);
47     }
48     return E_OK;
49 }
50 
OpTypeToChangeType(OpType strategy)51 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
52 {
53     switch (strategy) {
54         case OpType::INSERT:
55             return OP_INSERT;
56         case OpType::DELETE:
57             return OP_DELETE;
58         case OpType::UPDATE:
59             return OP_UPDATE;
60         default:
61             return OP_BUTT;
62     }
63 }
64 
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)65 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
66 {
67     return pkColNames.size() == 1 && pkColNames[0] != DBConstant::ROWID;
68 }
69 
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)70 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
71 {
72     for (auto item = datum.begin(); item != datum.end();) {
73         const auto &key = item->first;
74         if (key != CloudDbConstant::GID_FIELD &&
75             key != CloudDbConstant::CREATE_FIELD &&
76             key != CloudDbConstant::MODIFY_FIELD &&
77             key != CloudDbConstant::DELETE_FIELD &&
78             key != CloudDbConstant::CURSOR_FIELD &&
79             key != CloudDbConstant::VERSION_FIELD &&
80             key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
81             (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
82                 item = datum.erase(item);
83             } else {
84                 item++;
85             }
86     }
87 }
88 
StatusToFlag(AssetStatus status)89 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
90 {
91     auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
92     switch (tmpStatus) {
93         case AssetStatus::INSERT:
94             return AssetOpType::INSERT;
95         case AssetStatus::DELETE:
96             return AssetOpType::DELETE;
97         case AssetStatus::UPDATE:
98             return AssetOpType::UPDATE;
99         case AssetStatus::NORMAL:
100             return AssetOpType::NO_CHANGE;
101         default:
102             LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
103                 ", Caller should ensure that current situation won't occur");
104             return AssetOpType::NO_CHANGE;
105     }
106 }
107 
StatusToFlagForAsset(Asset & asset)108 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
109 {
110     asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
111     asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
112 }
113 
StatusToFlagForAssets(Assets & assets)114 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
115 {
116     for (Asset &asset : assets) {
117         StatusToFlagForAsset(asset);
118     }
119 }
120 
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)121 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
122 {
123     for (const Field &field : fields) {
124         if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
125             StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
126         } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
127             StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
128         }
129     }
130 }
131 
IsChangeDataEmpty(const ChangedData & changedData)132 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
133 {
134     return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
135            changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
136            changedData.primaryData[ChangeType::OP_DELETE].empty();
137 }
138 
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)139 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
140 {
141     return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
142 }
143 
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)144 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
145 {
146     // If timeStamp, write timestamp, cloudGid are all the same,
147     // We thought that the datum is mostly be the same between cloud and local
148     // However, there are still slightly possibility that it may be created from different device,
149     // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
150     // But we won't notify the datum
151     bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
152         EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
153         localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
154         localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
155         localLogInfo.version == cloudLogInfo.version &&
156         (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0;
157     return !isSame;
158 }
159 
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)160 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
161 {
162     if (devices.size() != 1) {
163         LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
164         return -E_INVALID_ARGS;
165     }
166     for (const auto &dev: devices) {
167         if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
168             LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
169             return -E_INVALID_ARGS;
170         }
171     }
172     if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
173         LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
174         return -E_NOT_SUPPORT;
175     }
176     if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
177         LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
178         return -E_INVALID_ARGS;
179     }
180     return E_OK;
181 }
182 
GetCloudLogInfo(DistributedDB::VBucket & datum)183 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
184 {
185     LogInfo cloudLogInfo;
186     cloudLogInfo.dataKey = 0;
187     cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
188     cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
189     cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
190     cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
191     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
192     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
193         datum, cloudLogInfo.sharingResource);
194     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
195         datum, cloudLogInfo.version);
196     return cloudLogInfo;
197 }
198 
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)199 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
200     const DataInfoWithLog &localInfo, ChangeType type)
201 {
202     int ret = E_OK;
203     std::vector<Type> cloudPkVals;
204     if (type == ChangeType::OP_DELETE) {
205         ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
206             cloudPkVals);
207     } else {
208         ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
209     }
210     if (ret != E_OK) {
211         return ret;
212     }
213     InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
214     return E_OK;
215 }
216 
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)217 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
218     int64_t count)
219 {
220     size_t insRecordLen = uploadData.insData.record.size();
221     size_t insExtendLen = uploadData.insData.extend.size();
222     size_t updRecordLen = uploadData.updData.record.size();
223     size_t updExtendLen = uploadData.updData.extend.size();
224     size_t delRecordLen = uploadData.delData.record.size();
225     size_t delExtendLen = uploadData.delData.extend.size();
226 
227     bool syncDataValid = (uploadData.tableName == tableName) &&
228         ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
229         (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
230         (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
231         (uploadData.lockData.extend.size() > 0));
232     if (!syncDataValid) {
233         LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
234             " is not the same as table name of sync data.");
235         return -E_INTERNAL_ERROR;
236     }
237     int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
238         static_cast<int64_t>(delRecordLen);
239     if (syncDataCount > count) {
240         LOGE("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. count %d", count);
241         return -E_INTERNAL_ERROR;
242     }
243     return E_OK;
244 }
245 
ClearCloudSyncData(CloudSyncData & uploadData)246 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
247 {
248     std::vector<VBucket>().swap(uploadData.insData.record);
249     std::vector<VBucket>().swap(uploadData.insData.extend);
250     std::vector<int64_t>().swap(uploadData.insData.rowid);
251     std::vector<VBucket>().swap(uploadData.updData.record);
252     std::vector<VBucket>().swap(uploadData.updData.extend);
253     std::vector<VBucket>().swap(uploadData.delData.record);
254     std::vector<VBucket>().swap(uploadData.delData.extend);
255 }
256 
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)257 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
258 {
259     for (auto &extendData: extend) {
260         if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
261             LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
262             return -E_INTERNAL_ERROR;
263         }
264         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
265             LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
266             return -E_INTERNAL_ERROR;
267         }
268         if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
269             LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
270             return -E_INTERNAL_ERROR;
271         }
272         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
273             LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
274             return -E_INTERNAL_ERROR;
275         }
276         waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
277         int64_t modifyTime =
278             std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
279         int64_t createTime =
280             std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
281         extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
282         extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
283     }
284     return E_OK;
285 }
286 
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)287 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
288 {
289     return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
290         uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
291         uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
292         uploadData.lockData.rowid.empty();
293 }
294 
ModifyCloudDataTime(DistributedDB::VBucket & data)295 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
296 {
297     // data already check field modify_field and create_field
298     int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
299     int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
300     data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
301     data[CloudDbConstant::CREATE_FIELD] = createTime;
302 }
303 
304 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)305 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
306     Timestamp &waterMark)
307 {
308     int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
309     if (ret != E_OK) {
310         LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
311         return ret;
312     }
313     if (!uploadData.insData.extend.empty()) {
314         if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
315             LOGE("[CloudSyncer] Inconsistent size of inserted data.");
316             return -E_INTERNAL_ERROR;
317         }
318         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
319         if (ret != E_OK) {
320             return ret;
321         }
322     }
323 
324     if (!uploadData.updData.extend.empty()) {
325         if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
326             LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
327             return -E_INTERNAL_ERROR;
328         }
329         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
330         if (ret != E_OK) {
331             return ret;
332         }
333     }
334 
335     if (!uploadData.delData.extend.empty()) {
336         if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
337             LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
338             return -E_INTERNAL_ERROR;
339         }
340         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
341         if (ret != E_OK) {
342             return ret;
343         }
344     }
345     return E_OK;
346 }
347 
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)348 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
349     std::map<std::string, LogInfo> &localLogInfoCache)
350 {
351     LogInfo updateLogInfo;
352     std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
353     bool updateCache = true;
354     switch (opType) {
355         case OpType::INSERT :
356         case OpType::UPDATE :
357         case OpType::DELETE: {
358             updateLogInfo = cloudInfo;
359             updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
360             updateLogInfo.hashKey = localInfo.hashKey;
361             if (opType == OpType::DELETE) {
362                 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
363             } else if (opType == OpType::INSERT) {
364                 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
365             }
366             break;
367         }
368         case OpType::CLEAR_GID:
369         case OpType::UPDATE_TIMESTAMP: {
370             updateLogInfo = localInfo;
371             updateLogInfo.cloudGid.clear();
372             updateLogInfo.sharingResource.clear();
373             break;
374         }
375         default:
376             updateCache = false;
377             break;
378     }
379     if (updateCache) {
380         localLogInfoCache[hashKey] = updateLogInfo;
381     }
382 }
383 
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)384 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam &param, size_t dataIndex,
385     const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
386 {
387     OpType opType = CalOpType(param, dataIndex);
388     Key hashKey = dataInfo.localInfo.logInfo.hashKey;
389     if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
390         if (opType == OpType::INSERT) {
391             (void)param.dupHashKeySet.insert(hashKey);
392             opType = OpType::UPDATE;
393             // only composite primary key needs to be processed.
394             if (!param.isSinglePrimaryKey) {
395                 param.withoutRowIdData.updateData.emplace_back(dataIndex,
396                     param.changedData.primaryData[ChangeType::OP_UPDATE].size());
397             }
398         }
399     }
400     // INSERT: for no primary key or composite primary key situation
401     if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
402         param.info.downLoadInfo.insertCount++;
403         param.withoutRowIdData.insertData.push_back(dataIndex);
404         return E_OK;
405     }
406     switch (opType) {
407         // INSERT: only for single primary key situation
408         case OpType::INSERT:
409             param.info.downLoadInfo.insertCount++;
410             param.info.retryInfo.downloadBatchOpCount++;
411             return CloudSyncUtils::SaveChangedDataByType(
412                 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
413         case OpType::UPDATE:
414             param.info.downLoadInfo.updateCount++;
415             param.info.retryInfo.downloadBatchOpCount++;
416             if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
417                 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
418                     dataInfo.localInfo, ChangeType::OP_UPDATE);
419             }
420             return E_OK;
421         case OpType::DELETE:
422             param.info.downLoadInfo.deleteCount++;
423             param.info.retryInfo.downloadBatchOpCount++;
424             return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
425                 dataInfo.localInfo, ChangeType::OP_DELETE);
426         case OpType::UPDATE_TIMESTAMP:
427             param.info.retryInfo.downloadBatchOpCount++;
428             return E_OK;
429         default:
430             return E_OK;
431     }
432 }
433 
ClearWithoutData(ICloudSyncer::SyncParam & param)434 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam &param)
435 {
436     param.withoutRowIdData.insertData.clear();
437     param.withoutRowIdData.updateData.clear();
438     param.withoutRowIdData.assetInsertData.clear();
439 }
440 
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)441 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
442 {
443     if (extend.empty()) {
444         return false;
445     }
446     for (size_t i = 0; i < extend.size(); ++i) {
447         if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
448             return false;
449         }
450     }
451     return true;
452 }
453 
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)454 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
455 {
456     if (data.extend.size() != data.assets.size()) {
457         LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
458         return -E_CLOUD_ERROR;
459     }
460     int errCode = E_OK;
461     for (size_t i = 0; i < data.assets.size(); i++) {
462         if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
463             (errorCode != E_OK &&
464                 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
465             DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
466             if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
467                 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
468             }
469             continue;
470         }
471         for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
472             auto &[col, value] = *it;
473             if (!CheckIfContainsInsertAssets(value)) {
474                 ++it;
475                 continue;
476             }
477             auto extendIt = data.extend[i].find(col);
478             if (extendIt == data.extend[i].end()) {
479                 LOGI("[CloudSyncUtils] Asset field name can not find in extend. key is:%s.", col.c_str());
480                 it = data.assets[i].erase(it);
481                 continue;
482             }
483             if (extendIt->second.index() != value.index()) {
484                 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
485                     extendIt->second.index(), value.index());
486                 errCode = -E_CLOUD_ERROR;
487                 ++it;
488                 continue;
489             }
490             int ret = FillAssetIdToAssetData(extendIt->second, value);
491             if (ret != E_OK) {
492                 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
493                 errCode = -E_CLOUD_ERROR;
494             }
495             ++it;
496         }
497     }
498     return errCode;
499 }
500 
FillAssetIdToAssetData(const Type & extend,Type & assetData)501 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
502 {
503     if (extend.index() == TYPE_INDEX<Asset>) {
504         if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
505             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
506             return -E_CLOUD_ERROR;
507         }
508         if (std::get<Asset>(extend).assetId.empty()) {
509             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
510             return -E_CLOUD_ERROR;
511         }
512         std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
513     }
514     if (extend.index() == TYPE_INDEX<Assets>) {
515         FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
516     }
517     return E_OK;
518 }
519 
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)520 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
521 {
522     for (auto it = assets.begin(); it != assets.end();) {
523         auto &asset = *it;
524         if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
525             ++it;
526             continue;
527         }
528         auto extendAssets = extend;
529         bool isAssetExisted = false;
530         for (const auto &extendAsset : extendAssets) {
531             if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
532                 asset.assetId = extendAsset.assetId;
533                 isAssetExisted = true;
534                 break;
535             }
536         }
537         if (!isAssetExisted) {
538             LOGI("Unable to sync local asset, skip fill assetId.");
539             it = assets.erase(it);
540         } else {
541             ++it;
542         }
543     }
544 }
545 
CheckIfContainsInsertAssets(const Type & assetData)546 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
547 {
548     if (assetData.index() == TYPE_INDEX<Asset>) {
549         if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
550             return false;
551         }
552     } else if (assetData.index() == TYPE_INDEX<Assets>) {
553         bool hasInsertAsset = false;
554         for (const auto &asset : std::get<Assets>(assetData)) {
555             if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
556                 hasInsertAsset = true;
557                 break;
558             }
559         }
560         if (!hasInsertAsset) {
561             return false;
562         }
563     }
564     return true;
565 }
566 
UpdateAssetsFlag(CloudSyncData & uploadData)567 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
568 {
569     AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
570     AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
571     AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
572 }
573 
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)574 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
575     ChangedData &changedData)
576 {
577     // erase old changedData if exist
578     for (auto &changePkValList : changedData.primaryData) {
579         changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
580             [&pkVal](const std::vector<Type> &existPkVal) {
581             return existPkVal == pkVal;
582             }), changePkValList.end());
583     }
584     // insert new changeData
585     changedData.primaryData[type].emplace_back(std::move(pkVal));
586 }
587 
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)588 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam &param, size_t dataIndex)
589 {
590     OpType opType = param.downloadData.opType[dataIndex];
591     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
592         return opType;
593     }
594 
595     std::vector<Type> cloudPkVal;
596     // use dataIndex as dataKey avoid get same pk with no pk schema
597     int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
598         cloudPkVal);
599     if (errCode != E_OK) {
600         LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
601         // use origin opType
602         return opType;
603     }
604     auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
605         return item == cloudPkVal;
606     });
607     if (opType == OpType::INSERT) {
608         // record all insert pk in one batch
609         if (iter == param.insertPk.end()) {
610             param.insertPk.push_back(cloudPkVal);
611         }
612         return OpType::INSERT;
613     }
614     // notify with insert because this data not exist in local before query
615     return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
616 }
617 
InitCompensatedSyncTaskInfo()618 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
619 {
620     CloudSyncer::CloudTaskInfo taskInfo;
621     taskInfo.priorityTask = true;
622     taskInfo.priorityLevel = CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL;
623     taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
624     taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
625     taskInfo.callback = nullptr;
626     taskInfo.compensatedTask = true;
627     return taskInfo;
628 }
629 
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)630 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
631     const SyncProcessCallback &onProcess)
632 {
633     CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
634     taskInfo.callback = onProcess;
635     taskInfo.devices = option.devices;
636     taskInfo.prepareTraceId = option.prepareTraceId;
637     if (option.users.empty()) {
638         taskInfo.users.push_back("");
639     } else {
640         taskInfo.users = option.users;
641     }
642     taskInfo.lockAction = option.lockAction;
643     return taskInfo;
644 }
645 
InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo & oriTaskInfo)646 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
647 {
648     CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
649     taskInfo.lockAction = oriTaskInfo.lockAction;
650     taskInfo.users = oriTaskInfo.users;
651     taskInfo.devices = oriTaskInfo.devices;
652     taskInfo.storeId = oriTaskInfo.storeId;
653     taskInfo.prepareTraceId = oriTaskInfo.prepareTraceId;
654     return taskInfo;
655 }
656 
CheckQueryCloudData(std::string & traceId,DownloadData & downloadData,std::vector<std::string> & pkColNames)657 void CloudSyncUtils::CheckQueryCloudData(std::string &traceId, DownloadData &downloadData,
658     std::vector<std::string> &pkColNames)
659 {
660     for (auto &data : downloadData.data) {
661         bool isVersionExist = data.count(CloudDbConstant::VERSION_FIELD) != 0;
662         bool isContainAllPk = true;
663         for (auto &pkColName : pkColNames) {
664             if (data.count(pkColName) == 0) {
665                 isContainAllPk = false;
666                 break;
667             }
668         }
669         std::string gid;
670         (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
671         if (!isVersionExist || !isContainAllPk) {
672             LOGE("[CloudSyncer] Invalid data from cloud, no version[%d], lost primary key[%d], gid[%s], traceId[%s]",
673                 static_cast<int>(!isVersionExist), static_cast<int>(!isContainAllPk), gid.c_str(), traceId.c_str());
674         }
675     }
676 }
677 
IsNeedUpdateAsset(const VBucket & data)678 bool CloudSyncUtils::IsNeedUpdateAsset(const VBucket &data)
679 {
680     for (const auto &item : data) {
681         const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
682         if (asset != nullptr) {
683             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
684             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
685                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
686                 return true;
687             }
688             continue;
689         }
690         const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
691         if (assets == nullptr) {
692             continue;
693         }
694         for (const auto &oneAsset : *assets) {
695             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
696             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
697                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
698                 return true;
699             }
700         }
701     }
702     return false;
703 }
704 
GetDownloadListByGid(const std::shared_ptr<StorageProxy> & proxy,const std::vector<std::string> & data,const std::string & table)705 std::tuple<int, DownloadList, ChangedData> CloudSyncUtils::GetDownloadListByGid(
706     const std::shared_ptr<StorageProxy> &proxy, const std::vector<std::string> &data, const std::string &table)
707 {
708     std::tuple<int, DownloadList, ChangedData> res;
709     std::vector<std::string> pkColNames;
710     std::vector<Field> assetFields;
711     auto &[errCode, downloadList, changeData] = res;
712     errCode = proxy->GetPrimaryColNamesWithAssetsFields(table, pkColNames, assetFields);
713     if (errCode != E_OK) {
714         LOGE("[CloudSyncUtils] Get %s pk names by failed %d", DBCommon::StringMiddleMasking(table).c_str(), errCode);
715         return res;
716     }
717     changeData.tableName = table;
718     changeData.type = ChangedDataType::ASSET;
719     changeData.field = pkColNames;
720     for (const auto &gid : data) {
721         VBucket assetInfo;
722         VBucket record;
723         record[CloudDbConstant::GID_FIELD] = gid;
724         DataInfoWithLog dataInfo;
725         errCode = proxy->GetInfoByPrimaryKeyOrGid(table, record, false, dataInfo, assetInfo);
726         if (errCode != E_OK) {
727             LOGE("[CloudSyncUtils] Get download list by gid failed %s %d", gid.c_str(), errCode);
728             break;
729         }
730         Type prefix;
731         std::vector<Type> pkVal;
732         OpType strategy = OpType::UPDATE;
733         errCode = CloudSyncUtils::GetCloudPkVals(dataInfo.primaryKeys, pkColNames, dataInfo.logInfo.dataKey, pkVal);
734         if (errCode != E_OK) {
735             LOGE("[CloudSyncUtils] HandleTagAssets cannot get primary key value list. %d", errCode);
736             break;
737         }
738         if (IsSinglePrimaryKey(pkColNames) && !pkVal.empty()) {
739             prefix = pkVal[0];
740         }
741         auto assetsMap = AssetOperationUtils::FilterNeedDownloadAsset(assetInfo);
742         downloadList.push_back(
743             std::make_tuple(dataInfo.logInfo.cloudGid, prefix, strategy, assetsMap, dataInfo.logInfo.hashKey,
744                 pkVal, dataInfo.logInfo.timestamp));
745     }
746     return res;
747 }
748 
UpdateMaxTimeWithDownloadList(const DownloadList & downloadList,const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)749 void CloudSyncUtils::UpdateMaxTimeWithDownloadList(const DownloadList &downloadList, const std::string &table,
750     std::map<std::string, int64_t> &downloadBeginTime)
751 {
752     auto origin = downloadBeginTime[table];
753     for (const auto &item : downloadList) {
754         auto timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(item);
755         downloadBeginTime[table] = std::max(static_cast<int64_t>(timestamp), downloadBeginTime[table]);
756     }
757     if (downloadBeginTime[table] == origin) {
758         downloadBeginTime[table]++;
759     }
760 }
761 
IsContainDownloading(const DownloadAssetUnit & downloadAssetUnit)762 bool CloudSyncUtils::IsContainDownloading(const DownloadAssetUnit &downloadAssetUnit)
763 {
764     auto &assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadAssetUnit);
765     for (const auto &item : assets) {
766         for (const auto &asset : item.second) {
767             if ((AssetOperationUtils::EraseBitMask(asset.status) & static_cast<uint32_t>(AssetStatus::DOWNLOADING))
768                 != 0) {
769                 return true;
770             }
771         }
772     }
773     return false;
774 }
775 
GetDownloadAssetsOnlyMapFromDownLoadData(size_t idx,ICloudSyncer::SyncParam & param,std::map<std::string,Assets> & downloadAssetsMap)776 int CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(
777     size_t idx, ICloudSyncer::SyncParam &param, std::map<std::string, Assets> &downloadAssetsMap)
778 {
779     std::string gid;
780     int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(
781         CloudDbConstant::GID_FIELD, param.downloadData.data[idx], gid);
782     if (errCode != E_OK) {
783         LOGE("Get gid from bucket fail when get download assets only map from download data, error code %d", errCode);
784         return errCode;
785     }
786 
787     auto assetsMap = param.gidAssetsMap[gid];
788     for (auto &item : param.downloadData.data[idx]) {
789         auto findAssetList = assetsMap.find(item.first);
790         if (findAssetList == assetsMap.end()) {
791             continue;
792         }
793         Asset *asset = std::get_if<Asset>(&item.second);
794         if (asset != nullptr) {
795             auto matchName = std::find_if(findAssetList->second.begin(),
796                 findAssetList->second.end(),
797                 [&asset](const std::string &a) { return a == asset->name; });
798             if (matchName != findAssetList->second.end()) {
799                 Asset tmpAsset = *asset;
800                 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
801                 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
802                 downloadAssetsMap[item.first].push_back(tmpAsset);
803             }
804             continue;
805         }
806         Assets *assets = std::get_if<Assets>(&item.second);
807         if (assets == nullptr) {
808             continue;
809         }
810         for (const auto &assetItem : (*assets)) {
811             auto matchName = std::find_if(findAssetList->second.begin(),
812                 findAssetList->second.end(),
813                 [&assetItem](const std::string &a) { return a == assetItem.name; });
814             if (matchName != findAssetList->second.end()) {
815                 Asset tmpAsset = assetItem;
816                 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
817                 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
818                 downloadAssetsMap[item.first].push_back(tmpAsset);
819             }
820         }
821     }
822     return E_OK;
823 }
824 
NotifyChangeData(const std::string & dev,const std::shared_ptr<StorageProxy> & proxy,ChangedData && changedData)825 int CloudSyncUtils::NotifyChangeData(const std::string &dev, const std::shared_ptr<StorageProxy> &proxy,
826     ChangedData &&changedData)
827 {
828     int ret = proxy->NotifyChangedData(dev, std::move(changedData));
829     if (ret != E_OK) {
830         DBDfxAdapter::ReportBehavior(
831             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
832         LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
833     } else {
834         DBDfxAdapter::ReportBehavior(
835             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::SUCC, ret});
836     }
837     return ret;
838 }
839 
GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,std::shared_ptr<StorageProxy> & storageProxy,std::vector<std::string> & users,std::vector<QuerySyncObject> & syncQuery)840 int CloudSyncUtils::GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,
841     std::shared_ptr<StorageProxy> &storageProxy, std::vector<std::string> &users,
842     std::vector<QuerySyncObject> &syncQuery)
843 {
844     int errCode = storageProxy->GetCompensatedSyncQuery(syncQuery, users, isQueryDownloadRecords);
845     if (errCode != E_OK) {
846         LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
847         return errCode;
848     }
849     if (syncQuery.empty()) {
850         LOGD("[CloudSyncer] Not need generate compensated sync");
851     }
852     return E_OK;
853 }
854 
GetUserListForCompensatedSync(CloudDBProxy & cloudDB,const std::vector<std::string> & users,std::vector<std::string> & userList)855 void CloudSyncUtils::GetUserListForCompensatedSync(
856     CloudDBProxy &cloudDB, const std::vector<std::string> &users, std::vector<std::string> &userList)
857 {
858     auto cloudDBs = cloudDB.GetCloudDB();
859     if (cloudDBs.empty()) {
860         LOGW("[CloudSyncer][GetUserListForCompensatedSync] not set cloud db");
861         return;
862     }
863     for (auto &[user, cloudDb] : cloudDBs) {
864         auto it = std::find(users.begin(), users.end(), user);
865         if (it != users.end()) {
866             userList.push_back(user);
867         }
868     }
869 }
870 
SetAssetsMapByCloudGid(std::vector<std::string> & cloudGid,const AssetsMap & groupAssetsMap,std::map<std::string,AssetsMap> & gidAssetsMap)871 bool CloudSyncUtils::SetAssetsMapByCloudGid(
872     std::vector<std::string> &cloudGid, const AssetsMap &groupAssetsMap, std::map<std::string, AssetsMap> &gidAssetsMap)
873 {
874     bool isFindOneRecord = false;
875     for (auto &iter : cloudGid) {
876         auto gidIter = gidAssetsMap.find(iter);
877         if (gidIter == gidAssetsMap.end()) {
878             continue;
879         }
880         for (const auto &pair : groupAssetsMap) {
881             if (gidIter->second.find(pair.first) == gidIter->second.end()) {
882                 gidIter->second[pair.first] = pair.second;
883             } else {
884                 // merge assets
885                 gidIter->second[pair.first].insert(pair.second.begin(), pair.second.end());
886             }
887         }
888         isFindOneRecord = true;
889     }
890     return isFindOneRecord;
891 }
892 
CheckAssetsOnlyIsEmptyInGroup(const std::map<std::string,AssetsMap> & gidAssetsMap,const AssetsMap & assetsMap)893 bool CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(
894     const std::map<std::string, AssetsMap> &gidAssetsMap, const AssetsMap &assetsMap)
895 {
896     if (gidAssetsMap.empty()) {
897         return true;
898     }
899     for (const auto &item : gidAssetsMap) {
900         const auto &gidAssets = item.second;
901         if (gidAssets.empty()) {
902             return true;
903         }
904         bool isMatch = true;
905         for (const auto &assets : assetsMap) {
906             auto iter = gidAssets.find(assets.first);
907             if (iter == gidAssets.end()) {
908                 isMatch = false;
909                 break;
910             }
911             if (!std::includes(iter->second.begin(), iter->second.end(), assets.second.begin(), assets.second.end())) {
912                 isMatch = false;
913                 break;
914             }
915         }
916         if (isMatch) {
917             // find one match, so group is not empty.
918             return false;
919         }
920     }
921     return true;
922 }
923 
IsAssetOnlyData(VBucket & queryData,AssetsMap & assetsMap,bool isDownloading)924 bool CloudSyncUtils::IsAssetOnlyData(VBucket &queryData, AssetsMap &assetsMap, bool isDownloading)
925 {
926     if (assetsMap.empty()) {
927         return false;
928     }
929     for (auto &item : assetsMap) {
930         auto &assetNameList = item.second;
931         auto findAssetField = queryData.find(item.first);
932         if (findAssetField == queryData.end() || assetNameList.empty()) {
933             // if not find asset field or assetNameList is empty, mean this is not asset only data.
934             return false;
935         }
936 
937         Asset *asset = std::get_if<Asset>(&(findAssetField->second));
938         if (asset != nullptr) {
939             // if is Asset type, assetNameList size must be 1.
940             if (assetNameList.size() != 1u || *(assetNameList.begin()) != asset->name ||
941                 asset->status == AssetStatus::DELETE) {
942                 // if data is delele, also not asset only data.
943                 return false;
944             }
945             if (isDownloading) {
946                 asset->status = static_cast<uint32_t>(AssetStatus::DOWNLOADING);
947             }
948             continue;
949         }
950 
951         Assets *assets = std::get_if<Assets>(&(findAssetField->second));
952         if (assets == nullptr) {
953             return false;
954         }
955         for (auto &assetName : assetNameList) {
956             auto findAsset = std::find_if(
957                 assets->begin(), assets->end(), [&assetName](const Asset &a) { return a.name == assetName; });
958             if (findAsset == assets->end() || (*findAsset).status == AssetStatus::DELETE) {
959                 // if data is delele, also not asset only data.
960                 return false;
961             }
962             if (isDownloading) {
963                 (*findAsset).status = AssetStatus::DOWNLOADING;
964             }
965         }
966     }
967     return true;
968 }
969 }