• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cloud/cloud_storage_utils.h"
17 #include <set>
18 
19 #include "cloud/asset_operation_utils.h"
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_db_types.h"
22 #include "db_common.h"
23 #include "runtime_context.h"
24 
25 namespace DistributedDB {
BindInt64(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)26 int CloudStorageUtils::BindInt64(int index, const VBucket &vBucket, const Field &field,
27     sqlite3_stmt *upsertStmt)
28 {
29     int64_t val = 0;
30     int errCode = GetValueFromVBucket<int64_t>(field.colName, vBucket, val);
31     if (field.nullable && errCode == -E_NOT_FOUND) {
32         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
33     } else {
34         if (errCode != E_OK) {
35             LOGE("get int from vbucket failed, %d", errCode);
36             return -E_CLOUD_ERROR;
37         }
38         errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
39     }
40 
41     if (errCode != E_OK) {
42         LOGE("Bind int to insert statement failed, %d", errCode);
43     }
44     return errCode;
45 }
46 
BindBool(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)47 int CloudStorageUtils::BindBool(int index, const VBucket &vBucket, const Field &field,
48     sqlite3_stmt *upsertStmt)
49 {
50     bool val = false;
51     int errCode = GetValueFromVBucket<bool>(field.colName, vBucket, val);
52     if (field.nullable && errCode == -E_NOT_FOUND) {
53         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
54     } else {
55         if (errCode != E_OK) {
56             LOGE("get bool from vbucket failed, %d", errCode);
57             return -E_CLOUD_ERROR;
58         }
59         errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
60     }
61 
62     if (errCode != E_OK) {
63         LOGE("Bind bool to insert statement failed, %d", errCode);
64     }
65     return errCode;
66 }
67 
BindDouble(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)68 int CloudStorageUtils::BindDouble(int index, const VBucket &vBucket, const Field &field,
69     sqlite3_stmt *upsertStmt)
70 {
71     double val = 0.0;
72     int errCode = GetValueFromVBucket<double>(field.colName, vBucket, val);
73     if (field.nullable && errCode == -E_NOT_FOUND) {
74         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
75     } else {
76         if (errCode != E_OK) {
77             LOGE("get double from vbucket failed, %d", errCode);
78             return -E_CLOUD_ERROR;
79         }
80         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(upsertStmt, index, val));
81     }
82 
83     if (errCode != E_OK) {
84         LOGE("Bind double to insert statement failed, %d", errCode);
85     }
86     return errCode;
87 }
88 
BindText(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)89 int CloudStorageUtils::BindText(int index, const VBucket &vBucket, const Field &field,
90     sqlite3_stmt *upsertStmt)
91 {
92     std::string str;
93     int errCode = GetValueFromVBucket<std::string>(field.colName, vBucket, str);
94     if (field.nullable && errCode == -E_NOT_FOUND) {
95         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
96     } else {
97         if (errCode != E_OK) {
98             LOGE("get string from vbucket failed, %d", errCode);
99             return -E_CLOUD_ERROR;
100         }
101         errCode = SQLiteUtils::BindTextToStatement(upsertStmt, index, str);
102     }
103 
104     if (errCode != E_OK) {
105         LOGE("Bind string to insert statement failed, %d", errCode);
106     }
107     return errCode;
108 }
109 
BindBlob(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)110 int CloudStorageUtils::BindBlob(int index, const VBucket &vBucket, const Field &field,
111     sqlite3_stmt *upsertStmt)
112 {
113     int errCode = E_OK;
114     Bytes val;
115     if (field.type == TYPE_INDEX<Bytes>) {
116         errCode = GetValueFromVBucket<Bytes>(field.colName, vBucket, val);
117         if (!(IsFieldValid(field, errCode))) {
118             goto ERROR;
119         }
120     } else if (field.type == TYPE_INDEX<Asset>) {
121         Asset asset;
122         errCode = GetValueFromVBucket(field.colName, vBucket, asset);
123         if (!(IsFieldValid(field, errCode))) {
124             goto ERROR;
125         }
126         RuntimeContext::GetInstance()->AssetToBlob(asset, val);
127     } else {
128         Assets assets;
129         errCode = GetValueFromVBucket(field.colName, vBucket, assets);
130         if (!(IsFieldValid(field, errCode))) {
131             goto ERROR;
132         }
133         RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
134     }
135 
136     if (errCode == -E_NOT_FOUND) {
137         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
138     } else {
139         errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
140     }
141     if (errCode != E_OK) {
142         LOGE("Bind blob to insert statement failed, %d", errCode);
143     }
144     return errCode;
145 ERROR:
146     LOGE("get blob from vbucket failed, %d", errCode);
147     return -E_CLOUD_ERROR;
148 }
149 
BindAsset(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)150 int CloudStorageUtils::BindAsset(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *upsertStmt)
151 {
152     int errCode;
153     Bytes val;
154     Type entry;
155     bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, entry);
156     if (!isExisted || entry.index() == TYPE_INDEX<Nil>) {
157         if (!field.nullable) {
158             LOGE("field value is not allowed to be null, %d", -E_CLOUD_ERROR);
159             return -E_CLOUD_ERROR;
160         }
161         return SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
162     }
163 
164     Type type = entry;
165     if (field.type == TYPE_INDEX<Asset>) {
166         Asset asset;
167         errCode = GetValueFromOneField(type, asset);
168         if (errCode != E_OK) {
169             LOGE("can not get asset from vBucket when bind, %d", errCode);
170             return errCode;
171         }
172         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
173         errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, val);
174     } else if (field.type == TYPE_INDEX<Assets>) {
175         Assets assets;
176         errCode = GetValueFromOneField(type, assets);
177         if (errCode != E_OK) {
178             LOGE("can not get assets from vBucket when bind, %d", errCode);
179             return errCode;
180         }
181         if (!assets.empty()) {
182             for (auto &asset: assets) {
183                 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
184             }
185             errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
186         }
187     } else {
188         LOGE("field type is not asset or assets, %d", -E_CLOUD_ERROR);
189         return -E_CLOUD_ERROR;
190     }
191     if (errCode != E_OK) {
192         LOGE("assets or asset to blob fail, %d", -E_CLOUD_ERROR);
193         return -E_CLOUD_ERROR;
194     }
195     if (val.empty()) {
196         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
197     } else {
198         errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
199     }
200     return errCode;
201 }
202 
GetCloudPrimaryKey(const TableSchema & tableSchema)203 std::set<std::string> CloudStorageUtils::GetCloudPrimaryKey(const TableSchema &tableSchema)
204 {
205     std::set<std::string> pkSet;
206     for (const auto &field : tableSchema.fields) {
207         if (field.primary) {
208             pkSet.insert(field.colName);
209         }
210     }
211     return pkSet;
212 }
213 
GetCloudAsset(const TableSchema & tableSchema)214 std::vector<Field> CloudStorageUtils::GetCloudAsset(const TableSchema &tableSchema)
215 {
216     std::vector<Field> assetFields;
217     for (const auto &item: tableSchema.fields) {
218         if (item.type != TYPE_INDEX<Asset> && item.type != TYPE_INDEX<Assets>) {
219             continue;
220         }
221         assetFields.push_back(item);
222     }
223     return assetFields;
224 }
225 
GetCloudPrimaryKeyField(const TableSchema & tableSchema,bool sortByName)226 std::vector<Field> CloudStorageUtils::GetCloudPrimaryKeyField(const TableSchema &tableSchema, bool sortByName)
227 {
228     std::vector<Field> pkVec;
229     for (const auto &field : tableSchema.fields) {
230         if (field.primary) {
231             pkVec.push_back(field);
232         }
233     }
234     if (sortByName) {
235         std::sort(pkVec.begin(), pkVec.end(), [](const Field &a, const Field &b) {
236            return a.colName < b.colName;
237         });
238     }
239     return pkVec;
240 }
241 
GetCloudPrimaryKeyFieldMap(const TableSchema & tableSchema,bool sortByUpper)242 std::map<std::string, Field> CloudStorageUtils::GetCloudPrimaryKeyFieldMap(const TableSchema &tableSchema,
243     bool sortByUpper)
244 {
245     std::map<std::string, Field> pkMap;
246     for (const auto &field : tableSchema.fields) {
247         if (field.primary) {
248             if (sortByUpper) {
249                 pkMap[DBCommon::ToUpperCase(field.colName)] = field;
250             } else {
251                 pkMap[field.colName] = field;
252             }
253         }
254     }
255     return pkMap;
256 }
257 
GetAssetFieldsFromSchema(const TableSchema & tableSchema,const VBucket & vBucket,std::vector<Field> & fields)258 int CloudStorageUtils::GetAssetFieldsFromSchema(const TableSchema &tableSchema, const VBucket &vBucket,
259     std::vector<Field> &fields)
260 {
261     for (const auto &field: tableSchema.fields) {
262         Type type;
263         bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
264         if (!isExisted) {
265             continue;
266         }
267         if (type.index() != TYPE_INDEX<Asset> && type.index() != TYPE_INDEX<Assets>) {
268             continue;
269         }
270         fields.push_back(field);
271     }
272     if (fields.empty()) {
273         return -E_CLOUD_ERROR;
274     }
275     return E_OK;
276 }
277 
IsContainsPrimaryKey(const TableSchema & tableSchema)278 bool CloudStorageUtils::IsContainsPrimaryKey(const TableSchema &tableSchema)
279 {
280     for (const auto &field : tableSchema.fields) {
281         if (field.primary) {
282             return true;
283         }
284     }
285     return false;
286 }
287 
ObtainAssetFromVBucket(const VBucket & vBucket,VBucket & asset)288 void CloudStorageUtils::ObtainAssetFromVBucket(const VBucket &vBucket, VBucket &asset)
289 {
290     for (const auto &item: vBucket) {
291         if (IsAsset(item.second)) {
292             Asset data = std::get<Asset>(item.second);
293             asset.insert_or_assign(item.first, data);
294         } else if (IsAssets(item.second)) {
295             Assets data = std::get<Assets>(item.second);
296             asset.insert_or_assign(item.first, data);
297         }
298     }
299 }
300 
StatusToFlag(AssetStatus status)301 AssetOpType CloudStorageUtils::StatusToFlag(AssetStatus status)
302 {
303     switch (AssetOperationUtils::EraseBitMask(status)) {
304         case AssetStatus::INSERT:
305             return AssetOpType::INSERT;
306         case AssetStatus::DELETE:
307             return AssetOpType::DELETE;
308         case AssetStatus::UPDATE:
309             return AssetOpType::UPDATE;
310         default:
311             return AssetOpType::NO_CHANGE;
312     }
313 }
314 
FlagToStatus(AssetOpType opType)315 AssetStatus CloudStorageUtils::FlagToStatus(AssetOpType opType)
316 {
317     switch (opType) {
318         case AssetOpType::INSERT:
319             return AssetStatus::INSERT;
320         case AssetOpType::DELETE:
321             return AssetStatus::DELETE;
322         case AssetOpType::UPDATE:
323             return AssetStatus::UPDATE;
324         default:
325             return AssetStatus::NORMAL;
326     }
327 }
328 
ChangeAssetsOnVBucketToAsset(VBucket & vBucket,std::vector<Field> & fields)329 void CloudStorageUtils::ChangeAssetsOnVBucketToAsset(VBucket &vBucket, std::vector<Field> &fields)
330 {
331     for (const Field &field: fields) {
332         if (field.type == TYPE_INDEX<Asset>) {
333             Type asset = GetAssetFromAssets(vBucket[field.colName]);
334             vBucket[field.colName] = asset;
335         }
336     }
337 }
338 
GetAssetFromAssets(Type & value)339 Type CloudStorageUtils::GetAssetFromAssets(Type &value)
340 {
341     Asset assetVal;
342     int errCode = GetValueFromType(value, assetVal);
343     if (errCode == E_OK) {
344         return assetVal;
345     }
346 
347     Assets assets;
348     errCode = GetValueFromType(value, assets);
349     if (errCode != E_OK) {
350         return Nil();
351     }
352 
353     for (Asset &asset: assets) {
354         uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
355         if ((asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) && (lowStatus == AssetStatus::ABNORMAL ||
356             lowStatus == AssetStatus::NORMAL)) || asset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
357             return std::move(asset);
358         }
359     }
360     return Nil();
361 }
362 
FillAssetBeforeDownload(Asset & asset)363 int CloudStorageUtils::FillAssetBeforeDownload(Asset &asset)
364 {
365     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
366     uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
367     switch (flag) {
368         case AssetOpType::DELETE: {
369             // these asset no need to download, just remove before download
370             if (lowStatus == static_cast<uint32_t>(AssetStatus::DELETE) ||
371                 lowStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
372                 (asset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL))) {
373                 return -E_NOT_FOUND;
374             }
375             break;
376         }
377         default:
378             break;
379     }
380     return E_OK;
381 }
382 
FillAssetAfterDownload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)383 int CloudStorageUtils::FillAssetAfterDownload(Asset &asset, Asset &dbAsset,
384     AssetOperationUtils::AssetOpType assetOpType)
385 {
386     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
387         return E_OK;
388     }
389     dbAsset = asset;
390     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
391     if (asset.status != AssetStatus::NORMAL) {
392         return E_OK;
393     }
394     switch (flag) {
395         case AssetOpType::DELETE: {
396             return -E_NOT_FOUND;
397         }
398         default:
399             break;
400     }
401     return E_OK;
402 }
403 
FillAssetsAfterDownload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)404 void CloudStorageUtils::FillAssetsAfterDownload(Assets &assets, Assets &dbAssets,
405     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
406 {
407     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownload);
408 }
409 
FillAssetForUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)410 int CloudStorageUtils::FillAssetForUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
411 {
412     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
413         // db assetId may be empty, need to be based on cache
414         // Notes: Assign happened when dbAsset.assetId is empty because of asset.assetId may be empty.
415         if (dbAsset.assetId.empty()) {
416             dbAsset.assetId = asset.assetId;
417         }
418         return E_OK;
419     }
420     AssetStatus status = static_cast<AssetStatus>(dbAsset.status);
421     dbAsset = asset;
422     switch (StatusToFlag(status)) {
423         case AssetOpType::INSERT:
424         case AssetOpType::UPDATE:
425         case AssetOpType::NO_CHANGE: {
426             dbAsset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
427             break;
428         }
429         case AssetOpType::DELETE: {
430             return -E_NOT_FOUND;
431         }
432         default: {
433             break;
434         }
435     }
436     dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
437     return E_OK;
438 }
439 
FillAssetsForUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)440 void CloudStorageUtils::FillAssetsForUpload(Assets &assets, Assets &dbAssets,
441     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
442 {
443     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUpload);
444 }
445 
FillAssetBeforeUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)446 int CloudStorageUtils::FillAssetBeforeUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
447 {
448     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
449         return E_OK;
450     }
451     dbAsset = asset;
452     switch (static_cast<AssetOpType>(asset.flag)) {
453         case AssetOpType::INSERT:
454         case AssetOpType::UPDATE:
455         case AssetOpType::DELETE:
456         case AssetOpType::NO_CHANGE:
457             dbAsset.status |= static_cast<uint32_t>(AssetStatus::UPLOADING);
458             break;
459         default:
460             break;
461     }
462     dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
463     return E_OK;
464 }
465 
FillAssetsBeforeUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)466 void CloudStorageUtils::FillAssetsBeforeUpload(Assets &assets, Assets &dbAssets, const std::map<std::string,
467     AssetOperationUtils::AssetOpType> &assetOpTypeMap)
468 {
469     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetBeforeUpload);
470 }
471 
PrepareToFillAssetFromVBucket(VBucket & vBucket,std::function<int (Asset &)> fillAsset)472 void CloudStorageUtils::PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function<int(Asset &)> fillAsset)
473 {
474     for (auto &item: vBucket) {
475         if (IsAsset(item.second)) {
476             Asset asset;
477             GetValueFromType(item.second, asset);
478             fillAsset(asset);
479             vBucket[item.first] = asset;
480         } else if (IsAssets(item.second)) {
481             Assets assets;
482             GetValueFromType(item.second, assets);
483             for (auto it = assets.begin(); it != assets.end();) {
484                 fillAsset(*it) == -E_NOT_FOUND ? it = assets.erase(it) : ++it;
485             }
486             vBucket[item.first] = assets;
487         }
488     }
489 }
490 
FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType & assetOpType,VBucket & vBucket,VBucket & dbAssets,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset,std::function<void (Assets &,Assets &,const std::map<std::string,AssetOperationUtils::AssetOpType> &)> fillAssets)491 void CloudStorageUtils::FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType &assetOpType,
492     VBucket &vBucket, VBucket &dbAssets,
493     std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset,
494     std::function<void(Assets &, Assets &,
495     const std::map<std::string, AssetOperationUtils::AssetOpType> &)> fillAssets)
496 {
497     for (auto &item: dbAssets) {
498         if (IsAsset(item.second)) {
499             Asset cacheItem;
500             GetValueFromType(vBucket[item.first], cacheItem);
501             Asset dbItem;
502             GetValueFromType(item.second, dbItem);
503             AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
504             auto iterCol = assetOpType.find(item.first);
505             if (iterCol != assetOpType.end() && iterCol->second.find(dbItem.name) != iterCol->second.end()) {
506                 opType = iterCol->second.at(dbItem.name);
507             }
508             int errCode = fillAsset(cacheItem, dbItem, opType);
509             if (errCode != E_OK) {
510                 dbAssets[item.first] = Nil();
511             } else {
512                 dbAssets[item.first] = dbItem;
513             }
514             continue;
515         }
516         if (IsAssets(item.second)) {
517             Assets cacheItems;
518             GetValueFromType(vBucket[item.first], cacheItems);
519             Assets dbItems;
520             GetValueFromType(item.second, dbItems);
521             auto iterCol = assetOpType.find(item.first);
522             if (iterCol == assetOpType.end()) {
523                 fillAssets(cacheItems, dbItems, {});
524             } else {
525                 fillAssets(cacheItems, dbItems, iterCol->second);
526             }
527             if (dbItems.empty()) {
528                 dbAssets[item.first] = Nil();
529             } else {
530                 dbAssets[item.first] = dbItems;
531             }
532         }
533     }
534 }
535 
IsAsset(const Type & type)536 bool CloudStorageUtils::IsAsset(const Type &type)
537 {
538     return type.index() == TYPE_INDEX<Asset>;
539 }
540 
IsAssets(const Type & type)541 bool CloudStorageUtils::IsAssets(const Type &type)
542 {
543     return type.index() == TYPE_INDEX<Assets>;
544 }
545 
IsAssetsContainDuplicateAsset(Assets & assets)546 bool CloudStorageUtils::IsAssetsContainDuplicateAsset(Assets &assets)
547 {
548     std::set<std::string> set;
549     for (const auto &asset : assets) {
550         if (set.find(asset.name) != set.end()) {
551             LOGE("assets contain duplicate Asset");
552             return true;
553         }
554         set.insert(asset.name);
555     }
556     return false;
557 }
558 
EraseNoChangeAsset(std::map<std::string,Assets> & assetsMap)559 void CloudStorageUtils::EraseNoChangeAsset(std::map<std::string, Assets> &assetsMap)
560 {
561     for (auto items = assetsMap.begin(); items != assetsMap.end();) {
562         for (auto item = items->second.begin(); item != items->second.end();) {
563             if (static_cast<AssetOpType>((*item).flag) == AssetOpType::NO_CHANGE) {
564                 item = items->second.erase(item);
565             } else {
566                 item++;
567             }
568         }
569         if (items->second.empty()) {
570             items = assetsMap.erase(items);
571         } else {
572             items++;
573         }
574     }
575 }
576 
MergeDownloadAsset(std::map<std::string,Assets> & downloadAssets,std::map<std::string,Assets> & mergeAssets)577 void CloudStorageUtils::MergeDownloadAsset(std::map<std::string, Assets> &downloadAssets,
578     std::map<std::string, Assets> &mergeAssets)
579 {
580     for (auto &items: mergeAssets) {
581         auto downloadItem = downloadAssets.find(items.first);
582         if (downloadItem == downloadAssets.end()) { // LCOV_EXCL_BR_LINE
583             continue;
584         }
585         std::map<std::string, size_t> beCoveredAssetsMap = GenAssetsIndexMap(items.second);
586         for (const Asset &asset: downloadItem->second) {
587             auto it = beCoveredAssetsMap.find(asset.name);
588             if (it == beCoveredAssetsMap.end()) { // LCOV_EXCL_BR_LINE
589                 continue;
590             }
591             items.second[it->second] = asset;
592         }
593     }
594 }
595 
GenAssetsIndexMap(Assets & assets)596 std::map<std::string, size_t> CloudStorageUtils::GenAssetsIndexMap(Assets &assets)
597 {
598     // key of assetsIndexMap is name of asset, the value of it is index.
599     std::map<std::string, size_t> assetsIndexMap;
600     for (size_t i = 0; i < assets.size(); i++) {
601         assetsIndexMap[assets[i].name] = i;
602     }
603     return assetsIndexMap;
604 }
605 
IsVbucketContainsAllPK(const VBucket & vBucket,const std::set<std::string> & pkSet)606 bool CloudStorageUtils::IsVbucketContainsAllPK(const VBucket &vBucket, const std::set<std::string> &pkSet)
607 {
608     if (pkSet.empty()) {
609         return false;
610     }
611     for (const auto &pk : pkSet) {
612         Type type;
613         bool isExisted = GetTypeCaseInsensitive(pk, vBucket, type);
614         if (!isExisted) {
615             return false;
616         }
617     }
618     return true;
619 }
620 
IsSharedTable(const TableSchema & tableSchema)621 bool CloudStorageUtils::IsSharedTable(const TableSchema &tableSchema)
622 {
623     return tableSchema.sharedTableName == tableSchema.name;
624 }
625 
IsViolationOfConstraints(const std::string & name,const std::vector<FieldInfo> & fieldInfos)626 static bool IsViolationOfConstraints(const std::string &name, const std::vector<FieldInfo> &fieldInfos)
627 {
628     for (const auto &field : fieldInfos) {
629         if (name != field.GetFieldName()) {
630             continue;
631         }
632         if (field.GetStorageType() == StorageType::STORAGE_TYPE_REAL) {
633             LOGE("[ConstraintsCheckForCloud] Not support create distributed table with real primary key.");
634             return true;
635         } else if (field.IsAssetType() || field.IsAssetsType()) {
636             LOGE("[ConstraintsCheckForCloud] Not support create distributed table with asset primary key.");
637             return true;
638         } else {
639             return false;
640         }
641     }
642     return false;
643 }
644 
ConstraintsCheckForCloud(const TableInfo & table,const std::string & trimmedSql)645 int CloudStorageUtils::ConstraintsCheckForCloud(const TableInfo &table, const std::string &trimmedSql)
646 {
647     if (DBCommon::HasConstraint(trimmedSql, "UNIQUE", " ,", " ,)(")) {
648         LOGE("[ConstraintsCheckForCloud] Not support create distributed table with 'UNIQUE' constraint.");
649         return -E_NOT_SUPPORT;
650     }
651 
652     const std::map<int, FieldName> &primaryKeys = table.GetPrimaryKey();
653     const std::vector<FieldInfo> &fieldInfos = table.GetFieldInfos();
654     for (const auto &item : primaryKeys) {
655         if (IsViolationOfConstraints(item.second, fieldInfos)) {
656             return -E_NOT_SUPPORT;
657         }
658     }
659     return E_OK;
660 }
661 
CheckAssetStatus(const Assets & assets)662 bool CloudStorageUtils::CheckAssetStatus(const Assets &assets)
663 {
664     for (const Asset &asset: assets) {
665         if (AssetOperationUtils::EraseBitMask(asset.status) > static_cast<uint32_t>(AssetStatus::UPDATE)) {
666             LOGE("assets contain invalid status:[%u]", asset.status);
667             return false;
668         }
669     }
670     return true;
671 }
672 
GetLeftJoinLogSql(const std::string & tableName,bool logAsTableA)673 std::string CloudStorageUtils::GetLeftJoinLogSql(const std::string &tableName, bool logAsTableA)
674 {
675     std::string sql;
676     if (logAsTableA) {
677         sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b " +
678             " ON (a.data_key = b." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
679     } else {
680         sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS b LEFT JOIN '" + tableName + "' AS a " +
681             " ON (b.data_key = a." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
682     }
683     return sql;
684 }
685 
ChkFillCloudAssetParam(const CloudSyncBatch & data,int errCode)686 bool CloudStorageUtils::ChkFillCloudAssetParam(const CloudSyncBatch &data, int errCode)
687 {
688     if (data.assets.empty()) {
689         errCode = E_OK;
690         return true;
691     }
692     if (data.rowid.empty() || data.timestamp.empty()) {
693         errCode = -E_INVALID_ARGS;
694         LOGE("param is empty when fill cloud Asset. rowidN:%u, timeN:%u", errCode, data.rowid.size(),
695             data.timestamp.size());
696         return true;
697     }
698     if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size() ||
699         data.assets.size() != data.hashKey.size() || data.assets.size() != data.extend.size()) {
700         errCode = -E_INVALID_ARGS;
701         LOGE("the num of param is invalid when fill cloud Asset. assetsN:%u, rowidN:%u, timeN:%u, "
702              "hashKeyN:%u, extendN:%u", data.assets.size(), data.rowid.size(), data.timestamp.size(),
703              data.hashKey.size(), data.extend.size());
704         return true;
705     }
706     return false;
707 }
708 
GetToBeRemoveAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType,std::vector<Asset> & removeAssets)709 void CloudStorageUtils::GetToBeRemoveAssets(const VBucket &vBucket,
710     const AssetOperationUtils::RecordAssetOpType &assetOpType, std::vector<Asset> &removeAssets)
711 {
712     for (const auto &col: assetOpType) {
713         Type itItem;
714         bool isExisted = GetTypeCaseInsensitive(col.first, vBucket, itItem);
715         if (!isExisted) {
716             continue;
717         }
718         if (!CloudStorageUtils::IsAsset(itItem) && !CloudStorageUtils::IsAssets(itItem)) {
719             continue;
720         }
721         if (CloudStorageUtils::IsAsset(itItem)) {
722             Asset delAsset;
723             GetValueFromType(itItem, delAsset);
724             auto itOp = col.second.find(delAsset.name);
725             if (itOp != col.second.end() && itOp->second == AssetOperationUtils::AssetOpType::NOT_HANDLE
726                 && delAsset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
727                 removeAssets.push_back(delAsset);
728             }
729             continue;
730         }
731         Assets assets;
732         GetValueFromType(itItem, assets);
733         for (const auto &asset: assets) {
734             auto itOp = col.second.find(asset.name);
735             if (itOp == col.second.end() || itOp->second == AssetOperationUtils::AssetOpType::HANDLE ||
736                 asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
737                 continue;
738             }
739             removeAssets.push_back(asset);
740         }
741     }
742 }
743 
FillAssetForUploadFailed(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)744 int CloudStorageUtils::FillAssetForUploadFailed(Asset &asset, Asset &dbAsset,
745     AssetOperationUtils::AssetOpType assetOpType)
746 {
747     dbAsset.assetId = asset.assetId;
748     dbAsset.status &= ~AssetStatus::UPLOADING;
749     return E_OK;
750 }
751 
FillAssetsForUploadFailed(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)752 void CloudStorageUtils::FillAssetsForUploadFailed(Assets &assets, Assets &dbAssets,
753     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
754 {
755     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUploadFailed);
756 }
757 
FillAssetAfterDownloadFail(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)758 int CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset, Asset &dbAsset,
759     AssetOperationUtils::AssetOpType assetOpType)
760 {
761     AssetStatus status = static_cast<AssetStatus>(asset.status);
762     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
763         return E_OK;
764     }
765     if (status != AssetStatus::ABNORMAL) {
766         return FillAssetAfterDownload(asset, dbAsset, assetOpType);
767     }
768     dbAsset = asset;
769     return E_OK;
770 }
771 
FillAssetsAfterDownloadFail(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)772 void CloudStorageUtils::FillAssetsAfterDownloadFail(Assets &assets, Assets &dbAssets,
773     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
774 {
775     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownloadFail);
776 }
777 
MergeAssetWithFillFunc(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset)778 void CloudStorageUtils::MergeAssetWithFillFunc(Assets &assets, Assets &dbAssets, const std::map<std::string,
779     AssetOperationUtils::AssetOpType> &assetOpTypeMap,
780     std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset)
781 {
782     std::map<std::string, size_t> indexMap = GenAssetsIndexMap(assets);
783     for (auto dbAsset = dbAssets.begin(); dbAsset != dbAssets.end();) {
784         Asset cacheAsset;
785         auto it = indexMap.find(dbAsset->name);
786         if (it != indexMap.end()) {
787             cacheAsset = assets[it->second];
788         }
789         AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
790         auto iterOp = assetOpTypeMap.find(dbAsset->name);
791         if (iterOp != assetOpTypeMap.end()) {
792             opType = iterOp->second;
793         }
794         if (fillAsset(cacheAsset, *dbAsset, opType) == -E_NOT_FOUND) {
795             dbAsset = dbAssets.erase(dbAsset);
796         } else {
797             dbAsset++;
798         }
799     }
800 }
801 
GetHashValueWithPrimaryKeyMap(const VBucket & vBucket,const TableSchema & tableSchema,const TableInfo & localTable,const std::map<std::string,Field> & pkMap,bool allowEmpty)802 std::pair<int, std::vector<uint8_t>> CloudStorageUtils::GetHashValueWithPrimaryKeyMap(const VBucket &vBucket,
803     const TableSchema &tableSchema, const TableInfo &localTable, const std::map<std::string, Field> &pkMap,
804     bool allowEmpty)
805 {
806     int errCode = E_OK;
807     std::vector<uint8_t> hashValue;
808     if (pkMap.size() == 0) {
809         LOGE("do not support get hashValue when primaryKey map is empty.");
810         return { -E_INTERNAL_ERROR, {} };
811     } else if (pkMap.size() == 1) {
812         std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
813         FieldInfoMap fieldInfos = localTable.GetFields();
814         if (fieldInfos.find(pkMap.begin()->first) == fieldInfos.end()) {
815             LOGE("localSchema doesn't contain primary key.");
816             return { -E_INTERNAL_ERROR, {} };
817         }
818         CollateType collateType = fieldInfos.at(pkMap.begin()->first).GetCollateType();
819         errCode = CloudStorageUtils::CalculateHashKeyForOneField(
820             pkVec.at(0), vBucket, allowEmpty, collateType, hashValue);
821     } else {
822         std::vector<uint8_t> tempRes;
823         for (const auto &item: pkMap) {
824             FieldInfoMap fieldInfos = localTable.GetFields();
825             if (fieldInfos.find(item.first) == fieldInfos.end()) {
826                 LOGE("localSchema doesn't contain primary key in multi pks.");
827                 return { -E_INTERNAL_ERROR, {} };
828             }
829             std::vector<uint8_t> temp;
830             CollateType collateType = fieldInfos.at(item.first).GetCollateType();
831             errCode = CloudStorageUtils::CalculateHashKeyForOneField(
832                 item.second, vBucket, allowEmpty, collateType, temp);
833             if (errCode != E_OK) {
834                 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
835                 return { errCode, {} };
836             }
837             tempRes.insert(tempRes.end(), temp.begin(), temp.end());
838         }
839         errCode = DBCommon::CalcValueHash(tempRes, hashValue);
840     }
841     return { errCode, hashValue };
842 }
843 
CheckCloudSchemaFields(const TableSchema & tableSchema,const TableSchema & oldSchema)844 bool CloudStorageUtils::CheckCloudSchemaFields(const TableSchema &tableSchema, const TableSchema &oldSchema)
845 {
846     if (tableSchema.name != oldSchema.name) {
847         return true;
848     }
849     for (const auto &oldField : oldSchema.fields) {
850         auto it = std::find_if(tableSchema.fields.begin(), tableSchema.fields.end(),
851             [&oldField](const std::vector<Field>::value_type &field) {
852                 return oldField == field;
853             });
854         if (it == tableSchema.fields.end()) {
855             return false;
856         }
857     }
858     return true;
859 }
860 
BindUpdateLogStmtFromVBucket(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,sqlite3_stmt * updateLogStmt)861 int CloudStorageUtils::BindUpdateLogStmtFromVBucket(const VBucket &vBucket, const TableSchema &tableSchema,
862     const std::vector<std::string> &colNames, sqlite3_stmt *updateLogStmt)
863 {
864     int index = 0;
865     int errCode = E_OK;
866     for (const auto &colName : colNames) {
867         index++;
868         if (colName == CloudDbConstant::GID_FIELD) {
869             if (vBucket.find(colName) == vBucket.end()) {
870                 LOGE("cloud data doesn't contain gid field when bind update log stmt.");
871                 return -E_CLOUD_ERROR;
872             }
873             errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
874                 std::get<std::string>(vBucket.at(colName)));
875         } else if (colName == CloudDbConstant::MODIFY_FIELD) {
876             if (vBucket.find(colName) == vBucket.end()) {
877                 LOGE("cloud data doesn't contain modify field when bind update log stmt.");
878                 return -E_CLOUD_ERROR;
879             }
880             errCode = SQLiteUtils::BindInt64ToStatement(updateLogStmt, index, std::get<int64_t>(vBucket.at(colName)));
881         } else if (colName == CloudDbConstant::VERSION_FIELD) {
882             if (vBucket.find(colName) == vBucket.end()) {
883                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index, std::string(""));
884             } else {
885                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
886                     std::get<std::string>(vBucket.at(colName)));
887             }
888         } else if (colName == CloudDbConstant::SHARING_RESOURCE_FIELD) {
889             if (vBucket.find(colName) == vBucket.end()) {
890                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index, std::string(""));
891             } else {
892                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
893                     std::get<std::string>(vBucket.at(colName)));
894             }
895         } else {
896             LOGE("invalid col name when bind value to update log statement.");
897             return -E_INTERNAL_ERROR;
898         }
899         if (errCode != E_OK) {
900             LOGE("fail to bind value to update log statement.");
901             return errCode;
902         }
903     }
904     return E_OK;
905 }
906 
GetUpdateRecordFlagSql(UpdateRecordFlagStruct updateRecordFlag,const LogInfo & logInfo,const VBucket & uploadExtend,const CloudWaterType & type)907 std::string CloudStorageUtils::GetUpdateRecordFlagSql(UpdateRecordFlagStruct updateRecordFlag, const LogInfo &logInfo,
908     const VBucket &uploadExtend, const CloudWaterType &type)
909 {
910     std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
911     std::string inconsistencyBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
912     bool gidEmpty = logInfo.cloudGid.empty();
913     bool isDeleted = logInfo.dataKey == DBConstant::DEFAULT_ROW_ID;
914     std::string sql = "UPDATE " + DBCommon::GetLogTableName(updateRecordFlag.tableName) +
915         " SET flag = (CASE WHEN timestamp = ? THEN ";
916     bool isNeedCompensated =
917         updateRecordFlag.isRecordConflict || DBCommon::IsNeedCompensatedForUpload(uploadExtend, type);
918     if (isNeedCompensated && !(isDeleted && gidEmpty)) {
919         sql += "flag | " + compensatedBit + " ELSE flag | " + compensatedBit;
920     } else {
921         if (updateRecordFlag.isInconsistency) {
922             sql += "flag & ~" + compensatedBit + " |" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
923         } else {
924             sql += "flag & ~" + compensatedBit + " & ~" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
925         }
926     }
927     sql += " END), status = (CASE WHEN status == 2 THEN 3 WHEN (status == 1 AND timestamp = ?) THEN 0 ELSE status END)";
928     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
929         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
930         sql += ", cloud_gid = '', version = '' ";
931     }
932     sql += " WHERE ";
933     if (!gidEmpty) {
934         sql += " cloud_gid = '" + logInfo.cloudGid + "'";
935     }
936     if (!isDeleted) {
937         if (!gidEmpty) {
938             sql += " OR ";
939         }
940         sql += " data_key = '" + std::to_string(logInfo.dataKey) + "'";
941     }
942     if (gidEmpty && isDeleted) {
943         sql += " hash_key = ?";
944     }
945     sql += ";";
946     return sql;
947 }
948 
GetUpdateRecordFlagSqlUpload(const std::string & tableName,bool recordConflict,const LogInfo & logInfo,const VBucket & uploadExtend,const CloudWaterType & type)949 std::string CloudStorageUtils::GetUpdateRecordFlagSqlUpload(const std::string &tableName, bool recordConflict,
950     const LogInfo &logInfo, const VBucket &uploadExtend, const CloudWaterType &type)
951 {
952     std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
953     std::string inconsistencyBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
954     bool gidEmpty = logInfo.cloudGid.empty();
955     bool isDeleted = logInfo.dataKey == DBConstant::DEFAULT_ROW_ID;
956     std::string sql;
957     bool isNeedCompensated = recordConflict || DBCommon::IsNeedCompensatedForUpload(uploadExtend, type);
958     if (isNeedCompensated && !(isDeleted && gidEmpty)) {
959         sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? OR " +
960             "flag & 0x01 = 0 THEN flag | " + compensatedBit + " ELSE flag";
961     } else {
962         sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? THEN " +
963             "flag & ~" + compensatedBit + " & ~" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
964     }
965     sql += " END), status = (CASE WHEN status == 2 THEN 3 WHEN (status == 1 AND timestamp = ?) THEN 0 ELSE status END)";
966     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
967         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
968         sql += ", cloud_gid = '', version = '' ";
969     }
970     sql += " WHERE ";
971     if (!gidEmpty) {
972         sql += " cloud_gid = '" + logInfo.cloudGid + "'";
973     }
974     if (!isDeleted) {
975         if (!gidEmpty) {
976             sql += " OR ";
977         }
978         sql += " data_key = '" + std::to_string(logInfo.dataKey) + "'";
979     }
980     if (gidEmpty && isDeleted) {
981         sql += " hash_key = ?";
982     }
983     sql += ";";
984     return sql;
985 }
986 
GetUpdateUploadFinishedSql(const std::string & tableName,bool isExistAssetsDownload)987 std::string CloudStorageUtils::GetUpdateUploadFinishedSql(const std::string &tableName, bool isExistAssetsDownload)
988 {
989     std::string finishedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_UPLOAD_FINISHED));
990     std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
991     // When the data flag is not in the compensation state and the local data does not change, the upload is finished.
992     if (isExistAssetsDownload) {
993         return "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? AND flag & " +
994             compensatedBit + " != " + compensatedBit + " THEN flag | " + finishedBit +
995             " ELSE flag END) WHERE hash_key=?";
996     }
997     // Clear IsWaitAssetDownload flag when no asset is to be downloaded
998     return "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = ((CASE WHEN timestamp = ? AND flag & " +
999         compensatedBit + " != " + compensatedBit + " THEN flag | " + finishedBit +
1000         " ELSE flag END) & ~0x1000) WHERE hash_key=?";
1001 }
1002 
BindStepConsistentFlagStmt(sqlite3_stmt * stmt,const VBucket & data,const std::set<std::string> & gidFilters)1003 int CloudStorageUtils::BindStepConsistentFlagStmt(sqlite3_stmt *stmt, const VBucket &data,
1004     const std::set<std::string> &gidFilters)
1005 {
1006     std::string gidStr;
1007     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gidStr);
1008     if (errCode != E_OK || gidStr.empty()) {
1009         LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1010         return errCode;
1011     }
1012     if (gidStr.empty()) {
1013         LOGE("Get empty gid from bucket when mark flag as consistent.");
1014         return -E_CLOUD_ERROR;
1015     }
1016     // this data has not yet downloaded asset, skipping
1017     if (gidFilters.find(gidStr) != gidFilters.end()) {
1018         return E_OK;
1019     }
1020     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gidStr); // 1 is cloud_gid
1021     if (errCode != E_OK) {
1022         LOGE("Bind cloud_gid to mark flag as consistent stmt failed, %d", errCode);
1023         return errCode;
1024     }
1025     int64_t modifyTime;
1026     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::MODIFY_FIELD, data, modifyTime);
1027     if (errCode != E_OK) {
1028         LOGE("Get modify time from bucket fail when mark flag as consistent, errCode = %d", errCode);
1029         return errCode;
1030     }
1031     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, modifyTime); // 2 is timestamp
1032     if (errCode != E_OK) {
1033         LOGE("Bind modify time to mark flag as consistent stmt failed, %d", errCode);
1034         return errCode;
1035     }
1036     errCode = SQLiteUtils::StepWithRetry(stmt);
1037     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1038         errCode = E_OK;
1039     } else {
1040         LOGE("[Storage Executor]Step mark flag as consistent stmt failed, %d", errCode);
1041     }
1042     return errCode;
1043 }
1044 
IsCloudGidMismatch(const std::string & downloadGid,const std::string & curGid)1045 bool CloudStorageUtils::IsCloudGidMismatch(const std::string &downloadGid, const std::string &curGid)
1046 {
1047     return !downloadGid.empty() && !curGid.empty() && downloadGid != curGid;
1048 }
1049 
IsGetCloudDataContinue(uint32_t curNum,uint32_t curSize,uint32_t maxSize,uint32_t maxCount)1050 bool CloudStorageUtils::IsGetCloudDataContinue(uint32_t curNum, uint32_t curSize, uint32_t maxSize, uint32_t maxCount)
1051 {
1052     if (curNum == 0) {
1053         return true;
1054     }
1055     if (curSize < maxSize && curNum < maxCount) {
1056         return true;
1057     }
1058     return false;
1059 }
1060 
IdentifyCloudType(const CloudUploadRecorder & recorder,CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)1061 int CloudStorageUtils::IdentifyCloudType(const CloudUploadRecorder &recorder, CloudSyncData &cloudSyncData,
1062     VBucket &data, VBucket &log, VBucket &flags)
1063 {
1064     Bytes *hashKey = std::get_if<Bytes>(&flags[CloudDbConstant::HASH_KEY]);
1065     int64_t *timeStamp = std::get_if<int64_t>(&flags[CloudDbConstant::TIMESTAMP]);
1066     if (timeStamp == nullptr || hashKey == nullptr) {
1067         return -E_INVALID_DATA;
1068     }
1069     if (recorder.IsIgnoreUploadRecord(cloudSyncData.tableName, *hashKey, cloudSyncData.mode, *timeStamp)) {
1070         cloudSyncData.ignoredCount++;
1071         return -E_IGNORE_DATA;
1072     }
1073     return IdentifyCloudTypeInner(cloudSyncData, data, log, flags);
1074 }
1075 
IdentifyCloudTypeInner(CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)1076 int CloudStorageUtils::IdentifyCloudTypeInner(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags)
1077 {
1078     int64_t *rowid = std::get_if<int64_t>(&flags[DBConstant::ROWID]);
1079     int64_t *flag = std::get_if<int64_t>(&flags[CloudDbConstant::FLAG]);
1080     int64_t *timeStamp = std::get_if<int64_t>(&flags[CloudDbConstant::TIMESTAMP]);
1081     Bytes *hashKey = std::get_if<Bytes>(&flags[CloudDbConstant::HASH_KEY]);
1082     int64_t *status = std::get_if<int64_t>(&flags[CloudDbConstant::STATUS]);
1083     if (rowid == nullptr || flag == nullptr || timeStamp == nullptr || hashKey == nullptr) {
1084         return -E_INVALID_DATA;
1085     }
1086     bool isDelete = ((static_cast<uint64_t>(*flag) & DataItem::DELETE_FLAG) != 0);
1087     bool isInsert = (!isDelete) && (log.find(CloudDbConstant::GID_FIELD) == log.end());
1088     if (status != nullptr && !isInsert && (CloudStorageUtils::IsDataLocked(*status))) {
1089         cloudSyncData.ignoredCount++;
1090         cloudSyncData.lockData.extend.push_back(log);
1091         cloudSyncData.lockData.hashKey.push_back(*hashKey);
1092         cloudSyncData.lockData.timestamp.push_back(*timeStamp);
1093         cloudSyncData.lockData.rowid.push_back(*rowid);
1094         return -E_IGNORE_DATA;
1095     }
1096     if (isDelete) {
1097         cloudSyncData.delData.record.push_back(data);
1098         cloudSyncData.delData.extend.push_back(log);
1099         cloudSyncData.delData.hashKey.push_back(*hashKey);
1100         cloudSyncData.delData.timestamp.push_back(*timeStamp);
1101         cloudSyncData.delData.rowid.push_back(*rowid);
1102     } else {
1103         bool isAsyncDownloading = ((static_cast<uint64_t>(*flag) &
1104             static_cast<uint64_t>(LogInfoFlag::FLAG_ASSET_DOWNLOADING_FOR_ASYNC)) != 0);
1105         int errCode = CheckAbnormalData(cloudSyncData, data, isInsert, isAsyncDownloading);
1106         if (errCode != E_OK) {
1107             return errCode;
1108         }
1109         CloudSyncBatch &opData = isInsert ? cloudSyncData.insData : cloudSyncData.updData;
1110         opData.record.push_back(data);
1111         opData.rowid.push_back(*rowid);
1112         VBucket asset;
1113         CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
1114         opData.timestamp.push_back(*timeStamp);
1115         opData.assets.push_back(asset);
1116         if (isInsert) {
1117             log[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::VectorToHexString(*hashKey);
1118         }
1119         opData.extend.push_back(log);
1120         opData.hashKey.push_back(*hashKey);
1121     }
1122     return E_OK;
1123 }
1124 
IsAssetNotDownload(const uint32_t & status)1125 bool CloudStorageUtils::IsAssetNotDownload(const uint32_t &status)
1126 {
1127     return status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1128         status == static_cast<uint32_t>(AssetStatus::DOWNLOADING) ||
1129         (status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0;
1130 }
1131 
CheckAbnormalDataInner(const bool isAsyncDownloading,VBucket & data,bool & isSyncAssetAbnormal,bool & isAsyncAssetAbnormal)1132 void CloudStorageUtils::CheckAbnormalDataInner(const bool isAsyncDownloading, VBucket &data,
1133     bool &isSyncAssetAbnormal, bool &isAsyncAssetAbnormal)
1134 {
1135     std::vector<std::string> abnormalAssetFields;
1136     for (auto &item : data) {
1137         const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1138         if (asset != nullptr) {
1139             bool isAssetNotDownload = IsAssetNotDownload(asset->status);
1140             if (!isAssetNotDownload) {
1141                 continue;
1142             }
1143             if (!isAsyncDownloading) {
1144                 isSyncAssetAbnormal = true;
1145                 return;
1146             }
1147             isAsyncAssetAbnormal = true;
1148             abnormalAssetFields.push_back(item.first);
1149             continue;
1150         }
1151         auto *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1152         if (assets == nullptr) {
1153             continue;
1154         }
1155         for (auto it = assets->begin(); it != assets->end();) {
1156             const auto &oneAsset = *it;
1157             bool isOneAssetNotDownload = IsAssetNotDownload(oneAsset.status);
1158             if (!isOneAssetNotDownload) {
1159                 ++it;
1160                 continue;
1161             }
1162             if (!isAsyncDownloading) {
1163                 isSyncAssetAbnormal = true;
1164                 return;
1165             }
1166             isAsyncAssetAbnormal = true;
1167             it = assets->erase(it);
1168         }
1169     }
1170     for (const auto &item : abnormalAssetFields) {
1171         data.erase(item);
1172     }
1173 }
1174 
CheckAbnormalData(CloudSyncData & cloudSyncData,VBucket & data,bool isInsert,bool isAsyncDownloading)1175 int CloudStorageUtils::CheckAbnormalData(CloudSyncData &cloudSyncData, VBucket &data, bool isInsert,
1176     bool isAsyncDownloading)
1177 {
1178     if (data.empty()) {
1179         LOGE("The cloud data is empty, isInsert:%d", static_cast<int>(isInsert));
1180         return -E_INVALID_DATA;
1181     }
1182     bool isSyncAssetAbnormal = false;
1183     bool isAsyncAssetAbnormal = false;
1184     CheckAbnormalDataInner(isAsyncDownloading, data, isSyncAssetAbnormal, isAsyncAssetAbnormal);
1185     if (isSyncAssetAbnormal) {
1186         std::string gid;
1187         (void)GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
1188         LOGW("This data is abnormal, ignore it when upload, isInsert:%d, gid:%s",
1189             static_cast<int>(isInsert), gid.c_str());
1190         cloudSyncData.ignoredCount++;
1191         return -E_IGNORE_DATA;
1192     }
1193     if (isAsyncAssetAbnormal) {
1194         std::string gid;
1195         (void)GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
1196         LOGW("This data has assets that are not downloaded, upload data only, gid:%s", gid.c_str());
1197     }
1198     return E_OK;
1199 }
1200 
GetDataItemFromCloudData(VBucket & data)1201 std::pair<int, DataItem> CloudStorageUtils::GetDataItemFromCloudData(VBucket &data)
1202 {
1203     std::pair<int, DataItem> res;
1204     auto &[errCode, dataItem] = res;
1205     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_KEY, data, dataItem.key);
1206     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, data, dataItem.value);
1207     GetStringFromCloudData(CloudDbConstant::GID_FIELD, data, dataItem.gid);
1208     GetStringFromCloudData(CloudDbConstant::VERSION_FIELD, data, dataItem.version);
1209     GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, data, dataItem.dev);
1210     GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, data, dataItem.origDev);
1211     dataItem.flag = static_cast<uint64_t>(LogInfoFlag::FLAG_CLOUD_WRITE);
1212     GetUInt64FromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME, data, dataItem.writeTimestamp);
1213     GetUInt64FromCloudData(CloudDbConstant::MODIFY_FIELD, data, dataItem.modifyTime);
1214     errCode = GetUInt64FromCloudData(CloudDbConstant::CREATE_FIELD, data, dataItem.createTime);
1215     bool isSystemRecord = IsSystemRecord(dataItem.key);
1216     if (isSystemRecord) {
1217         dataItem.hashKey = dataItem.key;
1218         dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_SYSTEM_RECORD);
1219     } else if (!dataItem.key.empty()) {
1220         (void)DBCommon::CalcValueHash(dataItem.key, dataItem.hashKey);
1221     }
1222     return res;
1223 }
1224 
GetDataItemFromCloudVersionData(VBucket & data)1225 std::pair<int, DataItem> CloudStorageUtils::GetDataItemFromCloudVersionData(VBucket &data)
1226 {
1227     std::pair<int, DataItem> res;
1228     auto &[errCode, dataItem] = res;
1229     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_KEY, data, dataItem.key);
1230     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, data, dataItem.value);
1231     errCode = GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, data, dataItem.dev);
1232     return res;
1233 }
1234 
GetBytesFromCloudData(const std::string & field,VBucket & data,Bytes & bytes)1235 int CloudStorageUtils::GetBytesFromCloudData(const std::string &field, VBucket &data, Bytes &bytes)
1236 {
1237     std::string blobStr;
1238     int errCode = GetValueFromVBucket(field, data, blobStr);
1239     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1240         LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1241         return errCode;
1242     }
1243     DBCommon::StringToVector(blobStr, bytes);
1244     return errCode;
1245 }
1246 
GetStringFromCloudData(const std::string & field,VBucket & data,std::string & str)1247 int CloudStorageUtils::GetStringFromCloudData(const std::string &field, VBucket &data, std::string &str)
1248 {
1249     int errCode = GetValueFromVBucket(field, data, str);
1250     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1251         LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1252         return errCode;
1253     }
1254     return errCode;
1255 }
1256 
GetUInt64FromCloudData(const std::string & field,VBucket & data,uint64_t & number)1257 int CloudStorageUtils::GetUInt64FromCloudData(const std::string &field, VBucket &data, uint64_t &number)
1258 {
1259     int64_t intNum;
1260     int errCode = GetValueFromVBucket(field, data, intNum);
1261     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1262         LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1263         return errCode;
1264     }
1265     number = static_cast<uint64_t>(intNum);
1266     return errCode;
1267 }
1268 
AddUpdateColForShare(const TableSchema & tableSchema,std::string & updateLogSql,std::vector<std::string> & updateColName)1269 void CloudStorageUtils::AddUpdateColForShare(const TableSchema &tableSchema, std::string &updateLogSql,
1270     std::vector<std::string> &updateColName)
1271 {
1272     updateLogSql += ", version = ?";
1273     updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1274     updateLogSql += ", sharing_resource = ?";
1275     updateColName.push_back(CloudDbConstant::SHARING_RESOURCE_FIELD);
1276 }
1277 
IsDataLocked(uint32_t status)1278 bool CloudStorageUtils::IsDataLocked(uint32_t status)
1279 {
1280     return status == static_cast<uint32_t>(LockStatus::LOCK) ||
1281         status == static_cast<uint32_t>(LockStatus::LOCK_CHANGE);
1282 }
1283 
GetSystemRecordFromCloudData(VBucket & data)1284 std::pair<int, DataItem> CloudStorageUtils::GetSystemRecordFromCloudData(VBucket &data)
1285 {
1286     auto res = CloudStorageUtils::GetDataItemFromCloudData(data); // only record first one
1287     auto &[errCode, dataItem] = res;
1288     if (errCode != E_OK) {
1289         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1290         return res;
1291     }
1292     dataItem.dev = "";
1293     dataItem.origDev = "";
1294     return res;
1295 }
1296 
IsSystemRecord(const Key & key)1297 bool CloudStorageUtils::IsSystemRecord(const Key &key)
1298 {
1299     std::string prefixKey = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY;
1300     if (key.size() < prefixKey.size()) {
1301         return false;
1302     }
1303     std::string keyStr(key.begin(), key.end());
1304     return keyStr.find(prefixKey) == 0;
1305 }
1306 
GetCursorUpgradeSql(const std::string & tableName)1307 std::string CloudStorageUtils::GetCursorUpgradeSql(const std::string &tableName)
1308 {
1309     return "INSERT OR REPLACE INTO " + DBCommon::GetMetaTableName() + "(key,value) VALUES (x'" +
1310         DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "', (SELECT CASE WHEN MAX(cursor) IS" +
1311         " NULL THEN 0 ELSE MAX(cursor) END FROM " + DBCommon::GetLogTableName(tableName) + "));";
1312 }
1313 
FillQueryByPK(const std::string & tableName,bool isKv,std::map<std::string,size_t> dataIndex,std::vector<std::map<std::string,std::vector<Type>>> & syncPkVec,std::vector<QuerySyncObject> & syncQuery)1314 int CloudStorageUtils::FillQueryByPK(const std::string &tableName, bool isKv, std::map<std::string, size_t> dataIndex,
1315     std::vector<std::map<std::string, std::vector<Type>>> &syncPkVec, std::vector<QuerySyncObject> &syncQuery)
1316 {
1317     for (const auto &syncPk : syncPkVec) {
1318         Query query = Query::Select().From(tableName);
1319         if (isKv) {
1320             QueryUtils::FillQueryInKeys(syncPk, dataIndex, query);
1321         } else {
1322             for (const auto &[col, pkList] : syncPk) {
1323                 QueryUtils::FillQueryIn(col, pkList, dataIndex[col], query);
1324             }
1325         }
1326         auto objectList = QuerySyncObject::GetQuerySyncObject(query);
1327         if (objectList.size() != 1u) { // only support one QueryExpression
1328             return -E_INTERNAL_ERROR;
1329         }
1330         syncQuery.push_back(objectList[0]);
1331     }
1332     return E_OK;
1333 }
1334 
PutSyncPkVec(const std::string & col,std::map<std::string,std::vector<Type>> & syncPk,std::vector<std::map<std::string,std::vector<Type>>> & syncPkVec)1335 void CloudStorageUtils::PutSyncPkVec(const std::string &col, std::map<std::string, std::vector<Type>> &syncPk,
1336     std::vector<std::map<std::string, std::vector<Type>>> &syncPkVec)
1337 {
1338     if (syncPk[col].size() >= CloudDbConstant::MAX_CONDITIONS_SIZE) {
1339         syncPkVec.push_back(syncPk);
1340         syncPk[col].clear();
1341     }
1342 }
1343 
GetSyncQueryByPk(const std::string & tableName,const std::vector<VBucket> & data,bool isKv,std::vector<QuerySyncObject> & syncQuery)1344 int CloudStorageUtils::GetSyncQueryByPk(const std::string &tableName, const std::vector<VBucket> &data, bool isKv,
1345     std::vector<QuerySyncObject> &syncQuery)
1346 {
1347     std::map<std::string, size_t> dataIndex;
1348     std::vector<std::map<std::string, std::vector<Type>>> syncPkVec;
1349     std::map<std::string, std::vector<Type>> syncPk;
1350     int ignoreCount = 0;
1351     for (const auto &oneRow : data) {
1352         if (oneRow.size() >= 2u) { // mean this data has more than 2 pk
1353             LOGW("compensated sync does not support composite PK, oneRow size: %zu, tableName: %s",
1354                 oneRow.size(), DBCommon::StringMiddleMasking(tableName).c_str());
1355             return -E_NOT_SUPPORT;
1356         }
1357         for (const auto &[col, value] : oneRow) {
1358             bool isFind = dataIndex.find(col) != dataIndex.end();
1359             if (!isFind && value.index() == TYPE_INDEX<Nil>) {
1360                 ignoreCount++;
1361                 continue;
1362             }
1363             if (!isFind && value.index() != TYPE_INDEX<Nil>) {
1364                 dataIndex[col] = value.index();
1365                 syncPk[col].push_back(value);
1366                 PutSyncPkVec(col, syncPk, syncPkVec);
1367                 continue;
1368             }
1369             if (isFind && dataIndex[col] != value.index()) {
1370                 ignoreCount++;
1371                 continue;
1372             }
1373             syncPk[col].push_back(value);
1374             PutSyncPkVec(col, syncPk, syncPkVec);
1375         }
1376     }
1377     syncPkVec.push_back(syncPk);
1378     LOGI("match %zu data for compensated sync, ignore %d", data.size(), ignoreCount);
1379     return FillQueryByPK(tableName, isKv, dataIndex, syncPkVec, syncQuery);
1380 }
1381 
IsAssetsContainDownloadRecord(const VBucket & dbAssets)1382 bool CloudStorageUtils::IsAssetsContainDownloadRecord(const VBucket &dbAssets)
1383 {
1384     for (const auto &item: dbAssets) {
1385         if (IsAsset(item.second)) {
1386             const auto &asset = std::get<Asset>(item.second);
1387             if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1388                 return true;
1389             }
1390         } else if (IsAssets(item.second)) {
1391             const auto &assets = std::get<Assets>(item.second);
1392             if (AssetOperationUtils::IsAssetsNeedDownload(assets)) {
1393                 return true;
1394             }
1395         }
1396     }
1397     return false;
1398 }
1399 
UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor * handle,const CloudSyncParam & param,const CloudSyncBatch & updateData,CloudUploadRecorder & recorder,bool isLock)1400 int CloudStorageUtils::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle,
1401     const CloudSyncParam &param, const CloudSyncBatch &updateData, CloudUploadRecorder &recorder, bool isLock)
1402 {
1403     if (updateData.timestamp.size() != updateData.extend.size()) {
1404         LOGE("the num of extend:%zu and timestamp:%zu is not equal.",
1405             updateData.extend.size(), updateData.timestamp.size());
1406         return -E_INVALID_ARGS;
1407     }
1408     for (size_t i = 0; i < updateData.extend.size(); ++i) {
1409         const auto &record = updateData.extend[i];
1410         if (DBCommon::IsRecordError(record) || DBCommon::IsRecordAssetsMissing(record) ||
1411             DBCommon::IsRecordVersionConflict(record) || isLock) {
1412             if (DBCommon::IsRecordAssetsMissing(record)) {
1413                 LOGI("[CloudStorageUtils][UpdateRecordFlagAfterUpload] Record assets missing, skip update.");
1414             }
1415             int errCode = handle->UpdateRecordStatus(param.tableName, CloudDbConstant::TO_LOCAL_CHANGE,
1416                 updateData.hashKey[i]);
1417             if (errCode != E_OK) {
1418                 LOGE("[CloudStorageUtils] Update record status failed in index %zu", i);
1419                 return errCode;
1420             }
1421             continue;
1422         }
1423         const auto &rowId = updateData.rowid[i];
1424         std::string cloudGid;
1425         (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, record, cloudGid);
1426         LogInfo logInfo;
1427         logInfo.cloudGid = cloudGid;
1428         logInfo.timestamp = updateData.timestamp[i];
1429         logInfo.dataKey = rowId;
1430         logInfo.hashKey = updateData.hashKey[i];
1431         std::string sql = CloudStorageUtils::GetUpdateRecordFlagSqlUpload(
1432             param.tableName, DBCommon::IsRecordIgnored(record), logInfo, record, param.type);
1433         int errCode = handle->UpdateRecordFlag(param.tableName, sql, logInfo);
1434         if (errCode != E_OK) {
1435             LOGE("[CloudStorageUtils] Update record flag failed in index %zu", i);
1436             return errCode;
1437         }
1438 
1439         std::vector<VBucket> assets;
1440         errCode = handle->GetDownloadAssetRecordsByGid(param.tableSchema, logInfo.cloudGid, assets);
1441         if (errCode != E_OK) {
1442             LOGE("[RDBExecutor]Get downloading assets records by gid failed: %d", errCode);
1443             return errCode;
1444         }
1445         handle->MarkFlagAsUploadFinished(param.tableName, updateData.hashKey[i], updateData.timestamp[i],
1446             !assets.empty());
1447         recorder.RecordUploadRecord(param.tableName, logInfo.hashKey, param.type, updateData.timestamp[i]);
1448     }
1449     return E_OK;
1450 }
1451 
FillCloudQueryToExtend(QuerySyncObject & obj,VBucket & extend)1452 int CloudStorageUtils::FillCloudQueryToExtend(QuerySyncObject &obj, VBucket &extend)
1453 {
1454     Bytes bytes;
1455     bytes.resize(obj.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
1456     Parcel parcel(bytes.data(), bytes.size());
1457     int errCode = obj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
1458     if (errCode != E_OK) {
1459         LOGE("[CloudStorageUtils] Query serialize failed %d", errCode);
1460         return errCode;
1461     }
1462     extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::QUERY_FIELD);
1463     extend[CloudDbConstant::QUERY_FIELD] = bytes;
1464     return E_OK;
1465 }
1466 
SaveChangedDataByType(const DataValue & dataValue,Type & value)1467 void CloudStorageUtils::SaveChangedDataByType(const DataValue &dataValue, Type &value)
1468 {
1469     int ret = E_OK;
1470     switch (dataValue.GetType()) {
1471         case StorageType::STORAGE_TYPE_TEXT:
1472             {
1473                 std::string sValue;
1474                 ret = dataValue.GetText(sValue);
1475                 if (ret != E_OK) {
1476                     LOGE("[CloudStorageUtils] save changed string data failed %d", ret);
1477                     return;
1478                 }
1479                 value = sValue;
1480             } break;
1481         case StorageType::STORAGE_TYPE_BLOB:
1482             {
1483                 Blob blob;
1484                 (void)dataValue.GetBlob(blob);
1485                 if (blob.GetSize() == 0u) {
1486                     LOGE("[CloudStorageUtils] save changed Blob data failed");
1487                     return;
1488                 }
1489                 value = std::vector<uint8_t>(blob.GetData(), blob.GetData() + blob.GetSize());
1490             } break;
1491         case StorageType::STORAGE_TYPE_INTEGER:
1492             {
1493                 int64_t iValue;
1494                 ret = dataValue.GetInt64(iValue);
1495                 if (ret != E_OK) {
1496                     LOGE("[CloudStorageUtils] save changed int64 data failed %d", ret);
1497                     return;
1498                 }
1499                 value = iValue;
1500             } break;
1501         case StorageType::STORAGE_TYPE_REAL:
1502             {
1503                 double dValue;
1504                 ret = dataValue.GetDouble(dValue);
1505                 if (ret != E_OK) {
1506                     LOGE("[CloudStorageUtils] save changed double data failed %d", ret);
1507                     return;
1508                 }
1509                 value = dValue;
1510             } break;
1511         default:
1512             LOGE("[CloudStorageUtils] save changed failed, wrong storage type :%" PRIu32, dataValue.GetType());
1513             return;
1514     }
1515 }
1516 }
1517