1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud/cloud_sync_utils.h"
16
17 #include "cloud/asset_operation_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_storage_utils.h"
20 #include "db_common.h"
21 #include "db_dfx_adapter.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24
25 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)26 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
27 std::vector<Type> &cloudPkVals)
28 {
29 if (!cloudPkVals.empty()) {
30 LOGE("[CloudSyncer] Output parameter should be empty");
31 return -E_INVALID_ARGS;
32 }
33 for (const auto &pkColName : pkColNames) {
34 // If data is primary key or is a composite primary key, then use rowID as value
35 // The single primary key table, does not contain rowid.
36 if (pkColName == DBConstant::ROWID) {
37 cloudPkVals.emplace_back(dataKey);
38 continue;
39 }
40 Type type;
41 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
42 if (!isExisted) {
43 LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
44 return -E_CLOUD_ERROR;
45 }
46 cloudPkVals.push_back(type);
47 }
48 return E_OK;
49 }
50
OpTypeToChangeType(OpType strategy)51 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
52 {
53 switch (strategy) {
54 case OpType::INSERT:
55 return OP_INSERT;
56 case OpType::DELETE:
57 return OP_DELETE;
58 case OpType::UPDATE:
59 return OP_UPDATE;
60 default:
61 return OP_BUTT;
62 }
63 }
64
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)65 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
66 {
67 return pkColNames.size() == 1 && pkColNames[0] != DBConstant::ROWID;
68 }
69
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)70 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
71 {
72 for (auto item = datum.begin(); item != datum.end();) {
73 const auto &key = item->first;
74 if (key != CloudDbConstant::GID_FIELD &&
75 key != CloudDbConstant::CREATE_FIELD &&
76 key != CloudDbConstant::MODIFY_FIELD &&
77 key != CloudDbConstant::DELETE_FIELD &&
78 key != CloudDbConstant::CURSOR_FIELD &&
79 key != CloudDbConstant::VERSION_FIELD &&
80 key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
81 (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
82 item = datum.erase(item);
83 } else {
84 item++;
85 }
86 }
87 }
88
StatusToFlag(AssetStatus status)89 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
90 {
91 auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
92 switch (tmpStatus) {
93 case AssetStatus::INSERT:
94 return AssetOpType::INSERT;
95 case AssetStatus::DELETE:
96 return AssetOpType::DELETE;
97 case AssetStatus::UPDATE:
98 return AssetOpType::UPDATE;
99 case AssetStatus::NORMAL:
100 return AssetOpType::NO_CHANGE;
101 default:
102 LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
103 ", Caller should ensure that current situation won't occur");
104 return AssetOpType::NO_CHANGE;
105 }
106 }
107
StatusToFlagForAsset(Asset & asset)108 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
109 {
110 asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
111 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
112 }
113
StatusToFlagForAssets(Assets & assets)114 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
115 {
116 for (Asset &asset : assets) {
117 StatusToFlagForAsset(asset);
118 }
119 }
120
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)121 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
122 {
123 for (const Field &field : fields) {
124 if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
125 StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
126 } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
127 StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
128 }
129 }
130 }
131
IsChangeDataEmpty(const ChangedData & changedData)132 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
133 {
134 return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
135 changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
136 changedData.primaryData[ChangeType::OP_DELETE].empty();
137 }
138
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)139 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
140 {
141 return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
142 }
143
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)144 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
145 {
146 // If timeStamp, write timestamp, cloudGid are all the same,
147 // We thought that the datum is mostly be the same between cloud and local
148 // However, there are still slightly possibility that it may be created from different device,
149 // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
150 // But we won't notify the datum
151 bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
152 EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
153 localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
154 localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
155 localLogInfo.version == cloudLogInfo.version &&
156 (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0;
157 return !isSame;
158 }
159
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)160 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
161 {
162 if (devices.size() != 1) {
163 LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
164 return -E_INVALID_ARGS;
165 }
166 for (const auto &dev: devices) {
167 if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
168 LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
169 return -E_INVALID_ARGS;
170 }
171 }
172 if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
173 LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
174 return -E_NOT_SUPPORT;
175 }
176 if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
177 LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
178 return -E_INVALID_ARGS;
179 }
180 return E_OK;
181 }
182
GetCloudLogInfo(DistributedDB::VBucket & datum)183 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
184 {
185 LogInfo cloudLogInfo;
186 cloudLogInfo.dataKey = 0;
187 cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
188 cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
189 cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
190 cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
191 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
192 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
193 datum, cloudLogInfo.sharingResource);
194 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
195 datum, cloudLogInfo.version);
196 return cloudLogInfo;
197 }
198
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)199 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
200 const DataInfoWithLog &localInfo, ChangeType type)
201 {
202 int ret = E_OK;
203 std::vector<Type> cloudPkVals;
204 if (type == ChangeType::OP_DELETE) {
205 ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
206 cloudPkVals);
207 } else {
208 ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
209 }
210 if (ret != E_OK) {
211 return ret;
212 }
213 InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
214 return E_OK;
215 }
216
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)217 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
218 int64_t count)
219 {
220 size_t insRecordLen = uploadData.insData.record.size();
221 size_t insExtendLen = uploadData.insData.extend.size();
222 size_t updRecordLen = uploadData.updData.record.size();
223 size_t updExtendLen = uploadData.updData.extend.size();
224 size_t delRecordLen = uploadData.delData.record.size();
225 size_t delExtendLen = uploadData.delData.extend.size();
226
227 bool syncDataValid = (uploadData.tableName == tableName) &&
228 ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
229 (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
230 (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
231 (uploadData.lockData.extend.size() > 0));
232 if (!syncDataValid) {
233 LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
234 " is not the same as table name of sync data.");
235 return -E_INTERNAL_ERROR;
236 }
237 int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
238 static_cast<int64_t>(delRecordLen);
239 if (syncDataCount > count) {
240 LOGE("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. count %d", count);
241 return -E_INTERNAL_ERROR;
242 }
243 return E_OK;
244 }
245
ClearCloudSyncData(CloudSyncData & uploadData)246 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
247 {
248 std::vector<VBucket>().swap(uploadData.insData.record);
249 std::vector<VBucket>().swap(uploadData.insData.extend);
250 std::vector<int64_t>().swap(uploadData.insData.rowid);
251 std::vector<VBucket>().swap(uploadData.updData.record);
252 std::vector<VBucket>().swap(uploadData.updData.extend);
253 std::vector<VBucket>().swap(uploadData.delData.record);
254 std::vector<VBucket>().swap(uploadData.delData.extend);
255 }
256
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)257 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
258 {
259 for (auto &extendData: extend) {
260 if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
261 LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
262 return -E_INTERNAL_ERROR;
263 }
264 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
265 LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
266 return -E_INTERNAL_ERROR;
267 }
268 if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
269 LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
270 return -E_INTERNAL_ERROR;
271 }
272 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
273 LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
274 return -E_INTERNAL_ERROR;
275 }
276 waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
277 int64_t modifyTime =
278 std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
279 int64_t createTime =
280 std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
281 extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
282 extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
283 }
284 return E_OK;
285 }
286
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)287 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
288 {
289 return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
290 uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
291 uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
292 uploadData.lockData.rowid.empty();
293 }
294
ModifyCloudDataTime(DistributedDB::VBucket & data)295 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
296 {
297 // data already check field modify_field and create_field
298 int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
299 int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
300 data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
301 data[CloudDbConstant::CREATE_FIELD] = createTime;
302 }
303
304 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)305 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
306 Timestamp &waterMark)
307 {
308 int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
309 if (ret != E_OK) {
310 LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
311 return ret;
312 }
313 if (!uploadData.insData.extend.empty()) {
314 if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
315 LOGE("[CloudSyncer] Inconsistent size of inserted data.");
316 return -E_INTERNAL_ERROR;
317 }
318 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
319 if (ret != E_OK) {
320 return ret;
321 }
322 }
323
324 if (!uploadData.updData.extend.empty()) {
325 if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
326 LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
327 return -E_INTERNAL_ERROR;
328 }
329 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
330 if (ret != E_OK) {
331 return ret;
332 }
333 }
334
335 if (!uploadData.delData.extend.empty()) {
336 if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
337 LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
338 return -E_INTERNAL_ERROR;
339 }
340 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
341 if (ret != E_OK) {
342 return ret;
343 }
344 }
345 return E_OK;
346 }
347
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)348 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
349 std::map<std::string, LogInfo> &localLogInfoCache)
350 {
351 LogInfo updateLogInfo;
352 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
353 bool updateCache = true;
354 switch (opType) {
355 case OpType::INSERT :
356 case OpType::UPDATE :
357 case OpType::DELETE: {
358 updateLogInfo = cloudInfo;
359 updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
360 updateLogInfo.hashKey = localInfo.hashKey;
361 if (opType == OpType::DELETE) {
362 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
363 } else if (opType == OpType::INSERT) {
364 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
365 }
366 break;
367 }
368 case OpType::CLEAR_GID:
369 case OpType::UPDATE_TIMESTAMP: {
370 updateLogInfo = localInfo;
371 updateLogInfo.cloudGid.clear();
372 updateLogInfo.sharingResource.clear();
373 break;
374 }
375 default:
376 updateCache = false;
377 break;
378 }
379 if (updateCache) {
380 localLogInfoCache[hashKey] = updateLogInfo;
381 }
382 }
383
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)384 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam ¶m, size_t dataIndex,
385 const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
386 {
387 OpType opType = CalOpType(param, dataIndex);
388 Key hashKey = dataInfo.localInfo.logInfo.hashKey;
389 if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
390 if (opType == OpType::INSERT) {
391 (void)param.dupHashKeySet.insert(hashKey);
392 opType = OpType::UPDATE;
393 // only composite primary key needs to be processed.
394 if (!param.isSinglePrimaryKey) {
395 param.withoutRowIdData.updateData.emplace_back(dataIndex,
396 param.changedData.primaryData[ChangeType::OP_UPDATE].size());
397 }
398 }
399 }
400 // INSERT: for no primary key or composite primary key situation
401 if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
402 param.info.downLoadInfo.insertCount++;
403 param.withoutRowIdData.insertData.push_back(dataIndex);
404 return E_OK;
405 }
406 switch (opType) {
407 // INSERT: only for single primary key situation
408 case OpType::INSERT:
409 param.info.downLoadInfo.insertCount++;
410 param.info.retryInfo.downloadBatchOpCount++;
411 return CloudSyncUtils::SaveChangedDataByType(
412 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
413 case OpType::UPDATE:
414 param.info.downLoadInfo.updateCount++;
415 param.info.retryInfo.downloadBatchOpCount++;
416 if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
417 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
418 dataInfo.localInfo, ChangeType::OP_UPDATE);
419 }
420 return E_OK;
421 case OpType::DELETE:
422 param.info.downLoadInfo.deleteCount++;
423 param.info.retryInfo.downloadBatchOpCount++;
424 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
425 dataInfo.localInfo, ChangeType::OP_DELETE);
426 case OpType::UPDATE_TIMESTAMP:
427 param.info.retryInfo.downloadBatchOpCount++;
428 return E_OK;
429 default:
430 return E_OK;
431 }
432 }
433
ClearWithoutData(ICloudSyncer::SyncParam & param)434 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam ¶m)
435 {
436 param.withoutRowIdData.insertData.clear();
437 param.withoutRowIdData.updateData.clear();
438 param.withoutRowIdData.assetInsertData.clear();
439 }
440
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)441 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
442 {
443 if (extend.empty()) {
444 return false;
445 }
446 for (size_t i = 0; i < extend.size(); ++i) {
447 if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
448 return false;
449 }
450 }
451 return true;
452 }
453
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)454 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
455 {
456 if (data.extend.size() != data.assets.size()) {
457 LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
458 return -E_CLOUD_ERROR;
459 }
460 int errCode = E_OK;
461 for (size_t i = 0; i < data.assets.size(); i++) {
462 if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
463 (errorCode != E_OK &&
464 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
465 DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
466 if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
467 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
468 }
469 continue;
470 }
471 for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
472 auto &[col, value] = *it;
473 if (!CheckIfContainsInsertAssets(value)) {
474 ++it;
475 continue;
476 }
477 auto extendIt = data.extend[i].find(col);
478 if (extendIt == data.extend[i].end()) {
479 LOGI("[CloudSyncUtils] Asset field name can not find in extend. key is:%s.", col.c_str());
480 it = data.assets[i].erase(it);
481 continue;
482 }
483 if (extendIt->second.index() != value.index()) {
484 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
485 extendIt->second.index(), value.index());
486 errCode = -E_CLOUD_ERROR;
487 ++it;
488 continue;
489 }
490 int ret = FillAssetIdToAssetData(extendIt->second, value);
491 if (ret != E_OK) {
492 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
493 errCode = -E_CLOUD_ERROR;
494 }
495 ++it;
496 }
497 }
498 return errCode;
499 }
500
FillAssetIdToAssetData(const Type & extend,Type & assetData)501 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
502 {
503 if (extend.index() == TYPE_INDEX<Asset>) {
504 if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
505 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
506 return -E_CLOUD_ERROR;
507 }
508 if (std::get<Asset>(extend).assetId.empty()) {
509 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
510 return -E_CLOUD_ERROR;
511 }
512 std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
513 }
514 if (extend.index() == TYPE_INDEX<Assets>) {
515 FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
516 }
517 return E_OK;
518 }
519
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)520 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
521 {
522 for (auto it = assets.begin(); it != assets.end();) {
523 auto &asset = *it;
524 if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
525 ++it;
526 continue;
527 }
528 auto extendAssets = extend;
529 bool isAssetExisted = false;
530 for (const auto &extendAsset : extendAssets) {
531 if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
532 asset.assetId = extendAsset.assetId;
533 isAssetExisted = true;
534 break;
535 }
536 }
537 if (!isAssetExisted) {
538 LOGI("Unable to sync local asset, skip fill assetId.");
539 it = assets.erase(it);
540 } else {
541 ++it;
542 }
543 }
544 }
545
CheckIfContainsInsertAssets(const Type & assetData)546 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
547 {
548 if (assetData.index() == TYPE_INDEX<Asset>) {
549 if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
550 return false;
551 }
552 } else if (assetData.index() == TYPE_INDEX<Assets>) {
553 bool hasInsertAsset = false;
554 for (const auto &asset : std::get<Assets>(assetData)) {
555 if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
556 hasInsertAsset = true;
557 break;
558 }
559 }
560 if (!hasInsertAsset) {
561 return false;
562 }
563 }
564 return true;
565 }
566
UpdateAssetsFlag(CloudSyncData & uploadData)567 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
568 {
569 AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
570 AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
571 AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
572 }
573
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)574 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
575 ChangedData &changedData)
576 {
577 // erase old changedData if exist
578 for (auto &changePkValList : changedData.primaryData) {
579 changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
580 [&pkVal](const std::vector<Type> &existPkVal) {
581 return existPkVal == pkVal;
582 }), changePkValList.end());
583 }
584 // insert new changeData
585 changedData.primaryData[type].emplace_back(std::move(pkVal));
586 }
587
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)588 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam ¶m, size_t dataIndex)
589 {
590 OpType opType = param.downloadData.opType[dataIndex];
591 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
592 return opType;
593 }
594
595 std::vector<Type> cloudPkVal;
596 // use dataIndex as dataKey avoid get same pk with no pk schema
597 int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
598 cloudPkVal);
599 if (errCode != E_OK) {
600 LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
601 // use origin opType
602 return opType;
603 }
604 auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
605 return item == cloudPkVal;
606 });
607 if (opType == OpType::INSERT) {
608 // record all insert pk in one batch
609 if (iter == param.insertPk.end()) {
610 param.insertPk.push_back(cloudPkVal);
611 }
612 return OpType::INSERT;
613 }
614 // notify with insert because this data not exist in local before query
615 return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
616 }
617
InitCompensatedSyncTaskInfo()618 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
619 {
620 CloudSyncer::CloudTaskInfo taskInfo;
621 taskInfo.priorityTask = true;
622 taskInfo.priorityLevel = CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL;
623 taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
624 taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
625 taskInfo.callback = nullptr;
626 taskInfo.compensatedTask = true;
627 return taskInfo;
628 }
629
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)630 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
631 const SyncProcessCallback &onProcess)
632 {
633 CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
634 taskInfo.callback = onProcess;
635 taskInfo.devices = option.devices;
636 taskInfo.prepareTraceId = option.prepareTraceId;
637 if (option.users.empty()) {
638 taskInfo.users.push_back("");
639 } else {
640 taskInfo.users = option.users;
641 }
642 taskInfo.lockAction = option.lockAction;
643 return taskInfo;
644 }
645
// Builds a compensated sync task that inherits store id, users, devices, lock action and trace id from an
// existing task; everything else comes from the base compensated settings.
CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
{
    CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
    taskInfo.storeId = oriTaskInfo.storeId;
    taskInfo.users = oriTaskInfo.users;
    taskInfo.devices = oriTaskInfo.devices;
    taskInfo.lockAction = oriTaskInfo.lockAction;
    taskInfo.prepareTraceId = oriTaskInfo.prepareTraceId;
    return taskInfo;
}
656
CheckQueryCloudData(std::string & traceId,DownloadData & downloadData,std::vector<std::string> & pkColNames)657 void CloudSyncUtils::CheckQueryCloudData(std::string &traceId, DownloadData &downloadData,
658 std::vector<std::string> &pkColNames)
659 {
660 for (auto &data : downloadData.data) {
661 bool isVersionExist = data.count(CloudDbConstant::VERSION_FIELD) != 0;
662 bool isContainAllPk = true;
663 for (auto &pkColName : pkColNames) {
664 if (data.count(pkColName) == 0) {
665 isContainAllPk = false;
666 break;
667 }
668 }
669 std::string gid;
670 (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
671 if (!isVersionExist || !isContainAllPk) {
672 LOGE("[CloudSyncer] Invalid data from cloud, no version[%d], lost primary key[%d], gid[%s], traceId[%s]",
673 static_cast<int>(!isVersionExist), static_cast<int>(!isContainAllPk), gid.c_str(), traceId.c_str());
674 }
675 }
676 }
677
IsNeedUpdateAsset(const VBucket & data)678 bool CloudSyncUtils::IsNeedUpdateAsset(const VBucket &data)
679 {
680 for (const auto &item : data) {
681 const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
682 if (asset != nullptr) {
683 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
684 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
685 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
686 return true;
687 }
688 continue;
689 }
690 const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
691 if (assets == nullptr) {
692 continue;
693 }
694 for (const auto &oneAsset : *assets) {
695 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
696 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
697 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
698 return true;
699 }
700 }
701 }
702 return false;
703 }
704
GetDownloadListByGid(const std::shared_ptr<StorageProxy> & proxy,const std::vector<std::string> & data,const std::string & table)705 std::tuple<int, DownloadList, ChangedData> CloudSyncUtils::GetDownloadListByGid(
706 const std::shared_ptr<StorageProxy> &proxy, const std::vector<std::string> &data, const std::string &table)
707 {
708 std::tuple<int, DownloadList, ChangedData> res;
709 std::vector<std::string> pkColNames;
710 std::vector<Field> assetFields;
711 auto &[errCode, downloadList, changeData] = res;
712 errCode = proxy->GetPrimaryColNamesWithAssetsFields(table, pkColNames, assetFields);
713 if (errCode != E_OK) {
714 LOGE("[CloudSyncUtils] Get %s pk names by failed %d", DBCommon::StringMiddleMasking(table).c_str(), errCode);
715 return res;
716 }
717 changeData.tableName = table;
718 changeData.type = ChangedDataType::ASSET;
719 changeData.field = pkColNames;
720 for (const auto &gid : data) {
721 VBucket assetInfo;
722 VBucket record;
723 record[CloudDbConstant::GID_FIELD] = gid;
724 DataInfoWithLog dataInfo;
725 errCode = proxy->GetInfoByPrimaryKeyOrGid(table, record, false, dataInfo, assetInfo);
726 if (errCode != E_OK) {
727 LOGE("[CloudSyncUtils] Get download list by gid failed %s %d", gid.c_str(), errCode);
728 break;
729 }
730 Type prefix;
731 std::vector<Type> pkVal;
732 OpType strategy = OpType::UPDATE;
733 errCode = CloudSyncUtils::GetCloudPkVals(dataInfo.primaryKeys, pkColNames, dataInfo.logInfo.dataKey, pkVal);
734 if (errCode != E_OK) {
735 LOGE("[CloudSyncUtils] HandleTagAssets cannot get primary key value list. %d", errCode);
736 break;
737 }
738 if (IsSinglePrimaryKey(pkColNames) && !pkVal.empty()) {
739 prefix = pkVal[0];
740 }
741 auto assetsMap = AssetOperationUtils::FilterNeedDownloadAsset(assetInfo);
742 downloadList.push_back(
743 std::make_tuple(dataInfo.logInfo.cloudGid, prefix, strategy, assetsMap, dataInfo.logInfo.hashKey,
744 pkVal, dataInfo.logInfo.timestamp));
745 }
746 return res;
747 }
748
UpdateMaxTimeWithDownloadList(const DownloadList & downloadList,const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)749 void CloudSyncUtils::UpdateMaxTimeWithDownloadList(const DownloadList &downloadList, const std::string &table,
750 std::map<std::string, int64_t> &downloadBeginTime)
751 {
752 auto origin = downloadBeginTime[table];
753 for (const auto &item : downloadList) {
754 auto timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(item);
755 downloadBeginTime[table] = std::max(static_cast<int64_t>(timestamp), downloadBeginTime[table]);
756 }
757 if (downloadBeginTime[table] == origin) {
758 downloadBeginTime[table]++;
759 }
760 }
761
IsContainDownloading(const DownloadAssetUnit & downloadAssetUnit)762 bool CloudSyncUtils::IsContainDownloading(const DownloadAssetUnit &downloadAssetUnit)
763 {
764 auto &assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadAssetUnit);
765 for (const auto &item : assets) {
766 for (const auto &asset : item.second) {
767 if ((AssetOperationUtils::EraseBitMask(asset.status) & static_cast<uint32_t>(AssetStatus::DOWNLOADING))
768 != 0) {
769 return true;
770 }
771 }
772 }
773 return false;
774 }
775
GetDownloadAssetsOnlyMapFromDownLoadData(size_t idx,ICloudSyncer::SyncParam & param,std::map<std::string,Assets> & downloadAssetsMap)776 int CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(
777 size_t idx, ICloudSyncer::SyncParam ¶m, std::map<std::string, Assets> &downloadAssetsMap)
778 {
779 std::string gid;
780 int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(
781 CloudDbConstant::GID_FIELD, param.downloadData.data[idx], gid);
782 if (errCode != E_OK) {
783 LOGE("Get gid from bucket fail when get download assets only map from download data, error code %d", errCode);
784 return errCode;
785 }
786
787 auto assetsMap = param.gidAssetsMap[gid];
788 for (auto &item : param.downloadData.data[idx]) {
789 auto findAssetList = assetsMap.find(item.first);
790 if (findAssetList == assetsMap.end()) {
791 continue;
792 }
793 Asset *asset = std::get_if<Asset>(&item.second);
794 if (asset != nullptr) {
795 auto matchName = std::find_if(findAssetList->second.begin(),
796 findAssetList->second.end(),
797 [&asset](const std::string &a) { return a == asset->name; });
798 if (matchName != findAssetList->second.end()) {
799 Asset tmpAsset = *asset;
800 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
801 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
802 downloadAssetsMap[item.first].push_back(tmpAsset);
803 }
804 continue;
805 }
806 Assets *assets = std::get_if<Assets>(&item.second);
807 if (assets == nullptr) {
808 continue;
809 }
810 for (const auto &assetItem : (*assets)) {
811 auto matchName = std::find_if(findAssetList->second.begin(),
812 findAssetList->second.end(),
813 [&assetItem](const std::string &a) { return a == assetItem.name; });
814 if (matchName != findAssetList->second.end()) {
815 Asset tmpAsset = assetItem;
816 tmpAsset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
817 tmpAsset.flag = static_cast<uint32_t>(AssetOpType::UPDATE);
818 downloadAssetsMap[item.first].push_back(tmpAsset);
819 }
820 }
821 }
822 return E_OK;
823 }
824
NotifyChangeData(const std::string & dev,const std::shared_ptr<StorageProxy> & proxy,ChangedData && changedData)825 int CloudSyncUtils::NotifyChangeData(const std::string &dev, const std::shared_ptr<StorageProxy> &proxy,
826 ChangedData &&changedData)
827 {
828 int ret = proxy->NotifyChangedData(dev, std::move(changedData));
829 if (ret != E_OK) {
830 DBDfxAdapter::ReportBehavior(
831 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
832 LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
833 } else {
834 DBDfxAdapter::ReportBehavior(
835 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::SUCC, ret});
836 }
837 return ret;
838 }
839
GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,std::shared_ptr<StorageProxy> & storageProxy,std::vector<std::string> & users,std::vector<QuerySyncObject> & syncQuery)840 int CloudSyncUtils::GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords,
841 std::shared_ptr<StorageProxy> &storageProxy, std::vector<std::string> &users,
842 std::vector<QuerySyncObject> &syncQuery)
843 {
844 int errCode = storageProxy->GetCompensatedSyncQuery(syncQuery, users, isQueryDownloadRecords);
845 if (errCode != E_OK) {
846 LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
847 return errCode;
848 }
849 if (syncQuery.empty()) {
850 LOGD("[CloudSyncer] Not need generate compensated sync");
851 }
852 return E_OK;
853 }
854
GetUserListForCompensatedSync(CloudDBProxy & cloudDB,const std::vector<std::string> & users,std::vector<std::string> & userList)855 void CloudSyncUtils::GetUserListForCompensatedSync(
856 CloudDBProxy &cloudDB, const std::vector<std::string> &users, std::vector<std::string> &userList)
857 {
858 auto cloudDBs = cloudDB.GetCloudDB();
859 if (cloudDBs.empty()) {
860 LOGW("[CloudSyncer][GetUserListForCompensatedSync] not set cloud db");
861 return;
862 }
863 for (auto &[user, cloudDb] : cloudDBs) {
864 auto it = std::find(users.begin(), users.end(), user);
865 if (it != users.end()) {
866 userList.push_back(user);
867 }
868 }
869 }
870
SetAssetsMapByCloudGid(std::vector<std::string> & cloudGid,const AssetsMap & groupAssetsMap,std::map<std::string,AssetsMap> & gidAssetsMap)871 bool CloudSyncUtils::SetAssetsMapByCloudGid(
872 std::vector<std::string> &cloudGid, const AssetsMap &groupAssetsMap, std::map<std::string, AssetsMap> &gidAssetsMap)
873 {
874 bool isFindOneRecord = false;
875 for (auto &iter : cloudGid) {
876 auto gidIter = gidAssetsMap.find(iter);
877 if (gidIter == gidAssetsMap.end()) {
878 continue;
879 }
880 for (const auto &pair : groupAssetsMap) {
881 if (gidIter->second.find(pair.first) == gidIter->second.end()) {
882 gidIter->second[pair.first] = pair.second;
883 } else {
884 // merge assets
885 gidIter->second[pair.first].insert(pair.second.begin(), pair.second.end());
886 }
887 }
888 isFindOneRecord = true;
889 }
890 return isFindOneRecord;
891 }
892
CheckAssetsOnlyIsEmptyInGroup(const std::map<std::string,AssetsMap> & gidAssetsMap,const AssetsMap & assetsMap)893 bool CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(
894 const std::map<std::string, AssetsMap> &gidAssetsMap, const AssetsMap &assetsMap)
895 {
896 if (gidAssetsMap.empty()) {
897 return true;
898 }
899 for (const auto &item : gidAssetsMap) {
900 const auto &gidAssets = item.second;
901 if (gidAssets.empty()) {
902 return true;
903 }
904 bool isMatch = true;
905 for (const auto &assets : assetsMap) {
906 auto iter = gidAssets.find(assets.first);
907 if (iter == gidAssets.end()) {
908 isMatch = false;
909 break;
910 }
911 if (!std::includes(iter->second.begin(), iter->second.end(), assets.second.begin(), assets.second.end())) {
912 isMatch = false;
913 break;
914 }
915 }
916 if (isMatch) {
917 // find one match, so group is not empty.
918 return false;
919 }
920 }
921 return true;
922 }
923
IsAssetOnlyData(VBucket & queryData,AssetsMap & assetsMap,bool isDownloading)924 bool CloudSyncUtils::IsAssetOnlyData(VBucket &queryData, AssetsMap &assetsMap, bool isDownloading)
925 {
926 if (assetsMap.empty()) {
927 return false;
928 }
929 for (auto &item : assetsMap) {
930 auto &assetNameList = item.second;
931 auto findAssetField = queryData.find(item.first);
932 if (findAssetField == queryData.end() || assetNameList.empty()) {
933 // if not find asset field or assetNameList is empty, mean this is not asset only data.
934 return false;
935 }
936
937 Asset *asset = std::get_if<Asset>(&(findAssetField->second));
938 if (asset != nullptr) {
939 // if is Asset type, assetNameList size must be 1.
940 if (assetNameList.size() != 1u || *(assetNameList.begin()) != asset->name ||
941 asset->status == AssetStatus::DELETE) {
942 // if data is delele, also not asset only data.
943 return false;
944 }
945 if (isDownloading) {
946 asset->status = static_cast<uint32_t>(AssetStatus::DOWNLOADING);
947 }
948 continue;
949 }
950
951 Assets *assets = std::get_if<Assets>(&(findAssetField->second));
952 if (assets == nullptr) {
953 return false;
954 }
955 for (auto &assetName : assetNameList) {
956 auto findAsset = std::find_if(
957 assets->begin(), assets->end(), [&assetName](const Asset &a) { return a.name == assetName; });
958 if (findAsset == assets->end() || (*findAsset).status == AssetStatus::DELETE) {
959 // if data is delele, also not asset only data.
960 return false;
961 }
962 if (isDownloading) {
963 (*findAsset).status = AssetStatus::DOWNLOADING;
964 }
965 }
966 }
967 return true;
968 }
969 }