1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud/cloud_sync_utils.h"
16
17 #include "cloud/asset_operation_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_storage_utils.h"
20 #include "db_common.h"
21 #include "db_dfx_adapter.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24
25 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)26 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
27 std::vector<Type> &cloudPkVals)
28 {
29 if (!cloudPkVals.empty()) {
30 LOGE("[CloudSyncer] Output parameter should be empty");
31 return -E_INVALID_ARGS;
32 }
33 for (const auto &pkColName : pkColNames) {
34 // If data is primary key or is a composite primary key, then use rowID as value
35 // The single primary key table, does not contain rowid.
36 if (pkColName == DBConstant::ROWID) {
37 cloudPkVals.emplace_back(dataKey);
38 continue;
39 }
40 Type type;
41 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
42 if (!isExisted) {
43 LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
44 return -E_CLOUD_ERROR;
45 }
46 cloudPkVals.push_back(type);
47 }
48 return E_OK;
49 }
50
OpTypeToChangeType(OpType strategy)51 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
52 {
53 switch (strategy) {
54 case OpType::INSERT:
55 return OP_INSERT;
56 case OpType::DELETE:
57 return OP_DELETE;
58 case OpType::UPDATE:
59 return OP_UPDATE;
60 default:
61 return OP_BUTT;
62 }
63 }
64
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)65 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
66 {
67 return pkColNames.size() == 1 && pkColNames[0] != DBConstant::ROWID;
68 }
69
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)70 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
71 {
72 for (auto item = datum.begin(); item != datum.end();) {
73 const auto &key = item->first;
74 if (key != CloudDbConstant::GID_FIELD &&
75 key != CloudDbConstant::CREATE_FIELD &&
76 key != CloudDbConstant::MODIFY_FIELD &&
77 key != CloudDbConstant::DELETE_FIELD &&
78 key != CloudDbConstant::CURSOR_FIELD &&
79 key != CloudDbConstant::VERSION_FIELD &&
80 key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
81 (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
82 item = datum.erase(item);
83 } else {
84 item++;
85 }
86 }
87 }
88
StatusToFlag(AssetStatus status)89 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
90 {
91 auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
92 switch (tmpStatus) {
93 case AssetStatus::INSERT:
94 return AssetOpType::INSERT;
95 case AssetStatus::DELETE:
96 return AssetOpType::DELETE;
97 case AssetStatus::UPDATE:
98 return AssetOpType::UPDATE;
99 case AssetStatus::NORMAL:
100 return AssetOpType::NO_CHANGE;
101 default:
102 LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
103 ", Caller should ensure that current situation won't occur");
104 return AssetOpType::NO_CHANGE;
105 }
106 }
107
StatusToFlagForAsset(Asset & asset)108 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
109 {
110 asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
111 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
112 }
113
StatusToFlagForAssets(Assets & assets)114 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
115 {
116 for (Asset &asset : assets) {
117 StatusToFlagForAsset(asset);
118 }
119 }
120
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)121 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
122 {
123 for (const Field &field : fields) {
124 if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
125 StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
126 } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
127 StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
128 }
129 }
130 }
131
IsChangeDataEmpty(const ChangedData & changedData)132 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
133 {
134 return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
135 changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
136 changedData.primaryData[ChangeType::OP_DELETE].empty();
137 }
138
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)139 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
140 {
141 return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
142 }
143
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)144 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
145 {
146 // If timeStamp, write timestamp, cloudGid are all the same,
147 // We thought that the datum is mostly be the same between cloud and local
148 // However, there are still slightly possibility that it may be created from different device,
149 // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
150 // But we won't notify the datum
151 bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
152 EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
153 localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
154 localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
155 localLogInfo.version == cloudLogInfo.version &&
156 (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0 &&
157 !localLogInfo.isNeedUpdateAsset;
158 return !isSame;
159 }
160
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)161 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
162 {
163 if (devices.size() != 1) {
164 LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
165 return -E_INVALID_ARGS;
166 }
167 for (const auto &dev: devices) {
168 if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
169 LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
170 return -E_INVALID_ARGS;
171 }
172 }
173 if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
174 LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
175 return -E_NOT_SUPPORT;
176 }
177 if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
178 LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
179 return -E_INVALID_ARGS;
180 }
181 return E_OK;
182 }
183
GetCloudLogInfo(DistributedDB::VBucket & datum)184 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
185 {
186 LogInfo cloudLogInfo;
187 cloudLogInfo.dataKey = 0;
188 cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
189 cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
190 cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
191 cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
192 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
193 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
194 datum, cloudLogInfo.sharingResource);
195 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
196 datum, cloudLogInfo.version);
197 return cloudLogInfo;
198 }
199
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)200 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
201 const DataInfoWithLog &localInfo, ChangeType type)
202 {
203 int ret = E_OK;
204 std::vector<Type> cloudPkVals;
205 if (type == ChangeType::OP_DELETE) {
206 ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
207 cloudPkVals);
208 } else {
209 ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
210 }
211 if (ret != E_OK) {
212 return ret;
213 }
214 InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
215 return E_OK;
216 }
217
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)218 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
219 int64_t count)
220 {
221 size_t insRecordLen = uploadData.insData.record.size();
222 size_t insExtendLen = uploadData.insData.extend.size();
223 size_t updRecordLen = uploadData.updData.record.size();
224 size_t updExtendLen = uploadData.updData.extend.size();
225 size_t delRecordLen = uploadData.delData.record.size();
226 size_t delExtendLen = uploadData.delData.extend.size();
227
228 bool syncDataValid = (uploadData.tableName == tableName) &&
229 ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
230 (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
231 (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
232 (uploadData.lockData.extend.size() > 0));
233 if (!syncDataValid) {
234 LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
235 " is not the same as table name of sync data.");
236 return -E_INTERNAL_ERROR;
237 }
238 int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
239 static_cast<int64_t>(delRecordLen);
240 if (syncDataCount > count) {
241 LOGW("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. insRecordLen:%zu, "
242 "updRecordLen:%zu, delRecordLen:%zu, count %d", insRecordLen, updRecordLen, delRecordLen, count);
243 }
244 return E_OK;
245 }
246
ClearCloudSyncData(CloudSyncData & uploadData)247 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
248 {
249 std::vector<VBucket>().swap(uploadData.insData.record);
250 std::vector<VBucket>().swap(uploadData.insData.extend);
251 std::vector<int64_t>().swap(uploadData.insData.rowid);
252 std::vector<VBucket>().swap(uploadData.updData.record);
253 std::vector<VBucket>().swap(uploadData.updData.extend);
254 std::vector<VBucket>().swap(uploadData.delData.record);
255 std::vector<VBucket>().swap(uploadData.delData.extend);
256 }
257
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)258 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
259 {
260 for (auto &extendData: extend) {
261 if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
262 LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
263 return -E_INTERNAL_ERROR;
264 }
265 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
266 LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
267 return -E_INTERNAL_ERROR;
268 }
269 if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
270 LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
271 return -E_INTERNAL_ERROR;
272 }
273 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
274 LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
275 return -E_INTERNAL_ERROR;
276 }
277 waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
278 int64_t modifyTime =
279 std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
280 int64_t createTime =
281 std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
282 extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
283 extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
284 }
285 return E_OK;
286 }
287
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)288 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
289 {
290 return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
291 uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
292 uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
293 uploadData.lockData.rowid.empty();
294 }
295
ModifyCloudDataTime(DistributedDB::VBucket & data)296 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
297 {
298 // data already check field modify_field and create_field
299 int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
300 int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
301 data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
302 data[CloudDbConstant::CREATE_FIELD] = createTime;
303 }
304
305 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)306 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
307 Timestamp &waterMark)
308 {
309 int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
310 if (ret != E_OK) {
311 LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
312 return ret;
313 }
314 if (!uploadData.insData.extend.empty()) {
315 if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
316 LOGE("[CloudSyncer] Inconsistent size of inserted data.");
317 return -E_INTERNAL_ERROR;
318 }
319 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
320 if (ret != E_OK) {
321 return ret;
322 }
323 }
324
325 if (!uploadData.updData.extend.empty()) {
326 if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
327 LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
328 return -E_INTERNAL_ERROR;
329 }
330 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
331 if (ret != E_OK) {
332 return ret;
333 }
334 }
335
336 if (!uploadData.delData.extend.empty()) {
337 if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
338 LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
339 return -E_INTERNAL_ERROR;
340 }
341 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
342 if (ret != E_OK) {
343 return ret;
344 }
345 }
346 return E_OK;
347 }
348
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)349 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
350 std::map<std::string, LogInfo> &localLogInfoCache)
351 {
352 LogInfo updateLogInfo;
353 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
354 bool updateCache = true;
355 switch (opType) {
356 case OpType::INSERT :
357 case OpType::UPDATE :
358 case OpType::DELETE: {
359 updateLogInfo = cloudInfo;
360 updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
361 updateLogInfo.hashKey = localInfo.hashKey;
362 if (opType == OpType::DELETE) {
363 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
364 } else if (opType == OpType::INSERT) {
365 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
366 }
367 break;
368 }
369 case OpType::CLEAR_GID:
370 case OpType::UPDATE_TIMESTAMP: {
371 updateLogInfo = localInfo;
372 updateLogInfo.cloudGid.clear();
373 updateLogInfo.sharingResource.clear();
374 break;
375 }
376 default:
377 updateCache = false;
378 break;
379 }
380 if (updateCache) {
381 localLogInfoCache[hashKey] = updateLogInfo;
382 }
383 }
384
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)385 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam ¶m, size_t dataIndex,
386 const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
387 {
388 OpType opType = CalOpType(param, dataIndex);
389 Key hashKey = dataInfo.localInfo.logInfo.hashKey;
390 if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
391 if (opType == OpType::INSERT) {
392 (void)param.dupHashKeySet.insert(hashKey);
393 opType = OpType::UPDATE;
394 // only composite primary key needs to be processed.
395 if (!param.isSinglePrimaryKey) {
396 param.withoutRowIdData.updateData.emplace_back(dataIndex,
397 param.changedData.primaryData[ChangeType::OP_UPDATE].size());
398 }
399 }
400 }
401 // INSERT: for no primary key or composite primary key situation
402 if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
403 param.info.downLoadInfo.insertCount++;
404 param.withoutRowIdData.insertData.push_back(dataIndex);
405 return E_OK;
406 }
407 switch (opType) {
408 // INSERT: only for single primary key situation
409 case OpType::INSERT:
410 param.info.downLoadInfo.insertCount++;
411 param.info.retryInfo.downloadBatchOpCount++;
412 return CloudSyncUtils::SaveChangedDataByType(
413 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
414 case OpType::UPDATE:
415 param.info.downLoadInfo.updateCount++;
416 param.info.retryInfo.downloadBatchOpCount++;
417 if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
418 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
419 dataInfo.localInfo, ChangeType::OP_UPDATE);
420 }
421 return E_OK;
422 case OpType::DELETE:
423 param.info.downLoadInfo.deleteCount++;
424 param.info.retryInfo.downloadBatchOpCount++;
425 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
426 dataInfo.localInfo, ChangeType::OP_DELETE);
427 case OpType::UPDATE_TIMESTAMP:
428 param.info.retryInfo.downloadBatchOpCount++;
429 return E_OK;
430 default:
431 return E_OK;
432 }
433 }
434
ClearWithoutData(ICloudSyncer::SyncParam & param)435 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam ¶m)
436 {
437 param.withoutRowIdData.insertData.clear();
438 param.withoutRowIdData.updateData.clear();
439 param.withoutRowIdData.assetInsertData.clear();
440 }
441
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)442 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
443 {
444 if (extend.empty()) {
445 return false;
446 }
447 for (size_t i = 0; i < extend.size(); ++i) {
448 if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
449 return false;
450 }
451 }
452 return true;
453 }
454
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)455 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
456 {
457 if (data.extend.size() != data.assets.size()) {
458 LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
459 return -E_CLOUD_ERROR;
460 }
461 int errCode = E_OK;
462 for (size_t i = 0; i < data.assets.size(); i++) {
463 if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
464 (errorCode != E_OK &&
465 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
466 DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
467 if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
468 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
469 }
470 continue;
471 }
472 for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
473 auto &[col, value] = *it;
474 if (!CheckIfContainsInsertAssets(value)) {
475 ++it;
476 continue;
477 }
478 auto extendIt = data.extend[i].find(col);
479 if (extendIt == data.extend[i].end()) {
480 LOGI("[CloudSyncUtils] Asset field name can not find in extend.");
481 it = data.assets[i].erase(it);
482 continue;
483 }
484 if (extendIt->second.index() != value.index()) {
485 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
486 extendIt->second.index(), value.index());
487 errCode = -E_CLOUD_ERROR;
488 ++it;
489 continue;
490 }
491 int ret = FillAssetIdToAssetData(extendIt->second, value);
492 if (ret != E_OK) {
493 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
494 errCode = -E_CLOUD_ERROR;
495 }
496 ++it;
497 }
498 }
499 return errCode;
500 }
501
FillAssetIdToAssetData(const Type & extend,Type & assetData)502 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
503 {
504 if (extend.index() == TYPE_INDEX<Asset>) {
505 if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
506 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
507 return -E_CLOUD_ERROR;
508 }
509 if (std::get<Asset>(extend).assetId.empty()) {
510 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
511 return -E_CLOUD_ERROR;
512 }
513 std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
514 }
515 if (extend.index() == TYPE_INDEX<Assets>) {
516 FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
517 }
518 return E_OK;
519 }
520
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)521 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
522 {
523 for (auto it = assets.begin(); it != assets.end();) {
524 auto &asset = *it;
525 if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
526 ++it;
527 continue;
528 }
529 auto extendAssets = extend;
530 bool isAssetExisted = false;
531 for (const auto &extendAsset : extendAssets) {
532 if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
533 asset.assetId = extendAsset.assetId;
534 isAssetExisted = true;
535 break;
536 }
537 }
538 if (!isAssetExisted) {
539 LOGI("Unable to sync local asset, skip fill assetId.");
540 it = assets.erase(it);
541 } else {
542 ++it;
543 }
544 }
545 }
546
CheckIfContainsInsertAssets(const Type & assetData)547 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
548 {
549 if (assetData.index() == TYPE_INDEX<Asset>) {
550 if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
551 return false;
552 }
553 } else if (assetData.index() == TYPE_INDEX<Assets>) {
554 bool hasInsertAsset = false;
555 for (const auto &asset : std::get<Assets>(assetData)) {
556 if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
557 hasInsertAsset = true;
558 break;
559 }
560 }
561 if (!hasInsertAsset) {
562 return false;
563 }
564 }
565 return true;
566 }
567
UpdateAssetsFlag(CloudSyncData & uploadData)568 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
569 {
570 AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
571 AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
572 AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
573 }
574
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)575 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
576 ChangedData &changedData)
577 {
578 // erase old changedData if exist
579 for (auto &changePkValList : changedData.primaryData) {
580 changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
581 [&pkVal](const std::vector<Type> &existPkVal) {
582 return existPkVal == pkVal;
583 }), changePkValList.end());
584 }
585 // insert new changeData
586 changedData.primaryData[type].emplace_back(std::move(pkVal));
587 }
588
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)589 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam ¶m, size_t dataIndex)
590 {
591 OpType opType = param.downloadData.opType[dataIndex];
592 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
593 return opType;
594 }
595
596 std::vector<Type> cloudPkVal;
597 // use dataIndex as dataKey avoid get same pk with no pk schema
598 int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
599 cloudPkVal);
600 if (errCode != E_OK) {
601 LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
602 // use origin opType
603 return opType;
604 }
605 auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
606 return item == cloudPkVal;
607 });
608 if (opType == OpType::INSERT) {
609 // record all insert pk in one batch
610 if (iter == param.insertPk.end()) {
611 param.insertPk.push_back(cloudPkVal);
612 }
613 return OpType::INSERT;
614 }
615 // notify with insert because this data not exist in local before query
616 return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
617 }
618
InitCompensatedSyncTaskInfo()619 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
620 {
621 CloudSyncer::CloudTaskInfo taskInfo;
622 taskInfo.priorityTask = true;
623 taskInfo.priorityLevel = CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL;
624 taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
625 taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
626 taskInfo.callback = nullptr;
627 taskInfo.compensatedTask = true;
628 return taskInfo;
629 }
630
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)631 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
632 const SyncProcessCallback &onProcess)
633 {
634 CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
635 taskInfo.callback = onProcess;
636 taskInfo.devices = option.devices;
637 taskInfo.prepareTraceId = option.prepareTraceId;
638 if (option.users.empty()) {
639 taskInfo.users.push_back("");
640 } else {
641 taskInfo.users = option.users;
642 }
643 taskInfo.lockAction = option.lockAction;
644 return taskInfo;
645 }
646
// Creates a compensated sync task that inherits devices, users, store id, lock action and trace id from
// an existing task. The callback stays unset.
CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
{
    CloudSyncer::CloudTaskInfo compensated = InitCompensatedSyncTaskInfo();
    compensated.devices = oriTaskInfo.devices;
    compensated.users = oriTaskInfo.users;
    compensated.storeId = oriTaskInfo.storeId;
    compensated.lockAction = oriTaskInfo.lockAction;
    compensated.prepareTraceId = oriTaskInfo.prepareTraceId;
    return compensated;
}
657
CheckQueryCloudData(std::string & traceId,DownloadData & downloadData,std::vector<std::string> & pkColNames)658 void CloudSyncUtils::CheckQueryCloudData(std::string &traceId, DownloadData &downloadData,
659 std::vector<std::string> &pkColNames)
660 {
661 for (auto &data : downloadData.data) {
662 bool isVersionExist = data.count(CloudDbConstant::VERSION_FIELD) != 0;
663 bool isContainAllPk = true;
664 for (auto &pkColName : pkColNames) {
665 if (data.count(pkColName) == 0) {
666 isContainAllPk = false;
667 break;
668 }
669 }
670 std::string gid;
671 (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
672 bool isDelete = true;
673 (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::DELETE_FIELD, data, isDelete);
674 if (!isDelete && (!isVersionExist || !isContainAllPk)) {
675 LOGE("[CloudSyncer] Invalid data from cloud, no version[%d], lost primary key[%d], gid[%s], traceId[%s]",
676 static_cast<int>(!isVersionExist), static_cast<int>(!isContainAllPk), gid.c_str(), traceId.c_str());
677 }
678 }
679 }
680
IsNeedUpdateAsset(const VBucket & data)681 bool CloudSyncUtils::IsNeedUpdateAsset(const VBucket &data)
682 {
683 for (const auto &item : data) {
684 const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
685 if (asset != nullptr) {
686 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
687 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
688 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
689 return true;
690 }
691 continue;
692 }
693 const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
694 if (assets == nullptr) {
695 continue;
696 }
697 for (const auto &oneAsset : *assets) {
698 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
699 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
700 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
701 return true;
702 }
703 }
704 }
705 return false;
706 }
707
GetDownloadListByGid(const std::shared_ptr<StorageProxy> & proxy,const std::vector<std::string> & data,const std::string & table)708 std::tuple<int, DownloadList, ChangedData> CloudSyncUtils::GetDownloadListByGid(
709 const std::shared_ptr<StorageProxy> &proxy, const std::vector<std::string> &data, const std::string &table)
710 {
711 std::tuple<int, DownloadList, ChangedData> res;
712 std::vector<std::string> pkColNames;
713 std::vector<Field> assetFields;
714 auto &[errCode, downloadList, changeData] = res;
715 errCode = proxy->GetPrimaryColNamesWithAssetsFields(table, pkColNames, assetFields);
716 if (errCode != E_OK) {
717 LOGE("[CloudSyncUtils] Get %s pk names by failed %d", DBCommon::StringMiddleMasking(table).c_str(), errCode);
718 return res;
719 }
720 changeData.tableName = table;
721 changeData.type = ChangedDataType::ASSET;
722 changeData.field = pkColNames;
723 for (const auto &gid : data) {
724 VBucket assetInfo;
725 VBucket record;
726 record[CloudDbConstant::GID_FIELD] = gid;
727 DataInfoWithLog dataInfo;
728 errCode = proxy->GetInfoByPrimaryKeyOrGid(table, record, false, dataInfo, assetInfo);
729 if (errCode != E_OK) {
730 LOGE("[CloudSyncUtils] Get download list by gid failed %s %d", gid.c_str(), errCode);
731 break;
732 }
733 Type prefix;
734 std::vector<Type> pkVal;
735 OpType strategy;
736 if ((dataInfo.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) ==
737 static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) {
738 strategy = OpType::UPDATE;
739 } else if ((dataInfo.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) ==
740 static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) {
741 strategy = OpType::DELETE;
742 } else {
743 strategy = OpType::INSERT;
744 }
745 errCode = CloudSyncUtils::GetCloudPkVals(dataInfo.primaryKeys, pkColNames, dataInfo.logInfo.dataKey, pkVal);
746 if (errCode != E_OK) {
747 LOGE("[CloudSyncUtils] HandleTagAssets cannot get primary key value list. %d", errCode);
748 break;
749 }
750 if (IsSinglePrimaryKey(pkColNames) && !pkVal.empty()) {
751 prefix = pkVal[0];
752 }
753 auto assetsMap = AssetOperationUtils::FilterNeedDownloadAsset(assetInfo);
754 downloadList.push_back(
755 std::make_tuple(dataInfo.logInfo.cloudGid, prefix, strategy, assetsMap, dataInfo.logInfo.hashKey,
756 pkVal, dataInfo.logInfo.timestamp));
757 }
758 return res;
759 }
760
UpdateMaxTimeWithDownloadList(const DownloadList & downloadList,const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)761 void CloudSyncUtils::UpdateMaxTimeWithDownloadList(const DownloadList &downloadList, const std::string &table,
762 std::map<std::string, int64_t> &downloadBeginTime)
763 {
764 auto origin = downloadBeginTime[table];
765 for (const auto &item : downloadList) {
766 auto timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(item);
767 downloadBeginTime[table] = std::max(static_cast<int64_t>(timestamp), downloadBeginTime[table]);
768 }
769 if (downloadBeginTime[table] == origin) {
770 downloadBeginTime[table]++;
771 }
772 }
773
IsContainDownloading(const DownloadAssetUnit & downloadAssetUnit)774 bool CloudSyncUtils::IsContainDownloading(const DownloadAssetUnit &downloadAssetUnit)
775 {
776 auto &assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadAssetUnit);
777 for (const auto &item : assets) {
778 for (const auto &asset : item.second) {
779 if ((AssetOperationUtils::EraseBitMask(asset.status) & static_cast<uint32_t>(AssetStatus::DOWNLOADING))
780 != 0) {
781 return true;
782 }
783 }
784 }
785 return false;
786 }
787
GetDownloadAssetsOnlyMapFromDownLoadData(size_t idx,ICloudSyncer::SyncParam & param,std::map<std::string,Assets> & downloadAssetsMap)788 int CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(
789 size_t idx, ICloudSyncer::SyncParam ¶m, std::map<std::string, Assets> &downloadAssetsMap)
790 {
791 std::string gid;
792 int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(
793 CloudDbConstant::GID_FIELD, param.downloadData.data[idx], gid);
794 if (errCode != E_OK) {
795 LOGE("Get gid from bucket fail when get download assets only map from download data, error code %d", errCode);
796 return errCode;
797 }
798
799 auto assetsMap = param.gidAssetsMap[gid];
800 for (auto &item : param.downloadData.data[idx]) {
801 auto findAssetList = assetsMap.find(item.first);
802 if (findAssetList == assetsMap.end()) {
803 continue;
804 }
805 Asset *asset = std::get_if<Asset>(&item.second);
806 if (asset != nullptr) {
807 auto matchName = std::find_if(findAssetList->second.begin(),
808 findAssetList->second.end(),
809 [&asset](const std::string &a) { return a == asset->name; });
810 if (matchName != findAssetList->second.end()) {
811 Asset tmpAsset = *asset;
812 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
813 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
814 downloadAssetsMap[item.first].push_back(tmpAsset);
815 }
816 continue;
817 }
818 Assets *assets = std::get_if<Assets>(&item.second);
819 if (assets == nullptr) {
820 continue;
821 }
822 for (const auto &assetItem : (*assets)) {
823 auto matchName = std::find_if(findAssetList->second.begin(),
824 findAssetList->second.end(),
825 [&assetItem](const std::string &a) { return a == assetItem.name; });
826 if (matchName != findAssetList->second.end()) {
827 Asset tmpAsset = assetItem;
828 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
829 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
830 downloadAssetsMap[item.first].push_back(tmpAsset);
831 }
832 }
833 }
834 return E_OK;
835 }
836
NotifyChangeData(const std::string & dev,const std::shared_ptr<StorageProxy> & proxy,ChangedData && changedData)837 int CloudSyncUtils::NotifyChangeData(const std::string &dev, const std::shared_ptr<StorageProxy> &proxy,
838 ChangedData &&changedData)
839 {
840 int ret = proxy->NotifyChangedData(dev, std::move(changedData));
841 if (ret != E_OK) {
842 DBDfxAdapter::ReportBehavior(
843 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
844 LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
845 } else {
846 DBDfxAdapter::ReportBehavior(
847 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::SUCC, ret});
848 }
849 return ret;
850 }
851
GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,std::shared_ptr<StorageProxy> & storageProxy,std::vector<std::string> & users,std::vector<QuerySyncObject> & syncQuery)852 int CloudSyncUtils::GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,
853 std::shared_ptr<StorageProxy> &storageProxy, std::vector<std::string> &users,
854 std::vector<QuerySyncObject> &syncQuery)
855 {
856 int errCode = storageProxy->GetCompensatedSyncQuery(syncQuery, users, isQueryDownloadRecords);
857 if (errCode != E_OK) {
858 LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
859 return errCode;
860 }
861 if (syncQuery.empty()) {
862 LOGD("[CloudSyncer] Not need generate compensated sync");
863 }
864 return E_OK;
865 }
866
GetUserListForCompensatedSync(CloudDBProxy & cloudDB,const std::vector<std::string> & users,std::vector<std::string> & userList)867 void CloudSyncUtils::GetUserListForCompensatedSync(
868 CloudDBProxy &cloudDB, const std::vector<std::string> &users, std::vector<std::string> &userList)
869 {
870 auto cloudDBs = cloudDB.GetCloudDB();
871 if (cloudDBs.empty()) {
872 LOGW("[CloudSyncer][GetUserListForCompensatedSync] not set cloud db");
873 return;
874 }
875 for (auto &[user, cloudDb] : cloudDBs) {
876 auto it = std::find(users.begin(), users.end(), user);
877 if (it != users.end()) {
878 userList.push_back(user);
879 }
880 }
881 }
882
SetAssetsMapByCloudGid(std::vector<std::string> & cloudGid,const AssetsMap & groupAssetsMap,std::map<std::string,AssetsMap> & gidAssetsMap)883 bool CloudSyncUtils::SetAssetsMapByCloudGid(
884 std::vector<std::string> &cloudGid, const AssetsMap &groupAssetsMap, std::map<std::string, AssetsMap> &gidAssetsMap)
885 {
886 bool isFindOneRecord = false;
887 for (auto &iter : cloudGid) {
888 auto gidIter = gidAssetsMap.find(iter);
889 if (gidIter == gidAssetsMap.end()) {
890 continue;
891 }
892 for (const auto &pair : groupAssetsMap) {
893 if (gidIter->second.find(pair.first) == gidIter->second.end()) {
894 gidIter->second[pair.first] = pair.second;
895 } else {
896 // merge assets
897 gidIter->second[pair.first].insert(pair.second.begin(), pair.second.end());
898 }
899 }
900 isFindOneRecord = true;
901 }
902 return isFindOneRecord;
903 }
904
CheckAssetsOnlyIsEmptyInGroup(const std::map<std::string,AssetsMap> & gidAssetsMap,const AssetsMap & assetsMap)905 bool CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(
906 const std::map<std::string, AssetsMap> &gidAssetsMap, const AssetsMap &assetsMap)
907 {
908 if (gidAssetsMap.empty()) {
909 return true;
910 }
911 for (const auto &item : gidAssetsMap) {
912 const auto &gidAssets = item.second;
913 if (gidAssets.empty()) {
914 return true;
915 }
916 bool isMatch = true;
917 for (const auto &assets : assetsMap) {
918 auto iter = gidAssets.find(assets.first);
919 if (iter == gidAssets.end()) {
920 isMatch = false;
921 break;
922 }
923 if (!std::includes(iter->second.begin(), iter->second.end(), assets.second.begin(), assets.second.end())) {
924 isMatch = false;
925 break;
926 }
927 }
928 if (isMatch) {
929 // find one match, so group is not empty.
930 return false;
931 }
932 }
933 return true;
934 }
935
IsAssetOnlyData(VBucket & queryData,AssetsMap & assetsMap,bool isDownloading)936 bool CloudSyncUtils::IsAssetOnlyData(VBucket &queryData, AssetsMap &assetsMap, bool isDownloading)
937 {
938 if (assetsMap.empty()) {
939 return false;
940 }
941 for (auto &item : assetsMap) {
942 auto &assetNameList = item.second;
943 auto findAssetField = queryData.find(item.first);
944 if (findAssetField == queryData.end() || assetNameList.empty()) {
945 // if not find asset field or assetNameList is empty, mean this is not asset only data.
946 return false;
947 }
948
949 Asset *asset = std::get_if<Asset>(&(findAssetField->second));
950 if (asset != nullptr) {
951 // if is Asset type, assetNameList size must be 1.
952 if (assetNameList.size() != 1u || *(assetNameList.begin()) != asset->name ||
953 asset->status == AssetStatus::DELETE) {
954 // if data is delele, also not asset only data.
955 return false;
956 }
957 if (isDownloading) {
958 asset->status = static_cast<uint32_t>(AssetStatus::DOWNLOADING);
959 }
960 continue;
961 }
962
963 Assets *assets = std::get_if<Assets>(&(findAssetField->second));
964 if (assets == nullptr) {
965 return false;
966 }
967 for (auto &assetName : assetNameList) {
968 auto findAsset = std::find_if(
969 assets->begin(), assets->end(), [&assetName](const Asset &a) { return a.name == assetName; });
970 if (findAsset == assets->end() || (*findAsset).status == AssetStatus::DELETE) {
971 // if data is delele, also not asset only data.
972 return false;
973 }
974 if (isDownloading) {
975 (*findAsset).status = AssetStatus::DOWNLOADING;
976 }
977 }
978 }
979 return true;
980 }
981
ClearCloudWatermark(const std::vector<std::string> & tableNameList,std::shared_ptr<StorageProxy> & storageProxy)982 int CloudSyncUtils::ClearCloudWatermark(const std::vector<std::string> &tableNameList,
983 std::shared_ptr<StorageProxy> &storageProxy)
984 {
985 for (const auto &tableName: tableNameList) {
986 LOGD("[CloudSyncUtils] Start clear cloud watermark.");
987 int ret = storageProxy->CleanWaterMark(tableName);
988 if (ret != E_OK) {
989 std::string maskedName = DBCommon::StringMiddleMasking(tableName);
990 LOGE("[CloudSyncUtils] failed to clear watermark. err: %d. table: %s, name length: %zu",
991 ret, maskedName.c_str(), maskedName.length());
992 return ret;
993 }
994 }
995 int errCode = storageProxy->StartTransaction(TransactType::IMMEDIATE);
996 if (errCode != E_OK) {
997 LOGE("[CloudSyncUtils] failed to start Transaction before clear cloud log version, %d", errCode);
998 return errCode;
999 }
1000
1001 errCode = storageProxy->ClearCloudLogVersion(tableNameList);
1002 if (errCode != E_OK) {
1003 LOGE("[CloudSyncUtils] failed to clear log version, %d.", errCode);
1004 storageProxy->Rollback();
1005 return errCode;
1006 }
1007
1008 return storageProxy->Commit();
1009 }
1010
HaveReferenceOrReferenceByTable(const CloudSyncer::CloudTaskInfo & taskInfo,std::shared_ptr<StorageProxy> & storageProxy)1011 bool CloudSyncUtils::HaveReferenceOrReferenceByTable(
1012 const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy)
1013 {
1014 for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
1015 if (storageProxy->IsTableExistReferenceOrReferenceBy(taskInfo.table[i])) {
1016 return true;
1017 }
1018 }
1019 return false;
1020 }
1021
StartTransactionIfNeed(const CloudSyncer::CloudTaskInfo & taskInfo,std::shared_ptr<StorageProxy> & storageProxy)1022 int CloudSyncUtils::StartTransactionIfNeed(
1023 const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy)
1024 {
1025 bool isStartTransaction = true;
1026 if (taskInfo.table.size() <= 1u || !HaveReferenceOrReferenceByTable(taskInfo, storageProxy)) {
1027 // only one table or no reference table, no need to start transaction.
1028 isStartTransaction = false;
1029 }
1030 return isStartTransaction ? storageProxy->StartTransaction() : E_OK;
1031 }
1032
EndTransactionIfNeed(const int & errCode,const CloudSyncer::CloudTaskInfo & taskInfo,std::shared_ptr<StorageProxy> & storageProxy)1033 void CloudSyncUtils::EndTransactionIfNeed(
1034 const int &errCode, const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy)
1035 {
1036 if (!storageProxy->GetTransactionExeFlag()) {
1037 // no need to end transaction.
1038 return;
1039 }
1040 if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
1041 int commitErrorCode = storageProxy->Commit();
1042 if (commitErrorCode != E_OK) {
1043 LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
1044 }
1045 } else {
1046 int rollBackErrorCode = storageProxy->Rollback();
1047 if (rollBackErrorCode != E_OK) {
1048 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1049 }
1050 }
1051 }
1052
CanStartAsyncDownload(int scheduleCount)1053 bool CloudSyncUtils::CanStartAsyncDownload(int scheduleCount)
1054 {
1055 if (!RuntimeContext::GetInstance()->GetAssetsDownloadManager()->CanStartNewTask()) {
1056 LOGW("[CloudSyncer] Too many download tasks");
1057 return false;
1058 }
1059 return scheduleCount <= 0;
1060 }
1061 }