1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud/asset_operation_utils.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "cloud/cloud_sync_utils.h"
22
23 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)24 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
25 std::vector<Type> &cloudPkVals)
26 {
27 if (!cloudPkVals.empty()) {
28 LOGE("[CloudSyncer] Output parameter should be empty");
29 return -E_INVALID_ARGS;
30 }
31 for (const auto &pkColName : pkColNames) {
32 // If data is primary key or is a composite primary key, then use rowID as value
33 // The single primary key table, does not contain rowid.
34 if (pkColName == CloudDbConstant::ROW_ID_FIELD_NAME) {
35 cloudPkVals.emplace_back(dataKey);
36 continue;
37 }
38 Type type;
39 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
40 if (!isExisted) {
41 LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
42 return -E_CLOUD_ERROR;
43 }
44 cloudPkVals.push_back(type);
45 }
46 return E_OK;
47 }
48
OpTypeToChangeType(OpType strategy)49 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
50 {
51 switch (strategy) {
52 case OpType::INSERT:
53 return OP_INSERT;
54 case OpType::DELETE:
55 return OP_DELETE;
56 case OpType::UPDATE:
57 return OP_UPDATE;
58 default:
59 return OP_BUTT;
60 }
61 }
62
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)63 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
64 {
65 return pkColNames.size() == 1 && pkColNames[0] != CloudDbConstant::ROW_ID_FIELD_NAME;
66 }
67
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)68 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
69 {
70 for (auto item = datum.begin(); item != datum.end();) {
71 const auto &key = item->first;
72 if (key != CloudDbConstant::GID_FIELD &&
73 key != CloudDbConstant::CREATE_FIELD &&
74 key != CloudDbConstant::MODIFY_FIELD &&
75 key != CloudDbConstant::DELETE_FIELD &&
76 key != CloudDbConstant::CURSOR_FIELD &&
77 key != CloudDbConstant::VERSION_FIELD &&
78 key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
79 (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
80 item = datum.erase(item);
81 } else {
82 item++;
83 }
84 }
85 }
86
StatusToFlag(AssetStatus status)87 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
88 {
89 auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
90 switch (tmpStatus) {
91 case AssetStatus::INSERT:
92 return AssetOpType::INSERT;
93 case AssetStatus::DELETE:
94 return AssetOpType::DELETE;
95 case AssetStatus::UPDATE:
96 return AssetOpType::UPDATE;
97 case AssetStatus::NORMAL:
98 return AssetOpType::NO_CHANGE;
99 default:
100 LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
101 ", Caller should ensure that current situation won't occur");
102 return AssetOpType::NO_CHANGE;
103 }
104 }
105
StatusToFlagForAsset(Asset & asset)106 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
107 {
108 asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
109 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
110 }
111
StatusToFlagForAssets(Assets & assets)112 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
113 {
114 for (Asset &asset : assets) {
115 StatusToFlagForAsset(asset);
116 }
117 }
118
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)119 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
120 {
121 for (const Field &field : fields) {
122 if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
123 StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
124 } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
125 StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
126 }
127 }
128 }
129
IsChangeDataEmpty(const ChangedData & changedData)130 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
131 {
132 return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
133 changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
134 changedData.primaryData[ChangeType::OP_DELETE].empty();
135 }
136
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)137 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
138 {
139 return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
140 }
141
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)142 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
143 {
144 // If timeStamp, write timestamp, cloudGid are all the same,
145 // We thought that the datum is mostly be the same between cloud and local
146 // However, there are still slightly possibility that it may be created from different device,
147 // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
148 // But we won't notify the datum
149 bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
150 EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
151 localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
152 localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
153 localLogInfo.version == cloudLogInfo.version &&
154 (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0;
155 return !isSame;
156 }
157
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)158 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
159 {
160 if (devices.size() != 1) {
161 LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
162 return -E_INVALID_ARGS;
163 }
164 for (const auto &dev: devices) {
165 if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
166 LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
167 return -E_INVALID_ARGS;
168 }
169 }
170 if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
171 LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
172 return -E_NOT_SUPPORT;
173 }
174 if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
175 LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
176 return -E_INVALID_ARGS;
177 }
178 return E_OK;
179 }
180
GetCloudLogInfo(DistributedDB::VBucket & datum)181 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
182 {
183 LogInfo cloudLogInfo;
184 cloudLogInfo.dataKey = 0;
185 cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
186 cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
187 cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
188 cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
189 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
190 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
191 datum, cloudLogInfo.sharingResource);
192 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
193 datum, cloudLogInfo.version);
194 return cloudLogInfo;
195 }
196
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)197 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
198 const DataInfoWithLog &localInfo, ChangeType type)
199 {
200 int ret = E_OK;
201 std::vector<Type> cloudPkVals;
202 if (type == ChangeType::OP_DELETE) {
203 ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
204 cloudPkVals);
205 } else {
206 ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
207 }
208 if (ret != E_OK) {
209 return ret;
210 }
211 InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
212 return E_OK;
213 }
214
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)215 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
216 int64_t count)
217 {
218 size_t insRecordLen = uploadData.insData.record.size();
219 size_t insExtendLen = uploadData.insData.extend.size();
220 size_t updRecordLen = uploadData.updData.record.size();
221 size_t updExtendLen = uploadData.updData.extend.size();
222 size_t delRecordLen = uploadData.delData.record.size();
223 size_t delExtendLen = uploadData.delData.extend.size();
224
225 bool syncDataValid = (uploadData.tableName == tableName) &&
226 ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
227 (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
228 (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
229 (uploadData.lockData.extend.size() > 0));
230 if (!syncDataValid) {
231 LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
232 " is not the same as table name of sync data.");
233 return -E_INTERNAL_ERROR;
234 }
235 int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
236 static_cast<int64_t>(delRecordLen);
237 if (syncDataCount > count) {
238 LOGE("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. count %d", count);
239 return -E_INTERNAL_ERROR;
240 }
241 return E_OK;
242 }
243
ClearCloudSyncData(CloudSyncData & uploadData)244 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
245 {
246 std::vector<VBucket>().swap(uploadData.insData.record);
247 std::vector<VBucket>().swap(uploadData.insData.extend);
248 std::vector<int64_t>().swap(uploadData.insData.rowid);
249 std::vector<VBucket>().swap(uploadData.updData.record);
250 std::vector<VBucket>().swap(uploadData.updData.extend);
251 std::vector<VBucket>().swap(uploadData.delData.record);
252 std::vector<VBucket>().swap(uploadData.delData.extend);
253 }
254
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)255 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
256 {
257 for (auto &extendData: extend) {
258 if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
259 LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
260 return -E_INTERNAL_ERROR;
261 }
262 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
263 LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
264 return -E_INTERNAL_ERROR;
265 }
266 if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
267 LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
268 return -E_INTERNAL_ERROR;
269 }
270 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
271 LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
272 return -E_INTERNAL_ERROR;
273 }
274 waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
275 int64_t modifyTime =
276 std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
277 int64_t createTime =
278 std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
279 extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
280 extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
281 }
282 return E_OK;
283 }
284
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)285 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
286 {
287 return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
288 uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
289 uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
290 uploadData.lockData.rowid.empty();
291 }
292
ModifyCloudDataTime(DistributedDB::VBucket & data)293 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
294 {
295 // data already check field modify_field and create_field
296 int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
297 int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
298 data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
299 data[CloudDbConstant::CREATE_FIELD] = createTime;
300 }
301
302 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)303 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
304 Timestamp &waterMark)
305 {
306 int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
307 if (ret != E_OK) {
308 LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
309 return ret;
310 }
311 if (!uploadData.insData.extend.empty()) {
312 if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
313 LOGE("[CloudSyncer] Inconsistent size of inserted data.");
314 return -E_INTERNAL_ERROR;
315 }
316 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
317 if (ret != E_OK) {
318 return ret;
319 }
320 }
321
322 if (!uploadData.updData.extend.empty()) {
323 if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
324 LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
325 return -E_INTERNAL_ERROR;
326 }
327 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
328 if (ret != E_OK) {
329 return ret;
330 }
331 }
332
333 if (!uploadData.delData.extend.empty()) {
334 if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
335 LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
336 return -E_INTERNAL_ERROR;
337 }
338 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
339 if (ret != E_OK) {
340 return ret;
341 }
342 }
343 return E_OK;
344 }
345
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)346 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
347 std::map<std::string, LogInfo> &localLogInfoCache)
348 {
349 LogInfo updateLogInfo;
350 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
351 bool updateCache = true;
352 switch (opType) {
353 case OpType::INSERT :
354 case OpType::UPDATE :
355 case OpType::DELETE: {
356 updateLogInfo = cloudInfo;
357 updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
358 updateLogInfo.hashKey = localInfo.hashKey;
359 if (opType == OpType::DELETE) {
360 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
361 } else if (opType == OpType::INSERT) {
362 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
363 }
364 break;
365 }
366 case OpType::CLEAR_GID:
367 case OpType::UPDATE_TIMESTAMP: {
368 updateLogInfo = localInfo;
369 updateLogInfo.cloudGid.clear();
370 updateLogInfo.sharingResource.clear();
371 break;
372 }
373 default:
374 updateCache = false;
375 break;
376 }
377 if (updateCache) {
378 localLogInfoCache[hashKey] = updateLogInfo;
379 }
380 }
381
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)382 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam ¶m, size_t dataIndex,
383 const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
384 {
385 OpType opType = CalOpType(param, dataIndex);
386 Key hashKey = dataInfo.localInfo.logInfo.hashKey;
387 if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
388 if (opType == OpType::INSERT) {
389 (void)param.dupHashKeySet.insert(hashKey);
390 opType = OpType::UPDATE;
391 // only composite primary key needs to be processed.
392 if (!param.isSinglePrimaryKey) {
393 param.withoutRowIdData.updateData.emplace_back(dataIndex,
394 param.changedData.primaryData[ChangeType::OP_UPDATE].size());
395 }
396 }
397 }
398 // INSERT: for no primary key or composite primary key situation
399 if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
400 param.withoutRowIdData.insertData.push_back(dataIndex);
401 return E_OK;
402 }
403 switch (opType) {
404 // INSERT: only for single primary key situation
405 case OpType::INSERT:
406 param.info.downLoadInfo.insertCount++;
407 return CloudSyncUtils::SaveChangedDataByType(
408 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
409 case OpType::UPDATE:
410 param.info.downLoadInfo.updateCount++;
411 if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
412 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
413 dataInfo.localInfo, ChangeType::OP_UPDATE);
414 }
415 return E_OK;
416 case OpType::DELETE:
417 param.info.downLoadInfo.deleteCount++;
418 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
419 dataInfo.localInfo, ChangeType::OP_DELETE);
420 default:
421 return E_OK;
422 }
423 }
424
ClearWithoutData(ICloudSyncer::SyncParam & param)425 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam ¶m)
426 {
427 param.withoutRowIdData.insertData.clear();
428 param.withoutRowIdData.updateData.clear();
429 param.withoutRowIdData.assetInsertData.clear();
430 }
431
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)432 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
433 {
434 if (extend.empty()) {
435 return false;
436 }
437 for (size_t i = 0; i < extend.size(); ++i) {
438 if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
439 return false;
440 }
441 }
442 return true;
443 }
444
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)445 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
446 {
447 if (data.extend.size() != data.assets.size()) {
448 LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
449 return -E_CLOUD_ERROR;
450 }
451 int errCode = E_OK;
452 for (size_t i = 0; i < data.assets.size(); i++) {
453 if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
454 (errorCode != E_OK &&
455 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
456 DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
457 if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
458 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
459 }
460 continue;
461 }
462 for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
463 auto &[col, value] = *it;
464 if (!CheckIfContainsInsertAssets(value)) {
465 ++it;
466 continue;
467 }
468 auto extendIt = data.extend[i].find(col);
469 if (extendIt == data.extend[i].end()) {
470 LOGI("[CloudSyncUtils] Asset field name can not find in extend. key is:%s.", col.c_str());
471 it = data.assets[i].erase(it);
472 continue;
473 }
474 if (extendIt->second.index() != value.index()) {
475 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
476 extendIt->second.index(), value.index());
477 errCode = -E_CLOUD_ERROR;
478 ++it;
479 continue;
480 }
481 int ret = FillAssetIdToAssetData(extendIt->second, value);
482 if (ret != E_OK) {
483 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
484 errCode = -E_CLOUD_ERROR;
485 }
486 ++it;
487 }
488 }
489 return errCode;
490 }
491
FillAssetIdToAssetData(const Type & extend,Type & assetData)492 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
493 {
494 if (extend.index() == TYPE_INDEX<Asset>) {
495 if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
496 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
497 return -E_CLOUD_ERROR;
498 }
499 if (std::get<Asset>(extend).assetId.empty()) {
500 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
501 return -E_CLOUD_ERROR;
502 }
503 std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
504 }
505 if (extend.index() == TYPE_INDEX<Assets>) {
506 FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
507 }
508 return E_OK;
509 }
510
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)511 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
512 {
513 for (auto it = assets.begin(); it != assets.end();) {
514 auto &asset = *it;
515 if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
516 ++it;
517 continue;
518 }
519 auto extendAssets = extend;
520 bool isAssetExisted = false;
521 for (const auto &extendAsset : extendAssets) {
522 if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
523 asset.assetId = extendAsset.assetId;
524 isAssetExisted = true;
525 break;
526 }
527 }
528 if (!isAssetExisted) {
529 LOGI("Unable to sync local asset, skip fill assetId.");
530 it = assets.erase(it);
531 } else {
532 ++it;
533 }
534 }
535 }
536
CheckIfContainsInsertAssets(const Type & assetData)537 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
538 {
539 if (assetData.index() == TYPE_INDEX<Asset>) {
540 if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
541 return false;
542 }
543 } else if (assetData.index() == TYPE_INDEX<Assets>) {
544 bool hasInsertAsset = false;
545 for (const auto &asset : std::get<Assets>(assetData)) {
546 if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
547 hasInsertAsset = true;
548 break;
549 }
550 }
551 if (!hasInsertAsset) {
552 return false;
553 }
554 }
555 return true;
556 }
557
UpdateAssetsFlag(CloudSyncData & uploadData)558 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
559 {
560 AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
561 AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
562 AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
563 }
564
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)565 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
566 ChangedData &changedData)
567 {
568 // erase old changedData if exist
569 for (auto &changePkValList : changedData.primaryData) {
570 changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
571 [&pkVal](const std::vector<Type> &existPkVal) {
572 return existPkVal == pkVal;
573 }), changePkValList.end());
574 }
575 // insert new changeData
576 changedData.primaryData[type].emplace_back(std::move(pkVal));
577 }
578
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)579 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam ¶m, size_t dataIndex)
580 {
581 OpType opType = param.downloadData.opType[dataIndex];
582 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
583 return opType;
584 }
585
586 std::vector<Type> cloudPkVal;
587 // use dataIndex as dataKey avoid get same pk with no pk schema
588 int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
589 cloudPkVal);
590 if (errCode != E_OK) {
591 LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
592 // use origin opType
593 return opType;
594 }
595 auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
596 return item == cloudPkVal;
597 });
598 if (opType == OpType::INSERT) {
599 // record all insert pk in one batch
600 if (iter == param.insertPk.end()) {
601 param.insertPk.push_back(cloudPkVal);
602 }
603 return OpType::INSERT;
604 }
605 // notify with insert because this data not exist in local before query
606 return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
607 }
608
InitCompensatedSyncTaskInfo()609 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
610 {
611 CloudSyncer::CloudTaskInfo taskInfo;
612 taskInfo.priorityTask = true;
613 taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
614 taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
615 taskInfo.callback = nullptr;
616 taskInfo.compensatedTask = true;
617 return taskInfo;
618 }
619
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)620 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
621 const SyncProcessCallback &onProcess)
622 {
623 CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
624 taskInfo.callback = onProcess;
625 taskInfo.devices = option.devices;
626 taskInfo.prepareTraceId = option.prepareTraceId;
627 if (option.users.empty()) {
628 taskInfo.users.push_back("");
629 } else {
630 taskInfo.users = option.users;
631 }
632 taskInfo.lockAction = option.lockAction;
633 return taskInfo;
634 }
635
// Create a compensated sync task that inherits lockAction, users, devices, storeId and prepareTraceId
// from an existing task; everything else comes from the base compensated task.
CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
{
    CloudSyncer::CloudTaskInfo compensatedTask = InitCompensatedSyncTaskInfo();
    compensatedTask.devices = oriTaskInfo.devices;
    compensatedTask.users = oriTaskInfo.users;
    compensatedTask.storeId = oriTaskInfo.storeId;
    compensatedTask.lockAction = oriTaskInfo.lockAction;
    compensatedTask.prepareTraceId = oriTaskInfo.prepareTraceId;
    return compensatedTask;
}
646 }