• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cloud/cloud_storage_utils.h"
17 #include <set>
18 
19 #include "cloud/asset_operation_utils.h"
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_db_types.h"
22 #include "db_common.h"
23 #include "runtime_context.h"
24 
25 namespace DistributedDB {
BindInt64(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)26 int CloudStorageUtils::BindInt64(int index, const VBucket &vBucket, const Field &field,
27     sqlite3_stmt *upsertStmt)
28 {
29     int64_t val = 0;
30     int errCode = GetValueFromVBucket<int64_t>(field.colName, vBucket, val);
31     if (field.nullable && errCode == -E_NOT_FOUND) {
32         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
33     } else {
34         if (errCode != E_OK) {
35             LOGE("get int from vbucket failed, %d", errCode);
36             return -E_CLOUD_ERROR;
37         }
38         errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
39     }
40 
41     if (errCode != E_OK) {
42         LOGE("Bind int to insert statement failed, %d", errCode);
43     }
44     return errCode;
45 }
46 
BindBool(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)47 int CloudStorageUtils::BindBool(int index, const VBucket &vBucket, const Field &field,
48     sqlite3_stmt *upsertStmt)
49 {
50     bool val = false;
51     int errCode = GetValueFromVBucket<bool>(field.colName, vBucket, val);
52     if (field.nullable && errCode == -E_NOT_FOUND) {
53         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
54     } else {
55         if (errCode != E_OK) {
56             LOGE("get bool from vbucket failed, %d", errCode);
57             return -E_CLOUD_ERROR;
58         }
59         errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
60     }
61 
62     if (errCode != E_OK) {
63         LOGE("Bind bool to insert statement failed, %d", errCode);
64     }
65     return errCode;
66 }
67 
BindDouble(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)68 int CloudStorageUtils::BindDouble(int index, const VBucket &vBucket, const Field &field,
69     sqlite3_stmt *upsertStmt)
70 {
71     double val = 0.0;
72     int errCode = GetValueFromVBucket<double>(field.colName, vBucket, val);
73     if (field.nullable && errCode == -E_NOT_FOUND) {
74         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
75     } else {
76         if (errCode != E_OK) {
77             LOGE("get double from vbucket failed, %d", errCode);
78             return -E_CLOUD_ERROR;
79         }
80         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(upsertStmt, index, val));
81     }
82 
83     if (errCode != E_OK) {
84         LOGE("Bind double to insert statement failed, %d", errCode);
85     }
86     return errCode;
87 }
88 
BindText(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)89 int CloudStorageUtils::BindText(int index, const VBucket &vBucket, const Field &field,
90     sqlite3_stmt *upsertStmt)
91 {
92     std::string str;
93     int errCode = GetValueFromVBucket<std::string>(field.colName, vBucket, str);
94     if (field.nullable && errCode == -E_NOT_FOUND) {
95         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
96     } else {
97         if (errCode != E_OK) {
98             LOGE("get string from vbucket failed, %d", errCode);
99             return -E_CLOUD_ERROR;
100         }
101         errCode = SQLiteUtils::BindTextToStatement(upsertStmt, index, str);
102     }
103 
104     if (errCode != E_OK) {
105         LOGE("Bind string to insert statement failed, %d", errCode);
106     }
107     return errCode;
108 }
109 
BindBlob(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)110 int CloudStorageUtils::BindBlob(int index, const VBucket &vBucket, const Field &field,
111     sqlite3_stmt *upsertStmt)
112 {
113     int errCode = E_OK;
114     Bytes val;
115     if (field.type == TYPE_INDEX<Bytes>) {
116         errCode = GetValueFromVBucket<Bytes>(field.colName, vBucket, val);
117         if (!(IsFieldValid(field, errCode))) {
118             goto ERROR;
119         }
120     } else if (field.type == TYPE_INDEX<Asset>) {
121         Asset asset;
122         errCode = GetValueFromVBucket(field.colName, vBucket, asset);
123         if (!(IsFieldValid(field, errCode))) {
124             goto ERROR;
125         }
126         RuntimeContext::GetInstance()->AssetToBlob(asset, val);
127     } else {
128         Assets assets;
129         errCode = GetValueFromVBucket(field.colName, vBucket, assets);
130         if (!(IsFieldValid(field, errCode))) {
131             goto ERROR;
132         }
133         RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
134     }
135 
136     if (errCode == -E_NOT_FOUND) {
137         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
138     } else {
139         errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
140     }
141     if (errCode != E_OK) {
142         LOGE("Bind blob to insert statement failed, %d", errCode);
143     }
144     return errCode;
145 ERROR:
146     LOGE("get blob from vbucket failed, %d", errCode);
147     return -E_CLOUD_ERROR;
148 }
149 
BindAsset(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)150 int CloudStorageUtils::BindAsset(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *upsertStmt)
151 {
152     int errCode;
153     Bytes val;
154     Type entry;
155     bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, entry);
156     if (!isExisted || entry.index() == TYPE_INDEX<Nil>) {
157         if (!field.nullable) {
158             LOGE("field value is not allowed to be null, %d", -E_CLOUD_ERROR);
159             return -E_CLOUD_ERROR;
160         }
161         return SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
162     }
163 
164     Type type = entry;
165     if (field.type == TYPE_INDEX<Asset>) {
166         Asset asset;
167         errCode = GetValueFromOneField(type, asset);
168         if (errCode != E_OK) {
169             LOGE("can not get asset from vBucket when bind, %d", errCode);
170             return errCode;
171         }
172         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
173         errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, val);
174     } else if (field.type == TYPE_INDEX<Assets>) {
175         Assets assets;
176         errCode = GetValueFromOneField(type, assets);
177         if (errCode != E_OK) {
178             LOGE("can not get assets from vBucket when bind, %d", errCode);
179             return errCode;
180         }
181         if (!assets.empty()) {
182             for (auto &asset: assets) {
183                 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
184             }
185             errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
186         }
187     } else {
188         LOGE("field type is not asset or assets, %d", -E_CLOUD_ERROR);
189         return -E_CLOUD_ERROR;
190     }
191     if (errCode != E_OK) {
192         LOGE("assets or asset to blob fail, %d", -E_CLOUD_ERROR);
193         return -E_CLOUD_ERROR;
194     }
195     if (val.empty()) {
196         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
197     } else {
198         errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
199     }
200     return errCode;
201 }
202 
Int64ToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)203 int CloudStorageUtils::Int64ToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
204     std::vector<uint8_t> &value)
205 {
206     (void)collateType;
207     int64_t val = 0;
208     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
209         return -E_CLOUD_ERROR;
210     }
211     DBCommon::StringToVector(std::to_string(val), value);
212     return E_OK;
213 }
214 
BoolToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)215 int CloudStorageUtils::BoolToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
216     std::vector<uint8_t> &value)
217 {
218     (void)collateType;
219     bool val = false;
220     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
221         return -E_CLOUD_ERROR;
222     }
223     DBCommon::StringToVector(std::to_string(val ? 1 : 0), value);
224     return E_OK;
225 }
226 
DoubleToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)227 int CloudStorageUtils::DoubleToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
228     std::vector<uint8_t> &value)
229 {
230     (void)collateType;
231     double val = 0.0;
232     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
233         return -E_CLOUD_ERROR;
234     }
235     std::ostringstream s;
236     s << val;
237     DBCommon::StringToVector(s.str(), value);
238     return E_OK;
239 }
240 
TextToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)241 int CloudStorageUtils::TextToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
242     std::vector<uint8_t> &value)
243 {
244     std::string val;
245     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
246         return -E_CLOUD_ERROR;
247     }
248     if (collateType == CollateType::COLLATE_NOCASE) {
249         std::transform(val.begin(), val.end(), val.begin(), ::toupper);
250     } else if (collateType == CollateType::COLLATE_RTRIM) {
251         DBCommon::RTrim(val);
252     }
253 
254     DBCommon::StringToVector(val, value);
255     return E_OK;
256 }
257 
BlobToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)258 int CloudStorageUtils::BlobToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
259     std::vector<uint8_t> &value)
260 {
261     (void)collateType;
262     if (field.type == TYPE_INDEX<Bytes>) { // LCOV_EXCL_BR_LINE
263         return CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, value);
264     } else if (field.type == TYPE_INDEX<Asset>) {
265         Asset val;
266         if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
267             return -E_CLOUD_ERROR;
268         }
269         int errCode = RuntimeContext::GetInstance()->AssetToBlob(val, value);
270         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
271             LOGE("asset to blob fail, %d", errCode);
272         }
273         return errCode;
274     } else {
275         Assets val;
276         if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
277             return -E_CLOUD_ERROR;
278         }
279         int errCode = RuntimeContext::GetInstance()->AssetsToBlob(val, value);
280         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
281             LOGE("assets to blob fail, %d", errCode);
282         }
283         return errCode;
284     }
285 }
286 
GetCloudPrimaryKey(const TableSchema & tableSchema)287 std::set<std::string> CloudStorageUtils::GetCloudPrimaryKey(const TableSchema &tableSchema)
288 {
289     std::set<std::string> pkSet;
290     for (const auto &field : tableSchema.fields) {
291         if (field.primary) {
292             pkSet.insert(field.colName);
293         }
294     }
295     return pkSet;
296 }
297 
GetCloudAsset(const TableSchema & tableSchema)298 std::vector<Field> CloudStorageUtils::GetCloudAsset(const TableSchema &tableSchema)
299 {
300     std::vector<Field> assetFields;
301     for (const auto &item: tableSchema.fields) {
302         if (item.type != TYPE_INDEX<Asset> && item.type != TYPE_INDEX<Assets>) {
303             continue;
304         }
305         assetFields.push_back(item);
306     }
307     return assetFields;
308 }
309 
GetCloudPrimaryKeyField(const TableSchema & tableSchema,bool sortByName)310 std::vector<Field> CloudStorageUtils::GetCloudPrimaryKeyField(const TableSchema &tableSchema, bool sortByName)
311 {
312     std::vector<Field> pkVec;
313     for (const auto &field : tableSchema.fields) {
314         if (field.primary) {
315             pkVec.push_back(field);
316         }
317     }
318     if (sortByName) {
319         std::sort(pkVec.begin(), pkVec.end(), [](const Field &a, const Field &b) {
320            return a.colName < b.colName;
321         });
322     }
323     return pkVec;
324 }
325 
GetCloudPrimaryKeyFieldMap(const TableSchema & tableSchema,bool sortByUpper)326 std::map<std::string, Field> CloudStorageUtils::GetCloudPrimaryKeyFieldMap(const TableSchema &tableSchema,
327     bool sortByUpper)
328 {
329     std::map<std::string, Field> pkMap;
330     for (const auto &field : tableSchema.fields) {
331         if (field.primary) {
332             if (sortByUpper) {
333                 pkMap[DBCommon::ToUpperCase(field.colName)] = field;
334             } else {
335                 pkMap[field.colName] = field;
336             }
337         }
338     }
339     return pkMap;
340 }
341 
GetAssetFieldsFromSchema(const TableSchema & tableSchema,const VBucket & vBucket,std::vector<Field> & fields)342 int CloudStorageUtils::GetAssetFieldsFromSchema(const TableSchema &tableSchema, const VBucket &vBucket,
343     std::vector<Field> &fields)
344 {
345     for (const auto &field: tableSchema.fields) {
346         Type type;
347         bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
348         if (!isExisted) {
349             continue;
350         }
351         if (type.index() != TYPE_INDEX<Asset> && type.index() != TYPE_INDEX<Assets>) {
352             continue;
353         }
354         fields.push_back(field);
355     }
356     if (fields.empty()) {
357         return -E_CLOUD_ERROR;
358     }
359     return E_OK;
360 }
361 
IsContainsPrimaryKey(const TableSchema & tableSchema)362 bool CloudStorageUtils::IsContainsPrimaryKey(const TableSchema &tableSchema)
363 {
364     for (const auto &field : tableSchema.fields) {
365         if (field.primary) {
366             return true;
367         }
368     }
369     return false;
370 }
371 
ObtainAssetFromVBucket(const VBucket & vBucket,VBucket & asset)372 void CloudStorageUtils::ObtainAssetFromVBucket(const VBucket &vBucket, VBucket &asset)
373 {
374     for (const auto &item: vBucket) {
375         if (IsAsset(item.second)) {
376             Asset data = std::get<Asset>(item.second);
377             asset.insert_or_assign(item.first, data);
378         } else if (IsAssets(item.second)) {
379             Assets data = std::get<Assets>(item.second);
380             asset.insert_or_assign(item.first, data);
381         }
382     }
383 }
384 
StatusToFlag(AssetStatus status)385 AssetOpType CloudStorageUtils::StatusToFlag(AssetStatus status)
386 {
387     switch (AssetOperationUtils::EraseBitMask(status)) {
388         case AssetStatus::INSERT:
389             return AssetOpType::INSERT;
390         case AssetStatus::DELETE:
391             return AssetOpType::DELETE;
392         case AssetStatus::UPDATE:
393             return AssetOpType::UPDATE;
394         default:
395             return AssetOpType::NO_CHANGE;
396     }
397 }
398 
FlagToStatus(AssetOpType opType)399 AssetStatus CloudStorageUtils::FlagToStatus(AssetOpType opType)
400 {
401     switch (opType) {
402         case AssetOpType::INSERT:
403             return AssetStatus::INSERT;
404         case AssetOpType::DELETE:
405             return AssetStatus::DELETE;
406         case AssetOpType::UPDATE:
407             return AssetStatus::UPDATE;
408         default:
409             return AssetStatus::NORMAL;
410     }
411 }
412 
ChangeAssetsOnVBucketToAsset(VBucket & vBucket,std::vector<Field> & fields)413 void CloudStorageUtils::ChangeAssetsOnVBucketToAsset(VBucket &vBucket, std::vector<Field> &fields)
414 {
415     for (const Field &field: fields) {
416         if (field.type == TYPE_INDEX<Asset>) {
417             Type asset = GetAssetFromAssets(vBucket[field.colName]);
418             vBucket[field.colName] = asset;
419         }
420     }
421 }
422 
GetAssetFromAssets(Type & value)423 Type CloudStorageUtils::GetAssetFromAssets(Type &value)
424 {
425     Asset assetVal;
426     int errCode = GetValueFromType(value, assetVal);
427     if (errCode == E_OK) {
428         return assetVal;
429     }
430 
431     Assets assets;
432     errCode = GetValueFromType(value, assets);
433     if (errCode != E_OK) {
434         return Nil();
435     }
436 
437     for (Asset &asset: assets) {
438         uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
439         if ((asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) && (lowStatus == AssetStatus::ABNORMAL ||
440             lowStatus == AssetStatus::NORMAL)) || asset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
441             return std::move(asset);
442         }
443     }
444     return Nil();
445 }
446 
FillAssetBeforeDownload(Asset & asset)447 int CloudStorageUtils::FillAssetBeforeDownload(Asset &asset)
448 {
449     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
450     uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
451     switch (flag) {
452         case AssetOpType::DELETE: {
453             // these asset no need to download, just remove before download
454             if (lowStatus == static_cast<uint32_t>(AssetStatus::DELETE) ||
455                 lowStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
456                 (asset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL))) {
457                 return -E_NOT_FOUND;
458             }
459             break;
460         }
461         default:
462             break;
463     }
464     return E_OK;
465 }
466 
FillAssetAfterDownload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)467 int CloudStorageUtils::FillAssetAfterDownload(Asset &asset, Asset &dbAsset,
468     AssetOperationUtils::AssetOpType assetOpType)
469 {
470     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
471         return E_OK;
472     }
473     dbAsset = asset;
474     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
475     if (asset.status != AssetStatus::NORMAL) {
476         return E_OK;
477     }
478     switch (flag) {
479         case AssetOpType::DELETE: {
480             return -E_NOT_FOUND;
481         }
482         default:
483             break;
484     }
485     return E_OK;
486 }
487 
FillAssetsAfterDownload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)488 void CloudStorageUtils::FillAssetsAfterDownload(Assets &assets, Assets &dbAssets,
489     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
490 {
491     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownload);
492 }
493 
FillAssetForUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)494 int CloudStorageUtils::FillAssetForUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
495 {
496     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
497         // db assetId may be empty, need to be based on cache
498         // Notes: Assign happened when dbAsset.assetId is empty because of asset.assetId may be empty.
499         if (dbAsset.assetId.empty()) {
500             dbAsset.assetId = asset.assetId;
501         }
502         return E_OK;
503     }
504     AssetStatus status = static_cast<AssetStatus>(dbAsset.status);
505     dbAsset = asset;
506     switch (StatusToFlag(status)) {
507         case AssetOpType::INSERT:
508         case AssetOpType::UPDATE:
509         case AssetOpType::NO_CHANGE: {
510             dbAsset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
511             break;
512         }
513         case AssetOpType::DELETE: {
514             return -E_NOT_FOUND;
515         }
516         default: {
517             break;
518         }
519     }
520     dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
521     return E_OK;
522 }
523 
FillAssetsForUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)524 void CloudStorageUtils::FillAssetsForUpload(Assets &assets, Assets &dbAssets,
525     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
526 {
527     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUpload);
528 }
529 
FillAssetBeforeUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)530 int CloudStorageUtils::FillAssetBeforeUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
531 {
532     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
533         return E_OK;
534     }
535     dbAsset = asset;
536     switch (static_cast<AssetOpType>(asset.flag)) {
537         case AssetOpType::INSERT:
538         case AssetOpType::UPDATE:
539         case AssetOpType::DELETE:
540         case AssetOpType::NO_CHANGE:
541             dbAsset.status |= static_cast<uint32_t>(AssetStatus::UPLOADING);
542             break;
543         default:
544             break;
545     }
546     dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
547     return E_OK;
548 }
549 
FillAssetsBeforeUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)550 void CloudStorageUtils::FillAssetsBeforeUpload(Assets &assets, Assets &dbAssets, const std::map<std::string,
551     AssetOperationUtils::AssetOpType> &assetOpTypeMap)
552 {
553     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetBeforeUpload);
554 }
555 
PrepareToFillAssetFromVBucket(VBucket & vBucket,std::function<int (Asset &)> fillAsset)556 void CloudStorageUtils::PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function<int(Asset &)> fillAsset)
557 {
558     for (auto &item: vBucket) {
559         if (IsAsset(item.second)) {
560             Asset asset;
561             GetValueFromType(item.second, asset);
562             fillAsset(asset);
563             vBucket[item.first] = asset;
564         } else if (IsAssets(item.second)) {
565             Assets assets;
566             GetValueFromType(item.second, assets);
567             for (auto it = assets.begin(); it != assets.end();) {
568                 fillAsset(*it) == -E_NOT_FOUND ? it = assets.erase(it) : ++it;
569             }
570             vBucket[item.first] = assets;
571         }
572     }
573 }
574 
FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType & assetOpType,VBucket & vBucket,VBucket & dbAssets,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset,std::function<void (Assets &,Assets &,const std::map<std::string,AssetOperationUtils::AssetOpType> &)> fillAssets)575 void CloudStorageUtils::FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType &assetOpType,
576     VBucket &vBucket, VBucket &dbAssets,
577     std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset,
578     std::function<void(Assets &, Assets &,
579     const std::map<std::string, AssetOperationUtils::AssetOpType> &)> fillAssets)
580 {
581     for (auto &item: dbAssets) {
582         if (IsAsset(item.second)) {
583             Asset cacheItem;
584             GetValueFromType(vBucket[item.first], cacheItem);
585             Asset dbItem;
586             GetValueFromType(item.second, dbItem);
587             AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
588             auto iterCol = assetOpType.find(item.first);
589             if (iterCol != assetOpType.end() && iterCol->second.find(dbItem.name) != iterCol->second.end()) {
590                 opType = iterCol->second.at(dbItem.name);
591             }
592             int errCode = fillAsset(cacheItem, dbItem, opType);
593             if (errCode != E_OK) {
594                 dbAssets[item.first] = Nil();
595             } else {
596                 dbAssets[item.first] = dbItem;
597             }
598             continue;
599         }
600         if (IsAssets(item.second)) {
601             Assets cacheItems;
602             GetValueFromType(vBucket[item.first], cacheItems);
603             Assets dbItems;
604             GetValueFromType(item.second, dbItems);
605             auto iterCol = assetOpType.find(item.first);
606             if (iterCol == assetOpType.end()) {
607                 fillAssets(cacheItems, dbItems, {});
608             } else {
609                 fillAssets(cacheItems, dbItems, iterCol->second);
610             }
611             if (dbItems.empty()) {
612                 dbAssets[item.first] = Nil();
613             } else {
614                 dbAssets[item.first] = dbItems;
615             }
616         }
617     }
618 }
619 
IsAsset(const Type & type)620 bool CloudStorageUtils::IsAsset(const Type &type)
621 {
622     return type.index() == TYPE_INDEX<Asset>;
623 }
624 
IsAssets(const Type & type)625 bool CloudStorageUtils::IsAssets(const Type &type)
626 {
627     return type.index() == TYPE_INDEX<Assets>;
628 }
629 
CalculateHashKeyForOneField(const Field & field,const VBucket & vBucket,bool allowEmpty,CollateType collateType,std::vector<uint8_t> & hashValue)630 int CloudStorageUtils::CalculateHashKeyForOneField(const Field &field, const VBucket &vBucket, bool allowEmpty,
631     CollateType collateType, std::vector<uint8_t> &hashValue)
632 {
633     Type type;
634     bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
635     if (allowEmpty && !isExisted) {
636         return E_OK; // if vBucket from cloud doesn't contain primary key and allowEmpty, no need to calculate hash
637     }
638     static std::map<int32_t, std::function<int(const VBucket &, const Field &, CollateType,
639         std::vector<uint8_t> &)>> toVecFunc = {
640         { TYPE_INDEX<int64_t>, &CloudStorageUtils::Int64ToVector },
641         { TYPE_INDEX<bool>, &CloudStorageUtils::BoolToVector },
642         { TYPE_INDEX<double>, &CloudStorageUtils::DoubleToVector },
643         { TYPE_INDEX<std::string>, &CloudStorageUtils::TextToVector },
644         { TYPE_INDEX<Bytes>, &CloudStorageUtils::BlobToVector },
645         { TYPE_INDEX<Asset>, &CloudStorageUtils::BlobToVector },
646         { TYPE_INDEX<Assets>, &CloudStorageUtils::BlobToVector },
647     };
648     auto it = toVecFunc.find(field.type);
649     if (it == toVecFunc.end()) {
650         LOGE("unknown cloud type when convert field to vector.");
651         return -E_CLOUD_ERROR;
652     }
653     std::vector<uint8_t> value;
654     int errCode = it->second(vBucket, field, collateType, value);
655     if (errCode != E_OK) {
656         LOGE("convert cloud field fail, %d", errCode);
657         return errCode;
658     }
659     return DBCommon::CalcValueHash(value, hashValue);
660 }
661 
IsAssetsContainDuplicateAsset(Assets & assets)662 bool CloudStorageUtils::IsAssetsContainDuplicateAsset(Assets &assets)
663 {
664     std::set<std::string> set;
665     for (const auto &asset : assets) {
666         if (set.find(asset.name) != set.end()) {
667             LOGE("assets contain duplicate Asset");
668             return true;
669         }
670         set.insert(asset.name);
671     }
672     return false;
673 }
674 
EraseNoChangeAsset(std::map<std::string,Assets> & assetsMap)675 void CloudStorageUtils::EraseNoChangeAsset(std::map<std::string, Assets> &assetsMap)
676 {
677     for (auto items = assetsMap.begin(); items != assetsMap.end();) {
678         for (auto item = items->second.begin(); item != items->second.end();) {
679             if (static_cast<AssetOpType>((*item).flag) == AssetOpType::NO_CHANGE) {
680                 item = items->second.erase(item);
681             } else {
682                 item++;
683             }
684         }
685         if (items->second.empty()) {
686             items = assetsMap.erase(items);
687         } else {
688             items++;
689         }
690     }
691 }
692 
MergeDownloadAsset(std::map<std::string,Assets> & downloadAssets,std::map<std::string,Assets> & mergeAssets)693 void CloudStorageUtils::MergeDownloadAsset(std::map<std::string, Assets> &downloadAssets,
694     std::map<std::string, Assets> &mergeAssets)
695 {
696     for (auto &items: mergeAssets) {
697         auto downloadItem = downloadAssets.find(items.first);
698         if (downloadItem == downloadAssets.end()) { // LCOV_EXCL_BR_LINE
699             continue;
700         }
701         std::map<std::string, size_t> beCoveredAssetsMap = GenAssetsIndexMap(items.second);
702         for (const Asset &asset: downloadItem->second) {
703             auto it = beCoveredAssetsMap.find(asset.name);
704             if (it == beCoveredAssetsMap.end()) { // LCOV_EXCL_BR_LINE
705                 continue;
706             }
707             items.second[it->second] = asset;
708         }
709     }
710 }
711 
GenAssetsIndexMap(Assets & assets)712 std::map<std::string, size_t> CloudStorageUtils::GenAssetsIndexMap(Assets &assets)
713 {
714     // key of assetsIndexMap is name of asset, the value of it is index.
715     std::map<std::string, size_t> assetsIndexMap;
716     for (size_t i = 0; i < assets.size(); i++) {
717         assetsIndexMap[assets[i].name] = i;
718     }
719     return assetsIndexMap;
720 }
721 
IsVbucketContainsAllPK(const VBucket & vBucket,const std::set<std::string> & pkSet)722 bool CloudStorageUtils::IsVbucketContainsAllPK(const VBucket &vBucket, const std::set<std::string> &pkSet)
723 {
724     if (pkSet.empty()) {
725         return false;
726     }
727     for (const auto &pk : pkSet) {
728         Type type;
729         bool isExisted = GetTypeCaseInsensitive(pk, vBucket, type);
730         if (!isExisted) {
731             return false;
732         }
733     }
734     return true;
735 }
736 
IsSharedTable(const TableSchema & tableSchema)737 bool CloudStorageUtils::IsSharedTable(const TableSchema &tableSchema)
738 {
739     return tableSchema.sharedTableName == tableSchema.name;
740 }
741 
IsViolationOfConstraints(const std::string & name,const std::vector<FieldInfo> & fieldInfos)742 static bool IsViolationOfConstraints(const std::string &name, const std::vector<FieldInfo> &fieldInfos)
743 {
744     for (const auto &field : fieldInfos) {
745         if (name != field.GetFieldName()) {
746             continue;
747         }
748         if (field.GetStorageType() == StorageType::STORAGE_TYPE_REAL) {
749             LOGE("[ConstraintsCheckForCloud] Not support create distributed table with real primary key.");
750             return true;
751         } else if (field.IsAssetType() || field.IsAssetsType()) {
752             LOGE("[ConstraintsCheckForCloud] Not support create distributed table with asset primary key.");
753             return true;
754         } else {
755             return false;
756         }
757     }
758     return false;
759 }
760 
ConstraintsCheckForCloud(const TableInfo & table,const std::string & trimmedSql)761 int CloudStorageUtils::ConstraintsCheckForCloud(const TableInfo &table, const std::string &trimmedSql)
762 {
763     if (DBCommon::HasConstraint(trimmedSql, "UNIQUE", " ,", " ,)(")) {
764         LOGE("[ConstraintsCheckForCloud] Not support create distributed table with 'UNIQUE' constraint.");
765         return -E_NOT_SUPPORT;
766     }
767 
768     const std::map<int, FieldName> &primaryKeys = table.GetPrimaryKey();
769     const std::vector<FieldInfo> &fieldInfos = table.GetFieldInfos();
770     for (const auto &item : primaryKeys) {
771         if (IsViolationOfConstraints(item.second, fieldInfos)) {
772             return -E_NOT_SUPPORT;
773         }
774     }
775     return E_OK;
776 }
777 
CheckAssetStatus(const Assets & assets)778 bool CloudStorageUtils::CheckAssetStatus(const Assets &assets)
779 {
780     for (const Asset &asset: assets) {
781         if (AssetOperationUtils::EraseBitMask(asset.status) > static_cast<uint32_t>(AssetStatus::UPDATE)) {
782             LOGE("assets contain invalid status:[%u]", asset.status);
783             return false;
784         }
785     }
786     return true;
787 }
788 
GetTableRefUpdateSql(const TableInfo & table,OpType opType)789 std::string CloudStorageUtils::GetTableRefUpdateSql(const TableInfo &table, OpType opType)
790 {
791     std::string sql;
792     std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
793     for (const auto &reference : table.GetTableReference()) {
794         if (reference.columns.empty()) {
795             return "";
796         }
797         std::string sourceLogName = DBCommon::GetLogTableName(reference.sourceTableName);
798         sql += " UPDATE " + sourceLogName + " SET timestamp=get_raw_sys_time(), flag=flag|0x02 WHERE ";
799         int index = 0;
800         for (const auto &itCol : reference.columns) {
801             if (opType != OpType::UPDATE) {
802                 continue;
803             }
804             if (index++ != 0) {
805                 sql += " OR ";
806             }
807             sql += " (OLD." + itCol.second + " IS NOT " + " NEW." + itCol.second + ")";
808         }
809         if (opType == OpType::UPDATE) {
810             sql += " AND ";
811         }
812         sql += " (flag&0x08=0x00) AND data_key IN (SELECT " + sourceLogName + ".data_key FROM " + sourceLogName +
813             " LEFT JOIN " + reference.sourceTableName + " ON " + sourceLogName + ".data_key = " +
814             reference.sourceTableName + "." + rowid + " WHERE ";
815         index = 0;
816         for (const auto &itCol : reference.columns) {
817             if (index++ != 0) {
818                 sql += " OR ";
819             }
820             if (opType == OpType::UPDATE) {
821                 sql += itCol.first + "=OLD." + itCol.second + " OR " + itCol.first + "=NEW." + itCol.second;
822             } else if (opType == OpType::INSERT) {
823                 sql += itCol.first + "=NEW." + itCol.second;
824             } else if (opType == OpType::DELETE) {
825                 sql += itCol.first + "=OLD." + itCol.second;
826             }
827         }
828         sql += ");";
829     }
830     return sql;
831 }
832 
GetLeftJoinLogSql(const std::string & tableName,bool logAsTableA)833 std::string CloudStorageUtils::GetLeftJoinLogSql(const std::string &tableName, bool logAsTableA)
834 {
835     std::string sql;
836     if (logAsTableA) {
837         sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b " +
838             " ON (a.data_key = b." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
839     } else {
840         sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS b LEFT JOIN '" + tableName + "' AS a " +
841             " ON (b.data_key = a." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
842     }
843     return sql;
844 }
845 
GetUpdateLockChangedSql()846 std::string CloudStorageUtils::GetUpdateLockChangedSql()
847 {
848     return " status = CASE WHEN status == 2 THEN 3 ELSE status END";
849 }
850 
GetDeleteLockChangedSql()851 std::string CloudStorageUtils::GetDeleteLockChangedSql()
852 {
853     return " status = CASE WHEN status == 2 or status == 3 THEN 1 ELSE status END";
854 }
855 
ChkFillCloudAssetParam(const CloudSyncBatch & data,int errCode)856 bool CloudStorageUtils::ChkFillCloudAssetParam(const CloudSyncBatch &data, int errCode)
857 {
858     if (data.assets.empty()) {
859         errCode = E_OK;
860         return true;
861     }
862     if (data.rowid.empty() || data.timestamp.empty()) {
863         errCode = -E_INVALID_ARGS;
864         LOGE("param is empty when fill cloud Asset. rowidN:%u, timeN:%u", errCode, data.rowid.size(),
865             data.timestamp.size());
866         return true;
867     }
868     if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size() ||
869         data.assets.size() != data.hashKey.size() || data.assets.size() != data.extend.size()) {
870         errCode = -E_INVALID_ARGS;
871         LOGE("the num of param is invalid when fill cloud Asset. assetsN:%u, rowidN:%u, timeN:%u, "
872              "hashKeyN:%u, extendN:%u", data.assets.size(), data.rowid.size(), data.timestamp.size(),
873              data.hashKey.size(), data.extend.size());
874         return true;
875     }
876     return false;
877 }
878 
GetToBeRemoveAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType,std::vector<Asset> & removeAssets)879 void CloudStorageUtils::GetToBeRemoveAssets(const VBucket &vBucket,
880     const AssetOperationUtils::RecordAssetOpType &assetOpType, std::vector<Asset> &removeAssets)
881 {
882     for (const auto &col: assetOpType) {
883         Type itItem;
884         bool isExisted = GetTypeCaseInsensitive(col.first, vBucket, itItem);
885         if (!isExisted) {
886             continue;
887         }
888         if (!CloudStorageUtils::IsAsset(itItem) && !CloudStorageUtils::IsAssets(itItem)) {
889             continue;
890         }
891         if (CloudStorageUtils::IsAsset(itItem)) {
892             Asset delAsset;
893             GetValueFromType(itItem, delAsset);
894             auto itOp = col.second.find(delAsset.name);
895             if (itOp != col.second.end() && itOp->second == AssetOperationUtils::AssetOpType::NOT_HANDLE
896                 && delAsset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
897                 removeAssets.push_back(delAsset);
898             }
899             continue;
900         }
901         Assets assets;
902         GetValueFromType(itItem, assets);
903         for (const auto &asset: assets) {
904             auto itOp = col.second.find(asset.name);
905             if (itOp == col.second.end() || itOp->second == AssetOperationUtils::AssetOpType::HANDLE ||
906                 asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
907                 continue;
908             }
909             removeAssets.push_back(asset);
910         }
911     }
912 }
913 
FillAssetForUploadFailed(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)914 int CloudStorageUtils::FillAssetForUploadFailed(Asset &asset, Asset &dbAsset,
915     AssetOperationUtils::AssetOpType assetOpType)
916 {
917     dbAsset.assetId = asset.assetId;
918     dbAsset.status &= ~AssetStatus::UPLOADING;
919     return E_OK;
920 }
921 
FillAssetsForUploadFailed(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)922 void CloudStorageUtils::FillAssetsForUploadFailed(Assets &assets, Assets &dbAssets,
923     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
924 {
925     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUploadFailed);
926 }
927 
FillAssetAfterDownloadFail(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)928 int CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset, Asset &dbAsset,
929     AssetOperationUtils::AssetOpType assetOpType)
930 {
931     AssetStatus status = static_cast<AssetStatus>(asset.status);
932     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
933         return E_OK;
934     }
935     if (status != AssetStatus::ABNORMAL) {
936         return FillAssetAfterDownload(asset, dbAsset, assetOpType);
937     }
938     dbAsset = asset;
939     return E_OK;
940 }
941 
FillAssetsAfterDownloadFail(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)942 void CloudStorageUtils::FillAssetsAfterDownloadFail(Assets &assets, Assets &dbAssets,
943     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
944 {
945     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownloadFail);
946 }
947 
MergeAssetWithFillFunc(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset)948 void CloudStorageUtils::MergeAssetWithFillFunc(Assets &assets, Assets &dbAssets, const std::map<std::string,
949     AssetOperationUtils::AssetOpType> &assetOpTypeMap,
950     std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset)
951 {
952     std::map<std::string, size_t> indexMap = GenAssetsIndexMap(assets);
953     for (auto dbAsset = dbAssets.begin(); dbAsset != dbAssets.end();) {
954         Asset cacheAsset;
955         auto it = indexMap.find(dbAsset->name);
956         if (it != indexMap.end()) {
957             cacheAsset = assets[it->second];
958         }
959         AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
960         auto iterOp = assetOpTypeMap.find(dbAsset->name);
961         if (iterOp != assetOpTypeMap.end()) {
962             opType = iterOp->second;
963         }
964         if (fillAsset(cacheAsset, *dbAsset, opType) == -E_NOT_FOUND) {
965             dbAsset = dbAssets.erase(dbAsset);
966         } else {
967             dbAsset++;
968         }
969     }
970 }
971 
GetHashValueWithPrimaryKeyMap(const VBucket & vBucket,const TableSchema & tableSchema,const TableInfo & localTable,const std::map<std::string,Field> & pkMap,bool allowEmpty)972 std::pair<int, std::vector<uint8_t>> CloudStorageUtils::GetHashValueWithPrimaryKeyMap(const VBucket &vBucket,
973     const TableSchema &tableSchema, const TableInfo &localTable, const std::map<std::string, Field> &pkMap,
974     bool allowEmpty)
975 {
976     int errCode = E_OK;
977     std::vector<uint8_t> hashValue;
978     if (pkMap.size() == 0) {
979         LOGE("do not support get hashValue when primaryKey map is empty.");
980         return { -E_INTERNAL_ERROR, {} };
981     } else if (pkMap.size() == 1) {
982         std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
983         FieldInfoMap fieldInfos = localTable.GetFields();
984         if (fieldInfos.find(pkMap.begin()->first) == fieldInfos.end()) {
985             LOGE("localSchema doesn't contain primary key.");
986             return { -E_INTERNAL_ERROR, {} };
987         }
988         CollateType collateType = fieldInfos.at(pkMap.begin()->first).GetCollateType();
989         errCode = CloudStorageUtils::CalculateHashKeyForOneField(
990             pkVec.at(0), vBucket, allowEmpty, collateType, hashValue);
991     } else {
992         std::vector<uint8_t> tempRes;
993         for (const auto &item: pkMap) {
994             FieldInfoMap fieldInfos = localTable.GetFields();
995             if (fieldInfos.find(item.first) == fieldInfos.end()) {
996                 LOGE("localSchema doesn't contain primary key in multi pks.");
997                 return { -E_INTERNAL_ERROR, {} };
998             }
999             std::vector<uint8_t> temp;
1000             CollateType collateType = fieldInfos.at(item.first).GetCollateType();
1001             errCode = CloudStorageUtils::CalculateHashKeyForOneField(
1002                 item.second, vBucket, allowEmpty, collateType, temp);
1003             if (errCode != E_OK) {
1004                 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
1005                 return { errCode, {} };
1006             }
1007             tempRes.insert(tempRes.end(), temp.begin(), temp.end());
1008         }
1009         errCode = DBCommon::CalcValueHash(tempRes, hashValue);
1010     }
1011     return { errCode, hashValue };
1012 }
1013 
CheckCloudSchemaFields(const TableSchema & tableSchema,const TableSchema & oldSchema)1014 bool CloudStorageUtils::CheckCloudSchemaFields(const TableSchema &tableSchema, const TableSchema &oldSchema)
1015 {
1016     if (tableSchema.name != oldSchema.name) {
1017         return true;
1018     }
1019     for (const auto &oldField : oldSchema.fields) {
1020         auto it = std::find_if(tableSchema.fields.begin(), tableSchema.fields.end(),
1021             [&oldField](const std::vector<Field>::value_type &field) {
1022                 return oldField == field;
1023             });
1024         if (it == tableSchema.fields.end()) {
1025             return false;
1026         }
1027     }
1028     return true;
1029 }
1030 
TransferFieldToLower(VBucket & vBucket)1031 void CloudStorageUtils::TransferFieldToLower(VBucket &vBucket)
1032 {
1033     for (auto it = vBucket.begin(); it != vBucket.end();) {
1034         std::string lowerField(it->first.length(), ' ');
1035         std::transform(it->first.begin(), it->first.end(), lowerField.begin(), ::tolower);
1036         if (lowerField != it->first) {
1037             vBucket[lowerField] = std::move(vBucket[it->first]);
1038             vBucket.erase(it++);
1039         } else {
1040             it++;
1041         }
1042     }
1043 }
1044 
GetTypeCaseInsensitive(const std::string & fieldName,const VBucket & vBucket,Type & data)1045 bool CloudStorageUtils::GetTypeCaseInsensitive(const std::string &fieldName, const VBucket &vBucket, Type &data)
1046 {
1047     auto tmpFieldName = fieldName;
1048     auto tmpVBucket = vBucket;
1049     std::transform(tmpFieldName.begin(), tmpFieldName.end(), tmpFieldName.begin(), ::tolower);
1050     TransferFieldToLower(tmpVBucket);
1051     auto it = tmpVBucket.find(tmpFieldName);
1052     if (it == tmpVBucket.end()) {
1053         return false;
1054     }
1055     data = it->second;
1056     return true;
1057 }
1058 
BindUpdateLogStmtFromVBucket(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,sqlite3_stmt * updateLogStmt)1059 int CloudStorageUtils::BindUpdateLogStmtFromVBucket(const VBucket &vBucket, const TableSchema &tableSchema,
1060     const std::vector<std::string> &colNames, sqlite3_stmt *updateLogStmt)
1061 {
1062     int index = 0;
1063     int errCode = E_OK;
1064     for (const auto &colName : colNames) {
1065         index++;
1066         if (colName == CloudDbConstant::GID_FIELD) {
1067             if (vBucket.find(colName) == vBucket.end()) {
1068                 LOGE("cloud data doesn't contain gid field when bind update log stmt.");
1069                 return -E_CLOUD_ERROR;
1070             }
1071             errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
1072                 std::get<std::string>(vBucket.at(colName)));
1073         } else if (colName == CloudDbConstant::MODIFY_FIELD) {
1074             if (vBucket.find(colName) == vBucket.end()) {
1075                 LOGE("cloud data doesn't contain modify field when bind update log stmt.");
1076                 return -E_CLOUD_ERROR;
1077             }
1078             errCode = SQLiteUtils::BindInt64ToStatement(updateLogStmt, index, std::get<int64_t>(vBucket.at(colName)));
1079         } else if (colName == CloudDbConstant::VERSION_FIELD) {
1080             if (vBucket.find(colName) == vBucket.end()) {
1081                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index, std::string(""));
1082             } else {
1083                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
1084                     std::get<std::string>(vBucket.at(colName)));
1085             }
1086         } else if (colName == CloudDbConstant::SHARING_RESOURCE_FIELD) {
1087             if (vBucket.find(colName) == vBucket.end()) {
1088                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index, std::string(""));
1089             } else {
1090                 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
1091                     std::get<std::string>(vBucket.at(colName)));
1092             }
1093         } else {
1094             LOGE("invalid col name when bind value to update log statement.");
1095             return -E_INTERNAL_ERROR;
1096         }
1097         if (errCode != E_OK) {
1098             LOGE("fail to bind value to update log statement.");
1099             return errCode;
1100         }
1101     }
1102     return E_OK;
1103 }
1104 
GetUpdateRecordFlagSql(UpdateRecordFlagStruct updateRecordFlag,const LogInfo & logInfo,const VBucket & uploadExtend,const CloudWaterType & type)1105 std::string CloudStorageUtils::GetUpdateRecordFlagSql(UpdateRecordFlagStruct updateRecordFlag, const LogInfo &logInfo,
1106     const VBucket &uploadExtend, const CloudWaterType &type)
1107 {
1108     std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
1109     std::string inconsistencyBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
1110     bool gidEmpty = logInfo.cloudGid.empty();
1111     bool isDeleted = logInfo.dataKey == DBConstant::DEFAULT_ROW_ID;
1112     std::string sql = "UPDATE " + DBCommon::GetLogTableName(updateRecordFlag.tableName) +
1113         " SET flag = (CASE WHEN timestamp = ? THEN ";
1114     bool isNeedCompensated =
1115         updateRecordFlag.isRecordConflict || DBCommon::IsNeedCompensatedForUpload(uploadExtend, type);
1116     if (isNeedCompensated && !(isDeleted && gidEmpty)) {
1117         sql += "flag | " + compensatedBit + " ELSE flag | " + compensatedBit;
1118     } else {
1119         if (updateRecordFlag.isInconsistency) {
1120             sql += "flag & ~" + compensatedBit + " |" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
1121         } else {
1122             sql += "flag & ~" + compensatedBit + " & ~" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
1123         }
1124     }
1125     sql += " END), status = (CASE WHEN status == 2 THEN 3 WHEN (status == 1 AND timestamp = ?) THEN 0 ELSE status END)";
1126     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1127         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1128         sql += ", cloud_gid = '', version = '' ";
1129     }
1130     sql += " WHERE ";
1131     if (!gidEmpty) {
1132         sql += " cloud_gid = '" + logInfo.cloudGid + "'";
1133     }
1134     if (!isDeleted) {
1135         if (!gidEmpty) {
1136             sql += " OR ";
1137         }
1138         sql += " data_key = '" + std::to_string(logInfo.dataKey) + "'";
1139     }
1140     if (gidEmpty && isDeleted) {
1141         sql += " hash_key = ?";
1142     }
1143     sql += ";";
1144     return sql;
1145 }
1146 
GetUpdateRecordFlagSqlUpload(const std::string & tableName,bool recordConflict,const LogInfo & logInfo,const VBucket & uploadExtend,const CloudWaterType & type)1147 std::string CloudStorageUtils::GetUpdateRecordFlagSqlUpload(const std::string &tableName, bool recordConflict,
1148     const LogInfo &logInfo, const VBucket &uploadExtend, const CloudWaterType &type)
1149 {
1150     std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
1151     std::string inconsistencyBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
1152     bool gidEmpty = logInfo.cloudGid.empty();
1153     bool isDeleted = logInfo.dataKey == DBConstant::DEFAULT_ROW_ID;
1154     std::string sql;
1155     bool isNeedCompensated = recordConflict || DBCommon::IsNeedCompensatedForUpload(uploadExtend, type);
1156     if (isNeedCompensated && !(isDeleted && gidEmpty)) {
1157         sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? OR " +
1158             "flag & 0x01 = 0 THEN flag | " + compensatedBit + " ELSE flag";
1159     } else {
1160         sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? THEN " +
1161             "flag & ~" + compensatedBit + " & ~" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
1162     }
1163     sql += " END), status = (CASE WHEN status == 2 THEN 3 WHEN (status == 1 AND timestamp = ?) THEN 0 ELSE status END)";
1164     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1165         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1166         sql += ", cloud_gid = '', version = '' ";
1167     }
1168     sql += " WHERE ";
1169     if (!gidEmpty) {
1170         sql += " cloud_gid = '" + logInfo.cloudGid + "'";
1171     }
1172     if (!isDeleted) {
1173         if (!gidEmpty) {
1174             sql += " OR ";
1175         }
1176         sql += " data_key = '" + std::to_string(logInfo.dataKey) + "'";
1177     }
1178     if (gidEmpty && isDeleted) {
1179         sql += " hash_key = ?";
1180     }
1181     sql += ";";
1182     return sql;
1183 }
1184 
GetUpdateUploadFinishedSql(const std::string & tableName,bool isExistAssetsDownload)1185 std::string CloudStorageUtils::GetUpdateUploadFinishedSql(const std::string &tableName, bool isExistAssetsDownload)
1186 {
1187     std::string finishedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_UPLOAD_FINISHED));
1188     std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
1189     // When the data flag is not in the compensation state and the local data does not change, the upload is finished.
1190     if (isExistAssetsDownload) {
1191         return "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? AND flag & " +
1192             compensatedBit + " != " + compensatedBit + " THEN flag | " + finishedBit +
1193             " ELSE flag END) WHERE hash_key=?";
1194     }
1195     // Clear IsWaitAssetDownload flag when no asset is to be downloaded
1196     return "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = ((CASE WHEN timestamp = ? AND flag & " +
1197         compensatedBit + " != " + compensatedBit + " THEN flag | " + finishedBit +
1198         " ELSE flag END) & ~0x1000) WHERE hash_key=?";
1199 }
1200 
BindStepConsistentFlagStmt(sqlite3_stmt * stmt,const VBucket & data,const std::set<std::string> & gidFilters)1201 int CloudStorageUtils::BindStepConsistentFlagStmt(sqlite3_stmt *stmt, const VBucket &data,
1202     const std::set<std::string> &gidFilters)
1203 {
1204     std::string gidStr;
1205     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gidStr);
1206     if (errCode != E_OK || gidStr.empty()) {
1207         LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1208         return errCode;
1209     }
1210     if (gidStr.empty()) {
1211         LOGE("Get empty gid from bucket when mark flag as consistent.");
1212         return -E_CLOUD_ERROR;
1213     }
1214     // this data has not yet downloaded asset, skipping
1215     if (gidFilters.find(gidStr) != gidFilters.end()) {
1216         return E_OK;
1217     }
1218     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gidStr); // 1 is cloud_gid
1219     if (errCode != E_OK) {
1220         LOGE("Bind cloud_gid to mark flag as consistent stmt failed, %d", errCode);
1221         return errCode;
1222     }
1223     int64_t modifyTime;
1224     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::MODIFY_FIELD, data, modifyTime);
1225     if (errCode != E_OK) {
1226         LOGE("Get modify time from bucket fail when mark flag as consistent, errCode = %d", errCode);
1227         return errCode;
1228     }
1229     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, modifyTime); // 2 is timestamp
1230     if (errCode != E_OK) {
1231         LOGE("Bind modify time to mark flag as consistent stmt failed, %d", errCode);
1232         return errCode;
1233     }
1234     errCode = SQLiteUtils::StepWithRetry(stmt);
1235     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1236         errCode = E_OK;
1237     } else {
1238         LOGE("[Storage Executor]Step mark flag as consistent stmt failed, %d", errCode);
1239     }
1240     return errCode;
1241 }
1242 
IsCloudGidMismatch(const std::string & downloadGid,const std::string & curGid)1243 bool CloudStorageUtils::IsCloudGidMismatch(const std::string &downloadGid, const std::string &curGid)
1244 {
1245     return !downloadGid.empty() && !curGid.empty() && downloadGid != curGid;
1246 }
1247 
IsGetCloudDataContinue(uint32_t curNum,uint32_t curSize,uint32_t maxSize,uint32_t maxCount)1248 bool CloudStorageUtils::IsGetCloudDataContinue(uint32_t curNum, uint32_t curSize, uint32_t maxSize, uint32_t maxCount)
1249 {
1250     if (curNum == 0) {
1251         return true;
1252     }
1253     if (curSize < maxSize && curNum < maxCount) {
1254         return true;
1255     }
1256     return false;
1257 }
1258 
IdentifyCloudType(const CloudUploadRecorder & recorder,CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)1259 int CloudStorageUtils::IdentifyCloudType(const CloudUploadRecorder &recorder, CloudSyncData &cloudSyncData,
1260     VBucket &data, VBucket &log, VBucket &flags)
1261 {
1262     Bytes *hashKey = std::get_if<Bytes>(&flags[CloudDbConstant::HASH_KEY]);
1263     int64_t *timeStamp = std::get_if<int64_t>(&flags[CloudDbConstant::TIMESTAMP]);
1264     if (timeStamp == nullptr || hashKey == nullptr) {
1265         return -E_INVALID_DATA;
1266     }
1267     if (recorder.IsIgnoreUploadRecord(cloudSyncData.tableName, *hashKey, cloudSyncData.mode, *timeStamp)) {
1268         cloudSyncData.ignoredCount++;
1269         return -E_IGNORE_DATA;
1270     }
1271     return IdentifyCloudTypeInner(cloudSyncData, data, log, flags);
1272 }
1273 
IdentifyCloudTypeInner(CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)1274 int CloudStorageUtils::IdentifyCloudTypeInner(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags)
1275 {
1276     int64_t *rowid = std::get_if<int64_t>(&flags[DBConstant::ROWID]);
1277     int64_t *flag = std::get_if<int64_t>(&flags[CloudDbConstant::FLAG]);
1278     int64_t *timeStamp = std::get_if<int64_t>(&flags[CloudDbConstant::TIMESTAMP]);
1279     Bytes *hashKey = std::get_if<Bytes>(&flags[CloudDbConstant::HASH_KEY]);
1280     int64_t *status = std::get_if<int64_t>(&flags[CloudDbConstant::STATUS]);
1281     if (rowid == nullptr || flag == nullptr || timeStamp == nullptr || hashKey == nullptr) {
1282         return -E_INVALID_DATA;
1283     }
1284     bool isDelete = ((static_cast<uint64_t>(*flag) & DataItem::DELETE_FLAG) != 0);
1285     bool isInsert = (!isDelete) && (log.find(CloudDbConstant::GID_FIELD) == log.end());
1286     if (status != nullptr && !isInsert && (CloudStorageUtils::IsDataLocked(*status))) {
1287         cloudSyncData.ignoredCount++;
1288         cloudSyncData.lockData.extend.push_back(log);
1289         cloudSyncData.lockData.hashKey.push_back(*hashKey);
1290         cloudSyncData.lockData.timestamp.push_back(*timeStamp);
1291         cloudSyncData.lockData.rowid.push_back(*rowid);
1292         return -E_IGNORE_DATA;
1293     }
1294     if (isDelete) {
1295         cloudSyncData.delData.record.push_back(data);
1296         cloudSyncData.delData.extend.push_back(log);
1297         cloudSyncData.delData.hashKey.push_back(*hashKey);
1298         cloudSyncData.delData.timestamp.push_back(*timeStamp);
1299         cloudSyncData.delData.rowid.push_back(*rowid);
1300     } else {
1301         int errCode = CheckAbnormalData(cloudSyncData, data, isInsert);
1302         if (errCode != E_OK) {
1303             return errCode;
1304         }
1305         CloudSyncBatch &opData = isInsert ? cloudSyncData.insData : cloudSyncData.updData;
1306         opData.record.push_back(data);
1307         opData.rowid.push_back(*rowid);
1308         VBucket asset;
1309         CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
1310         opData.timestamp.push_back(*timeStamp);
1311         opData.assets.push_back(asset);
1312         if (isInsert) {
1313             log[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::VectorToHexString(*hashKey);
1314         }
1315         opData.extend.push_back(log);
1316         opData.hashKey.push_back(*hashKey);
1317     }
1318     return E_OK;
1319 }
1320 
CheckAbnormalData(CloudSyncData & cloudSyncData,VBucket & data,bool isInsert)1321 int CloudStorageUtils::CheckAbnormalData(CloudSyncData &cloudSyncData, VBucket &data, bool isInsert)
1322 {
1323     if (data.empty()) {
1324         LOGE("The cloud data is empty, isInsert:%d", static_cast<int>(isInsert));
1325         return -E_INVALID_DATA;
1326     }
1327     bool isDataAbnormal = false;
1328     for (auto &item : data) {
1329         const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1330         if (asset != nullptr) {
1331             if (asset->status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1332                 (asset->status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0) {
1333                 isDataAbnormal = true;
1334                 break;
1335             }
1336             continue;
1337         }
1338         auto *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1339         if (assets == nullptr) {
1340             continue;
1341         }
1342         for (auto it = assets->begin(); it != assets->end();) {
1343             const auto &oneAsset = *it;
1344             if (oneAsset.status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1345                 oneAsset.status == static_cast<uint32_t>(AssetStatus::DOWNLOADING) ||
1346                 (oneAsset.status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0) {
1347                 isDataAbnormal = true;
1348                 it = assets->erase(it);
1349             } else {
1350                 ++it;
1351             }
1352         }
1353     }
1354     if (isDataAbnormal) {
1355         std::string gid;
1356         (void)GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gid);
1357         LOGW("This data is abnormal, upload it, isInsert:%d, gid:%s", static_cast<int>(isInsert),
1358             gid.c_str());
1359     }
1360     return E_OK;
1361 }
1362 
GetDataItemFromCloudData(VBucket & data)1363 std::pair<int, DataItem> CloudStorageUtils::GetDataItemFromCloudData(VBucket &data)
1364 {
1365     std::pair<int, DataItem> res;
1366     auto &[errCode, dataItem] = res;
1367     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_KEY, data, dataItem.key);
1368     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, data, dataItem.value);
1369     GetStringFromCloudData(CloudDbConstant::GID_FIELD, data, dataItem.gid);
1370     GetStringFromCloudData(CloudDbConstant::VERSION_FIELD, data, dataItem.version);
1371     GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, data, dataItem.dev);
1372     GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, data, dataItem.origDev);
1373     dataItem.flag = static_cast<uint64_t>(LogInfoFlag::FLAG_CLOUD_WRITE);
1374     GetUInt64FromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME, data, dataItem.writeTimestamp);
1375     GetUInt64FromCloudData(CloudDbConstant::MODIFY_FIELD, data, dataItem.modifyTime);
1376     errCode = GetUInt64FromCloudData(CloudDbConstant::CREATE_FIELD, data, dataItem.createTime);
1377     bool isSystemRecord = IsSystemRecord(dataItem.key);
1378     if (isSystemRecord) {
1379         dataItem.hashKey = dataItem.key;
1380         dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_SYSTEM_RECORD);
1381     } else if (!dataItem.key.empty()) {
1382         (void)DBCommon::CalcValueHash(dataItem.key, dataItem.hashKey);
1383     }
1384     return res;
1385 }
1386 
GetDataItemFromCloudVersionData(VBucket & data)1387 std::pair<int, DataItem> CloudStorageUtils::GetDataItemFromCloudVersionData(VBucket &data)
1388 {
1389     std::pair<int, DataItem> res;
1390     auto &[errCode, dataItem] = res;
1391     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_KEY, data, dataItem.key);
1392     GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, data, dataItem.value);
1393     errCode = GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, data, dataItem.dev);
1394     return res;
1395 }
1396 
GetBytesFromCloudData(const std::string & field,VBucket & data,Bytes & bytes)1397 int CloudStorageUtils::GetBytesFromCloudData(const std::string &field, VBucket &data, Bytes &bytes)
1398 {
1399     std::string blobStr;
1400     int errCode = GetValueFromVBucket(field, data, blobStr);
1401     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1402         LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1403         return errCode;
1404     }
1405     DBCommon::StringToVector(blobStr, bytes);
1406     return errCode;
1407 }
1408 
GetStringFromCloudData(const std::string & field,VBucket & data,std::string & str)1409 int CloudStorageUtils::GetStringFromCloudData(const std::string &field, VBucket &data, std::string &str)
1410 {
1411     int errCode = GetValueFromVBucket(field, data, str);
1412     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1413         LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1414         return errCode;
1415     }
1416     return errCode;
1417 }
1418 
GetUInt64FromCloudData(const std::string & field,VBucket & data,uint64_t & number)1419 int CloudStorageUtils::GetUInt64FromCloudData(const std::string &field, VBucket &data, uint64_t &number)
1420 {
1421     int64_t intNum;
1422     int errCode = GetValueFromVBucket(field, data, intNum);
1423     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1424         LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1425         return errCode;
1426     }
1427     number = static_cast<uint64_t>(intNum);
1428     return errCode;
1429 }
1430 
AddUpdateColForShare(const TableSchema & tableSchema,std::string & updateLogSql,std::vector<std::string> & updateColName)1431 void CloudStorageUtils::AddUpdateColForShare(const TableSchema &tableSchema, std::string &updateLogSql,
1432     std::vector<std::string> &updateColName)
1433 {
1434     updateLogSql += ", version = ?";
1435     updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1436     updateLogSql += ", sharing_resource = ?";
1437     updateColName.push_back(CloudDbConstant::SHARING_RESOURCE_FIELD);
1438 }
1439 
IsDataLocked(uint32_t status)1440 bool CloudStorageUtils::IsDataLocked(uint32_t status)
1441 {
1442     return status == static_cast<uint32_t>(LockStatus::LOCK) ||
1443         status == static_cast<uint32_t>(LockStatus::LOCK_CHANGE);
1444 }
1445 
GetSystemRecordFromCloudData(VBucket & data)1446 std::pair<int, DataItem> CloudStorageUtils::GetSystemRecordFromCloudData(VBucket &data)
1447 {
1448     auto res = CloudStorageUtils::GetDataItemFromCloudData(data); // only record first one
1449     auto &[errCode, dataItem] = res;
1450     if (errCode != E_OK) {
1451         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1452         return res;
1453     }
1454     dataItem.dev = "";
1455     dataItem.origDev = "";
1456     return res;
1457 }
1458 
IsSystemRecord(const Key & key)1459 bool CloudStorageUtils::IsSystemRecord(const Key &key)
1460 {
1461     std::string prefixKey = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY;
1462     if (key.size() < prefixKey.size()) {
1463         return false;
1464     }
1465     std::string keyStr(key.begin(), key.end());
1466     return keyStr.find(prefixKey) == 0;
1467 }
1468 
GetSelectIncCursorSql(const std::string & tableName)1469 std::string CloudStorageUtils::GetSelectIncCursorSql(const std::string &tableName)
1470 {
1471     return "(SELECT value FROM " + DBCommon::GetMetaTableName() + " WHERE key=x'" +
1472         DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "')";
1473 }
1474 
GetCursorIncSql(const std::string & tableName)1475 std::string CloudStorageUtils::GetCursorIncSql(const std::string &tableName)
1476 {
1477     return "UPDATE " + DBCommon::GetMetaTableName() + " SET value=value+1 WHERE key=x'" +
1478         DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "';";
1479 }
1480 
GetCursorIncSqlWhenAllow(const std::string & tableName)1481 std::string CloudStorageUtils::GetCursorIncSqlWhenAllow(const std::string &tableName)
1482 {
1483     std::string prefix = DBConstant::RELATIONAL_PREFIX;
1484     return "UPDATE " + prefix + "metadata" + " SET value= case when (select 1 from " +
1485         prefix + "metadata" + " where key='cursor_inc_flag' AND value = 'true') then value + 1" +
1486         " else value end WHERE key=x'" + DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "';";
1487 }
1488 
GetCursorUpgradeSql(const std::string & tableName)1489 std::string CloudStorageUtils::GetCursorUpgradeSql(const std::string &tableName)
1490 {
1491     return "INSERT OR REPLACE INTO " + DBCommon::GetMetaTableName() + "(key,value) VALUES (x'" +
1492         DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "', (SELECT CASE WHEN MAX(cursor) IS" +
1493         " NULL THEN 0 ELSE MAX(cursor) END FROM " + DBCommon::GetLogTableName(tableName) + "));";
1494 }
1495 
FillQueryByPK(const std::string & tableName,bool isKv,std::map<std::string,size_t> dataIndex,std::vector<std::map<std::string,std::vector<Type>>> & syncPkVec,std::vector<QuerySyncObject> & syncQuery)1496 int CloudStorageUtils::FillQueryByPK(const std::string &tableName, bool isKv, std::map<std::string, size_t> dataIndex,
1497     std::vector<std::map<std::string, std::vector<Type>>> &syncPkVec, std::vector<QuerySyncObject> &syncQuery)
1498 {
1499     for (const auto &syncPk : syncPkVec) {
1500         Query query = Query::Select().From(tableName);
1501         if (isKv) {
1502             QueryUtils::FillQueryInKeys(syncPk, dataIndex, query);
1503         } else {
1504             for (const auto &[col, pkList] : syncPk) {
1505                 QueryUtils::FillQueryIn(col, pkList, dataIndex[col], query);
1506             }
1507         }
1508         auto objectList = QuerySyncObject::GetQuerySyncObject(query);
1509         if (objectList.size() != 1u) { // only support one QueryExpression
1510             return -E_INTERNAL_ERROR;
1511         }
1512         syncQuery.push_back(objectList[0]);
1513     }
1514     return E_OK;
1515 }
1516 
PutSyncPkVec(const std::string & col,std::map<std::string,std::vector<Type>> & syncPk,std::vector<std::map<std::string,std::vector<Type>>> & syncPkVec)1517 void CloudStorageUtils::PutSyncPkVec(const std::string &col, std::map<std::string, std::vector<Type>> &syncPk,
1518     std::vector<std::map<std::string, std::vector<Type>>> &syncPkVec)
1519 {
1520     if (syncPk[col].size() >= CloudDbConstant::MAX_CONDITIONS_SIZE) {
1521         syncPkVec.push_back(syncPk);
1522         syncPk[col].clear();
1523     }
1524 }
1525 
GetSyncQueryByPk(const std::string & tableName,const std::vector<VBucket> & data,bool isKv,std::vector<QuerySyncObject> & syncQuery)1526 int CloudStorageUtils::GetSyncQueryByPk(const std::string &tableName, const std::vector<VBucket> &data, bool isKv,
1527     std::vector<QuerySyncObject> &syncQuery)
1528 {
1529     std::map<std::string, size_t> dataIndex;
1530     std::vector<std::map<std::string, std::vector<Type>>> syncPkVec;
1531     std::map<std::string, std::vector<Type>> syncPk;
1532     int ignoreCount = 0;
1533     for (const auto &oneRow : data) {
1534         if (oneRow.size() >= 2u) { // mean this data has more than 2 pk
1535             LOGW("compensated sync does not support composite PK, oneRow size: %zu, tableName: %s",
1536                 oneRow.size(), DBCommon::StringMiddleMasking(tableName).c_str());
1537             return -E_NOT_SUPPORT;
1538         }
1539         for (const auto &[col, value] : oneRow) {
1540             bool isFind = dataIndex.find(col) != dataIndex.end();
1541             if (!isFind && value.index() == TYPE_INDEX<Nil>) {
1542                 ignoreCount++;
1543                 continue;
1544             }
1545             if (!isFind && value.index() != TYPE_INDEX<Nil>) {
1546                 dataIndex[col] = value.index();
1547                 syncPk[col].push_back(value);
1548                 PutSyncPkVec(col, syncPk, syncPkVec);
1549                 continue;
1550             }
1551             if (isFind && dataIndex[col] != value.index()) {
1552                 ignoreCount++;
1553                 continue;
1554             }
1555             syncPk[col].push_back(value);
1556             PutSyncPkVec(col, syncPk, syncPkVec);
1557         }
1558     }
1559     syncPkVec.push_back(syncPk);
1560     LOGI("match %zu data for compensated sync, ignore %d", data.size(), ignoreCount);
1561     return FillQueryByPK(tableName, isKv, dataIndex, syncPkVec, syncQuery);
1562 }
1563 
IsAssetsContainDownloadRecord(const VBucket & dbAssets)1564 bool CloudStorageUtils::IsAssetsContainDownloadRecord(const VBucket &dbAssets)
1565 {
1566     for (const auto &item: dbAssets) {
1567         if (IsAsset(item.second)) {
1568             const auto &asset = std::get<Asset>(item.second);
1569             if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1570                 return true;
1571             }
1572         } else if (IsAssets(item.second)) {
1573             const auto &assets = std::get<Assets>(item.second);
1574             if (AssetOperationUtils::IsAssetsNeedDownload(assets)) {
1575                 return true;
1576             }
1577         }
1578     }
1579     return false;
1580 }
1581 
UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor * handle,const CloudSyncParam & param,const CloudSyncBatch & updateData,CloudUploadRecorder & recorder,bool isLock)1582 int CloudStorageUtils::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle,
1583     const CloudSyncParam &param, const CloudSyncBatch &updateData, CloudUploadRecorder &recorder, bool isLock)
1584 {
1585     if (updateData.timestamp.size() != updateData.extend.size()) {
1586         LOGE("the num of extend:%zu and timestamp:%zu is not equal.",
1587             updateData.extend.size(), updateData.timestamp.size());
1588         return -E_INVALID_ARGS;
1589     }
1590     for (size_t i = 0; i < updateData.extend.size(); ++i) {
1591         const auto &record = updateData.extend[i];
1592         if (DBCommon::IsRecordError(record) || DBCommon::IsRecordAssetsMissing(record) ||
1593             DBCommon::IsRecordVersionConflict(record) || isLock) {
1594             if (DBCommon::IsRecordAssetsMissing(record)) {
1595                 LOGI("[CloudStorageUtils][UpdateRecordFlagAfterUpload] Record assets missing, skip update.");
1596             }
1597             int errCode = handle->UpdateRecordStatus(param.tableName, CloudDbConstant::TO_LOCAL_CHANGE,
1598                 updateData.hashKey[i]);
1599             if (errCode != E_OK) {
1600                 LOGE("[CloudStorageUtils] Update record status failed in index %zu", i);
1601                 return errCode;
1602             }
1603             continue;
1604         }
1605         const auto &rowId = updateData.rowid[i];
1606         std::string cloudGid;
1607         (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, record, cloudGid);
1608         LogInfo logInfo;
1609         logInfo.cloudGid = cloudGid;
1610         logInfo.timestamp = updateData.timestamp[i];
1611         logInfo.dataKey = rowId;
1612         logInfo.hashKey = updateData.hashKey[i];
1613         std::string sql = CloudStorageUtils::GetUpdateRecordFlagSqlUpload(
1614             param.tableName, DBCommon::IsRecordIgnored(record), logInfo, record, param.type);
1615         int errCode = handle->UpdateRecordFlag(param.tableName, sql, logInfo);
1616         if (errCode != E_OK) {
1617             LOGE("[CloudStorageUtils] Update record flag failed in index %zu", i);
1618             return errCode;
1619         }
1620 
1621         std::vector<VBucket> assets;
1622         errCode = handle->GetDownloadAssetRecordsByGid(param.tableSchema, logInfo.cloudGid, assets);
1623         if (errCode != E_OK) {
1624             LOGE("[RDBExecutor]Get downloading assets records by gid failed: %d", errCode);
1625             return errCode;
1626         }
1627         handle->MarkFlagAsUploadFinished(param.tableName, updateData.hashKey[i], updateData.timestamp[i],
1628             !assets.empty());
1629         recorder.RecordUploadRecord(param.tableName, logInfo.hashKey, param.type, updateData.timestamp[i]);
1630     }
1631     return E_OK;
1632 }
1633 
FillCloudQueryToExtend(QuerySyncObject & obj,VBucket & extend)1634 int CloudStorageUtils::FillCloudQueryToExtend(QuerySyncObject &obj, VBucket &extend)
1635 {
1636     Bytes bytes;
1637     bytes.resize(obj.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
1638     Parcel parcel(bytes.data(), bytes.size());
1639     int errCode = obj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
1640     if (errCode != E_OK) {
1641         LOGE("[CloudStorageUtils] Query serialize failed %d", errCode);
1642         return errCode;
1643     }
1644     extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::QUERY_FIELD);
1645     extend[CloudDbConstant::QUERY_FIELD] = bytes;
1646     return E_OK;
1647 }
1648 
SaveChangedDataByType(const DataValue & dataValue,Type & value)1649 void CloudStorageUtils::SaveChangedDataByType(const DataValue &dataValue, Type &value)
1650 {
1651     int ret = E_OK;
1652     switch (dataValue.GetType()) {
1653         case StorageType::STORAGE_TYPE_TEXT:
1654             {
1655                 std::string sValue;
1656                 ret = dataValue.GetText(sValue);
1657                 if (ret != E_OK) {
1658                     LOGE("[CloudStorageUtils] save changed string data failed %d", ret);
1659                     return;
1660                 }
1661                 value = sValue;
1662             } break;
1663         case StorageType::STORAGE_TYPE_BLOB:
1664             {
1665                 Blob blob;
1666                 (void)dataValue.GetBlob(blob);
1667                 if (blob.GetSize() == 0u) {
1668                     LOGE("[CloudStorageUtils] save changed Blob data failed");
1669                     return;
1670                 }
1671                 value = std::vector<uint8_t>(blob.GetData(), blob.GetData() + blob.GetSize());
1672             } break;
1673         case StorageType::STORAGE_TYPE_INTEGER:
1674             {
1675                 int64_t iValue;
1676                 ret = dataValue.GetInt64(iValue);
1677                 if (ret != E_OK) {
1678                     LOGE("[CloudStorageUtils] save changed int64 data failed %d", ret);
1679                     return;
1680                 }
1681                 value = iValue;
1682             } break;
1683         case StorageType::STORAGE_TYPE_REAL:
1684             {
1685                 double dValue;
1686                 ret = dataValue.GetDouble(dValue);
1687                 if (ret != E_OK) {
1688                     LOGE("[CloudStorageUtils] save changed double data failed %d", ret);
1689                     return;
1690                 }
1691                 value = dValue;
1692             } break;
1693         default:
1694             LOGE("[CloudStorageUtils] save changed failed, wrong storage type :%" PRIu32, dataValue.GetType());
1695             return;
1696     }
1697 }
1698 }
1699