• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cloud/cloud_storage_utils.h"
17 #include <set>
18 
19 #include "cloud/asset_operation_utils.h"
20 #include "cloud/cloud_db_types.h"
21 #include "db_common.h"
22 #include "runtime_context.h"
23 
24 namespace DistributedDB {
BindInt64(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)25 int CloudStorageUtils::BindInt64(int index, const VBucket &vBucket, const Field &field,
26     sqlite3_stmt *upsertStmt)
27 {
28     int64_t val = 0;
29     int errCode = GetValueFromVBucket<int64_t>(field.colName, vBucket, val);
30     if (field.nullable && errCode == -E_NOT_FOUND) {
31         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
32     } else {
33         if (errCode != E_OK) {
34             LOGE("get int from vbucket failed, %d", errCode);
35             return -E_CLOUD_ERROR;
36         }
37         errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
38     }
39 
40     if (errCode != E_OK) {
41         LOGE("Bind int to insert statement failed, %d", errCode);
42     }
43     return errCode;
44 }
45 
BindBool(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)46 int CloudStorageUtils::BindBool(int index, const VBucket &vBucket, const Field &field,
47     sqlite3_stmt *upsertStmt)
48 {
49     bool val = false;
50     int errCode = GetValueFromVBucket<bool>(field.colName, vBucket, val);
51     if (field.nullable && errCode == -E_NOT_FOUND) {
52         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
53     } else {
54         if (errCode != E_OK) {
55             LOGE("get bool from vbucket failed, %d", errCode);
56             return -E_CLOUD_ERROR;
57         }
58         errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
59     }
60 
61     if (errCode != E_OK) {
62         LOGE("Bind bool to insert statement failed, %d", errCode);
63     }
64     return errCode;
65 }
66 
BindDouble(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)67 int CloudStorageUtils::BindDouble(int index, const VBucket &vBucket, const Field &field,
68     sqlite3_stmt *upsertStmt)
69 {
70     double val = 0.0;
71     int errCode = GetValueFromVBucket<double>(field.colName, vBucket, val);
72     if (field.nullable && errCode == -E_NOT_FOUND) {
73         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
74     } else {
75         if (errCode != E_OK) {
76             LOGE("get double from vbucket failed, %d", errCode);
77             return -E_CLOUD_ERROR;
78         }
79         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(upsertStmt, index, val));
80     }
81 
82     if (errCode != E_OK) {
83         LOGE("Bind double to insert statement failed, %d", errCode);
84     }
85     return errCode;
86 }
87 
BindText(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)88 int CloudStorageUtils::BindText(int index, const VBucket &vBucket, const Field &field,
89     sqlite3_stmt *upsertStmt)
90 {
91     std::string str;
92     int errCode = GetValueFromVBucket<std::string>(field.colName, vBucket, str);
93     if (field.nullable && errCode == -E_NOT_FOUND) {
94         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
95     } else {
96         if (errCode != E_OK) {
97             LOGE("get string from vbucket failed, %d", errCode);
98             return -E_CLOUD_ERROR;
99         }
100         errCode = SQLiteUtils::BindTextToStatement(upsertStmt, index, str);
101     }
102 
103     if (errCode != E_OK) {
104         LOGE("Bind string to insert statement failed, %d", errCode);
105     }
106     return errCode;
107 }
108 
BindBlob(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)109 int CloudStorageUtils::BindBlob(int index, const VBucket &vBucket, const Field &field,
110     sqlite3_stmt *upsertStmt)
111 {
112     int errCode = E_OK;
113     Bytes val;
114     if (field.type == TYPE_INDEX<Bytes>) {
115         errCode = GetValueFromVBucket<Bytes>(field.colName, vBucket, val);
116         if (!(IsFieldValid(field, errCode))) {
117             goto ERROR;
118         }
119     } else if (field.type == TYPE_INDEX<Asset>) {
120         Asset asset;
121         errCode = GetValueFromVBucket(field.colName, vBucket, asset);
122         if (!(IsFieldValid(field, errCode))) {
123             goto ERROR;
124         }
125         RuntimeContext::GetInstance()->AssetToBlob(asset, val);
126     } else {
127         Assets assets;
128         errCode = GetValueFromVBucket(field.colName, vBucket, assets);
129         if (!(IsFieldValid(field, errCode))) {
130             goto ERROR;
131         }
132         RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
133     }
134 
135     if (errCode == -E_NOT_FOUND) {
136         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
137     } else {
138         errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
139     }
140     if (errCode != E_OK) {
141         LOGE("Bind blob to insert statement failed, %d", errCode);
142     }
143     return errCode;
144 ERROR:
145     LOGE("get blob from vbucket failed, %d", errCode);
146     return -E_CLOUD_ERROR;
147 }
148 
BindAsset(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)149 int CloudStorageUtils::BindAsset(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *upsertStmt)
150 {
151     int errCode;
152     Bytes val;
153     Type entry;
154     bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, entry);
155     if (!isExisted || entry.index() == TYPE_INDEX<Nil>) {
156         if (!field.nullable) {
157             LOGE("field value is not allowed to be null, %d", -E_CLOUD_ERROR);
158             return -E_CLOUD_ERROR;
159         }
160         return SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
161     }
162 
163     Type type = entry;
164     if (field.type == TYPE_INDEX<Asset>) {
165         Asset asset;
166         errCode = GetValueFromOneField(type, asset);
167         if (errCode != E_OK) {
168             LOGE("can not get asset from vBucket when bind, %d", errCode);
169             return errCode;
170         }
171         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
172         errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, val);
173     } else if (field.type == TYPE_INDEX<Assets>) {
174         Assets assets;
175         errCode = GetValueFromOneField(type, assets);
176         if (errCode != E_OK) {
177             LOGE("can not get assets from vBucket when bind, %d", errCode);
178             return errCode;
179         }
180         if (!assets.empty()) {
181             for (auto &asset: assets) {
182                 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
183             }
184             errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
185         }
186     } else {
187         LOGE("field type is not asset or assets, %d", -E_CLOUD_ERROR);
188         return -E_CLOUD_ERROR;
189     }
190     if (errCode != E_OK) {
191         LOGE("assets or asset to blob fail, %d", -E_CLOUD_ERROR);
192         return -E_CLOUD_ERROR;
193     }
194     if (val.empty()) {
195         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
196     } else {
197         errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
198     }
199     return errCode;
200 }
201 
Int64ToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)202 int CloudStorageUtils::Int64ToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
203     std::vector<uint8_t> &value)
204 {
205     (void)collateType;
206     int64_t val = 0;
207     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
208         return -E_CLOUD_ERROR;
209     }
210     DBCommon::StringToVector(std::to_string(val), value);
211     return E_OK;
212 }
213 
BoolToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)214 int CloudStorageUtils::BoolToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
215     std::vector<uint8_t> &value)
216 {
217     (void)collateType;
218     bool val = false;
219     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
220         return -E_CLOUD_ERROR;
221     }
222     DBCommon::StringToVector(std::to_string(val ? 1 : 0), value);
223     return E_OK;
224 }
225 
DoubleToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)226 int CloudStorageUtils::DoubleToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
227     std::vector<uint8_t> &value)
228 {
229     (void)collateType;
230     double val = 0.0;
231     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
232         return -E_CLOUD_ERROR;
233     }
234     std::ostringstream s;
235     s << val;
236     DBCommon::StringToVector(s.str(), value);
237     return E_OK;
238 }
239 
TextToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)240 int CloudStorageUtils::TextToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
241     std::vector<uint8_t> &value)
242 {
243     std::string val;
244     if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
245         return -E_CLOUD_ERROR;
246     }
247     if (collateType == CollateType::COLLATE_NOCASE) {
248         std::transform(val.begin(), val.end(), val.begin(), ::toupper);
249     } else if (collateType == CollateType::COLLATE_RTRIM) {
250         DBCommon::RTrim(val);
251     }
252 
253     DBCommon::StringToVector(val, value);
254     return E_OK;
255 }
256 
BlobToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)257 int CloudStorageUtils::BlobToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
258     std::vector<uint8_t> &value)
259 {
260     (void)collateType;
261     if (field.type == TYPE_INDEX<Bytes>) {
262         return CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, value);
263     } else if (field.type == TYPE_INDEX<Asset>) {
264         Asset val;
265         if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
266             return -E_CLOUD_ERROR;
267         }
268         int errCode = RuntimeContext::GetInstance()->AssetToBlob(val, value);
269         if (errCode != E_OK) {
270             LOGE("asset to blob fail, %d", errCode);
271         }
272         return errCode;
273     } else {
274         Assets val;
275         if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
276             return -E_CLOUD_ERROR;
277         }
278         int errCode = RuntimeContext::GetInstance()->AssetsToBlob(val, value);
279         if (errCode != E_OK) {
280             LOGE("assets to blob fail, %d", errCode);
281         }
282         return errCode;
283     }
284 }
285 
GetCloudPrimaryKey(const TableSchema & tableSchema)286 std::set<std::string> CloudStorageUtils::GetCloudPrimaryKey(const TableSchema &tableSchema)
287 {
288     std::set<std::string> pkSet;
289     for (const auto &field : tableSchema.fields) {
290         if (field.primary) {
291             pkSet.insert(field.colName);
292         }
293     }
294     return pkSet;
295 }
296 
GetCloudAsset(const TableSchema & tableSchema)297 std::vector<Field> CloudStorageUtils::GetCloudAsset(const TableSchema &tableSchema)
298 {
299     std::vector<Field> assetFields;
300     for (const auto &item: tableSchema.fields) {
301         if (item.type != TYPE_INDEX<Asset> && item.type != TYPE_INDEX<Assets>) {
302             continue;
303         }
304         assetFields.push_back(item);
305     }
306     return assetFields;
307 }
308 
GetCloudPrimaryKeyField(const TableSchema & tableSchema,bool sortByName)309 std::vector<Field> CloudStorageUtils::GetCloudPrimaryKeyField(const TableSchema &tableSchema, bool sortByName)
310 {
311     std::vector<Field> pkVec;
312     for (const auto &field : tableSchema.fields) {
313         if (field.primary) {
314             pkVec.push_back(field);
315         }
316     }
317     if (sortByName) {
318         std::sort(pkVec.begin(), pkVec.end(), [](const Field &a, const Field &b) {
319            return a.colName < b.colName;
320         });
321     }
322     return pkVec;
323 }
324 
GetCloudPrimaryKeyFieldMap(const TableSchema & tableSchema,bool sortByUpper)325 std::map<std::string, Field> CloudStorageUtils::GetCloudPrimaryKeyFieldMap(const TableSchema &tableSchema,
326     bool sortByUpper)
327 {
328     std::map<std::string, Field> pkMap;
329     for (const auto &field : tableSchema.fields) {
330         if (field.primary) {
331             if (sortByUpper) {
332                 pkMap[DBCommon::ToUpperCase(field.colName)] = field;
333             } else {
334                 pkMap[field.colName] = field;
335             }
336         }
337     }
338     return pkMap;
339 }
340 
GetAssetFieldsFromSchema(const TableSchema & tableSchema,const VBucket & vBucket,std::vector<Field> & fields)341 int CloudStorageUtils::GetAssetFieldsFromSchema(const TableSchema &tableSchema, const VBucket &vBucket,
342     std::vector<Field> &fields)
343 {
344     for (const auto &field: tableSchema.fields) {
345         Type type;
346         bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
347         if (!isExisted) {
348             continue;
349         }
350         if (type.index() != TYPE_INDEX<Asset> && type.index() != TYPE_INDEX<Assets>) {
351             continue;
352         }
353         fields.push_back(field);
354     }
355     if (fields.empty()) {
356         return -E_CLOUD_ERROR;
357     }
358     return E_OK;
359 }
360 
IsContainsPrimaryKey(const TableSchema & tableSchema)361 bool CloudStorageUtils::IsContainsPrimaryKey(const TableSchema &tableSchema)
362 {
363     for (const auto &field : tableSchema.fields) {
364         if (field.primary) {
365             return true;
366         }
367     }
368     return false;
369 }
370 
ObtainAssetFromVBucket(const VBucket & vBucket,VBucket & asset)371 void CloudStorageUtils::ObtainAssetFromVBucket(const VBucket &vBucket, VBucket &asset)
372 {
373     for (const auto &item: vBucket) {
374         if (IsAsset(item.second)) {
375             Asset data = std::get<Asset>(item.second);
376             asset.insert_or_assign(item.first, data);
377         } else if (IsAssets(item.second)) {
378             Assets data = std::get<Assets>(item.second);
379             asset.insert_or_assign(item.first, data);
380         }
381     }
382 }
383 
StatusToFlag(AssetStatus status)384 AssetOpType CloudStorageUtils::StatusToFlag(AssetStatus status)
385 {
386     switch (AssetOperationUtils::EraseBitMask(status)) {
387         case AssetStatus::INSERT:
388             return AssetOpType::INSERT;
389         case AssetStatus::DELETE:
390             return AssetOpType::DELETE;
391         case AssetStatus::UPDATE:
392             return AssetOpType::UPDATE;
393         default:
394             return AssetOpType::NO_CHANGE;
395     }
396 }
397 
FlagToStatus(AssetOpType opType)398 AssetStatus CloudStorageUtils::FlagToStatus(AssetOpType opType)
399 {
400     switch (opType) {
401         case AssetOpType::INSERT:
402             return AssetStatus::INSERT;
403         case AssetOpType::DELETE:
404             return AssetStatus::DELETE;
405         case AssetOpType::UPDATE:
406             return AssetStatus::UPDATE;
407         default:
408             return AssetStatus::NORMAL;
409     }
410 }
411 
ChangeAssetsOnVBucketToAsset(VBucket & vBucket,std::vector<Field> & fields)412 void CloudStorageUtils::ChangeAssetsOnVBucketToAsset(VBucket &vBucket, std::vector<Field> &fields)
413 {
414     for (const Field &field: fields) {
415         if (field.type == TYPE_INDEX<Asset>) {
416             Type asset = GetAssetFromAssets(vBucket[field.colName]);
417             vBucket[field.colName] = asset;
418         }
419     }
420 }
421 
GetAssetFromAssets(Type & value)422 Type CloudStorageUtils::GetAssetFromAssets(Type &value)
423 {
424     Asset assetVal;
425     int errCode = GetValueFromType(value, assetVal);
426     if (errCode == E_OK) {
427         return assetVal;
428     }
429 
430     Assets assets;
431     errCode = GetValueFromType(value, assets);
432     if (errCode != E_OK) {
433         return Nil();
434     }
435 
436     for (Asset &asset: assets) {
437         if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
438             return std::move(asset);
439         }
440     }
441     return Nil();
442 }
443 
FillAssetBeforeDownload(Asset & asset)444 int CloudStorageUtils::FillAssetBeforeDownload(Asset &asset)
445 {
446     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
447     AssetStatus status = static_cast<AssetStatus>(asset.status);
448     uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
449     switch (flag) {
450         case AssetOpType::DELETE: {
451             // these asset no need to download, just remove before download
452             if (lowStatus == static_cast<uint32_t>(AssetStatus::DELETE) ||
453                 lowStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
454                 (asset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL))) {
455                 return -E_NOT_FOUND;
456             }
457             break;
458         }
459         case AssetOpType::INSERT:
460         case AssetOpType::UPDATE: {
461             if (status != AssetStatus::NORMAL) {
462                 asset.hash = std::string("");
463             }
464             break;
465         }
466         default:
467             break;
468     }
469     return E_OK;
470 }
471 
FillAssetAfterDownload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)472 int CloudStorageUtils::FillAssetAfterDownload(Asset &asset, Asset &dbAsset,
473     AssetOperationUtils::AssetOpType assetOpType)
474 {
475     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
476         return E_OK;
477     }
478     dbAsset = asset;
479     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
480     if (asset.status != AssetStatus::NORMAL) {
481         return E_OK;
482     }
483     switch (flag) {
484         case AssetOpType::DELETE: {
485             return -E_NOT_FOUND;
486         }
487         default:
488             break;
489     }
490     return E_OK;
491 }
492 
FillAssetsAfterDownload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)493 void CloudStorageUtils::FillAssetsAfterDownload(Assets &assets, Assets &dbAssets,
494     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
495 {
496     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownload);
497 }
498 
FillAssetForUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)499 int CloudStorageUtils::FillAssetForUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
500 {
501     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
502         // db assetId may be empty, need to be based on cache
503         dbAsset.assetId = asset.assetId;
504         return E_OK;
505     }
506     AssetStatus status = static_cast<AssetStatus>(dbAsset.status);
507     dbAsset = asset;
508     switch (StatusToFlag(status)) {
509         case AssetOpType::INSERT:
510         case AssetOpType::UPDATE:
511         case AssetOpType::NO_CHANGE: {
512             dbAsset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
513             break;
514         }
515         case AssetOpType::DELETE: {
516             return -E_NOT_FOUND;
517         }
518         default: {
519             break;
520         }
521     }
522     dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
523     return E_OK;
524 }
525 
FillAssetsForUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)526 void CloudStorageUtils::FillAssetsForUpload(Assets &assets, Assets &dbAssets,
527     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
528 {
529     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUpload);
530 }
531 
FillAssetBeforeUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)532 int CloudStorageUtils::FillAssetBeforeUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
533 {
534     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
535         return E_OK;
536     }
537     dbAsset = asset;
538     switch (static_cast<AssetOpType>(asset.flag)) {
539         case AssetOpType::INSERT:
540         case AssetOpType::UPDATE:
541         case AssetOpType::DELETE:
542         case AssetOpType::NO_CHANGE:
543             dbAsset.status |= static_cast<uint32_t>(AssetStatus::UPLOADING);
544             break;
545         default:
546             break;
547     }
548     dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
549     return E_OK;
550 }
551 
FillAssetsBeforeUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)552 void CloudStorageUtils::FillAssetsBeforeUpload(Assets &assets, Assets &dbAssets, const std::map<std::string,
553     AssetOperationUtils::AssetOpType> &assetOpTypeMap)
554 {
555     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetBeforeUpload);
556 }
557 
PrepareToFillAssetFromVBucket(VBucket & vBucket,std::function<int (Asset &)> fillAsset)558 void CloudStorageUtils::PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function<int(Asset &)> fillAsset)
559 {
560     for (auto &item: vBucket) {
561         if (IsAsset(item.second)) {
562             Asset asset;
563             GetValueFromType(item.second, asset);
564             fillAsset(asset);
565             vBucket[item.first] = asset;
566         } else if (IsAssets(item.second)) {
567             Assets assets;
568             GetValueFromType(item.second, assets);
569             for (auto it = assets.begin(); it != assets.end();) {
570                 fillAsset(*it) == -E_NOT_FOUND ? it = assets.erase(it) : ++it;
571             }
572             vBucket[item.first] = assets;
573         }
574     }
575 }
576 
FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType & assetOpType,VBucket & vBucket,VBucket & dbAssets,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset,std::function<void (Assets &,Assets &,const std::map<std::string,AssetOperationUtils::AssetOpType> &)> fillAssets)577 void CloudStorageUtils::FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType &assetOpType,
578     VBucket &vBucket, VBucket &dbAssets,
579     std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset,
580     std::function<void(Assets &, Assets &,
581     const std::map<std::string, AssetOperationUtils::AssetOpType> &)> fillAssets)
582 {
583     for (auto &item: dbAssets) {
584         if (IsAsset(item.second)) {
585             Asset cacheItem;
586             GetValueFromType(vBucket[item.first], cacheItem);
587             Asset dbItem;
588             GetValueFromType(item.second, dbItem);
589             AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
590             auto iterCol = assetOpType.find(item.first);
591             if (iterCol != assetOpType.end() && iterCol->second.find(dbItem.name) != iterCol->second.end()) {
592                 opType = iterCol->second.at(dbItem.name);
593             }
594             int errCode = fillAsset(cacheItem, dbItem, opType);
595             if (errCode != E_OK) {
596                 dbAssets[item.first] = Nil();
597             } else {
598                 dbAssets[item.first] = dbItem;
599             }
600             continue;
601         }
602         if (IsAssets(item.second)) {
603             Assets cacheItems;
604             GetValueFromType(vBucket[item.first], cacheItems);
605             Assets dbItems;
606             GetValueFromType(item.second, dbItems);
607             auto iterCol = assetOpType.find(item.first);
608             if (iterCol == assetOpType.end()) {
609                 fillAssets(cacheItems, dbItems, {});
610             } else {
611                 fillAssets(cacheItems, dbItems, iterCol->second);
612             }
613             if (dbItems.empty()) {
614                 dbAssets[item.first] = Nil();
615             } else {
616                 dbAssets[item.first] = dbItems;
617             }
618         }
619     }
620 }
621 
IsAsset(const Type & type)622 bool CloudStorageUtils::IsAsset(const Type &type)
623 {
624     return type.index() == TYPE_INDEX<Asset>;
625 }
626 
IsAssets(const Type & type)627 bool CloudStorageUtils::IsAssets(const Type &type)
628 {
629     return type.index() == TYPE_INDEX<Assets>;
630 }
631 
CalculateHashKeyForOneField(const Field & field,const VBucket & vBucket,bool allowEmpty,CollateType collateType,std::vector<uint8_t> & hashValue)632 int CloudStorageUtils::CalculateHashKeyForOneField(const Field &field, const VBucket &vBucket, bool allowEmpty,
633     CollateType collateType, std::vector<uint8_t> &hashValue)
634 {
635     Type type;
636     bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
637     if (allowEmpty && !isExisted) {
638         return E_OK; // if vBucket from cloud doesn't contain primary key and allowEmpty, no need to calculate hash
639     }
640     static std::map<int32_t, std::function<int(const VBucket &, const Field &, CollateType,
641         std::vector<uint8_t> &)>> toVecFunc = {
642         { TYPE_INDEX<int64_t>, &CloudStorageUtils::Int64ToVector },
643         { TYPE_INDEX<bool>, &CloudStorageUtils::BoolToVector },
644         { TYPE_INDEX<double>, &CloudStorageUtils::DoubleToVector },
645         { TYPE_INDEX<std::string>, &CloudStorageUtils::TextToVector },
646         { TYPE_INDEX<Bytes>, &CloudStorageUtils::BlobToVector },
647         { TYPE_INDEX<Asset>, &CloudStorageUtils::BlobToVector },
648         { TYPE_INDEX<Assets>, &CloudStorageUtils::BlobToVector },
649     };
650     auto it = toVecFunc.find(field.type);
651     if (it == toVecFunc.end()) {
652         LOGE("unknown cloud type when convert field to vector.");
653         return -E_CLOUD_ERROR;
654     }
655     std::vector<uint8_t> value;
656     int errCode = it->second(vBucket, field, collateType, value);
657     if (errCode != E_OK) {
658         LOGE("convert cloud field fail, %d", errCode);
659         return errCode;
660     }
661     return DBCommon::CalcValueHash(value, hashValue);
662 }
663 
IsAssetsContainDuplicateAsset(Assets & assets)664 bool CloudStorageUtils::IsAssetsContainDuplicateAsset(Assets &assets)
665 {
666     std::set<std::string> set;
667     for (const auto &asset : assets) {
668         if (set.find(asset.name) != set.end()) {
669             LOGE("assets contain duplicate Asset");
670             return true;
671         }
672         set.insert(asset.name);
673     }
674     return false;
675 }
676 
EraseNoChangeAsset(std::map<std::string,Assets> & assetsMap)677 void CloudStorageUtils::EraseNoChangeAsset(std::map<std::string, Assets> &assetsMap)
678 {
679     for (auto items = assetsMap.begin(); items != assetsMap.end();) {
680         for (auto item = items->second.begin(); item != items->second.end();) {
681             if (static_cast<AssetOpType>((*item).flag) == AssetOpType::NO_CHANGE) {
682                 item = items->second.erase(item);
683             } else {
684                 item++;
685             }
686         }
687         if (items->second.empty()) {
688             items = assetsMap.erase(items);
689         } else {
690             items++;
691         }
692     }
693 }
694 
MergeDownloadAsset(std::map<std::string,Assets> & downloadAssets,std::map<std::string,Assets> & mergeAssets)695 void CloudStorageUtils::MergeDownloadAsset(std::map<std::string, Assets> &downloadAssets,
696     std::map<std::string, Assets> &mergeAssets)
697 {
698     for (auto &items: mergeAssets) {
699         auto downloadItem = downloadAssets.find(items.first);
700         if (downloadItem == downloadAssets.end()) {
701             continue;
702         }
703         std::map<std::string, size_t> beCoveredAssetsMap = GenAssetsIndexMap(items.second);
704         for (const Asset &asset: downloadItem->second) {
705             auto it = beCoveredAssetsMap.find(asset.name);
706             if (it == beCoveredAssetsMap.end()) {
707                 continue;
708             }
709             items.second[it->second] = asset;
710         }
711     }
712 }
713 
GenAssetsIndexMap(Assets & assets)714 std::map<std::string, size_t> CloudStorageUtils::GenAssetsIndexMap(Assets &assets)
715 {
716     // key of assetsIndexMap is name of asset, the value of it is index.
717     std::map<std::string, size_t> assetsIndexMap;
718     for (size_t i = 0; i < assets.size(); i++) {
719         assetsIndexMap[assets[i].name] = i;
720     }
721     return assetsIndexMap;
722 }
723 
IsVbucketContainsAllPK(const VBucket & vBucket,const std::set<std::string> & pkSet)724 bool CloudStorageUtils::IsVbucketContainsAllPK(const VBucket &vBucket, const std::set<std::string> &pkSet)
725 {
726     if (pkSet.empty()) {
727         return false;
728     }
729     for (const auto &pk : pkSet) {
730         Type type;
731         bool isExisted = GetTypeCaseInsensitive(pk, vBucket, type);
732         if (!isExisted) {
733             return false;
734         }
735     }
736     return true;
737 }
738 
IsSharedTable(const TableSchema & tableSchema)739 bool CloudStorageUtils::IsSharedTable(const TableSchema &tableSchema)
740 {
741     return tableSchema.sharedTableName == tableSchema.name;
742 }
743 
IsViolationOfConstraints(const std::string & name,const std::vector<FieldInfo> & fieldInfos)744 static bool IsViolationOfConstraints(const std::string &name, const std::vector<FieldInfo> &fieldInfos)
745 {
746     for (const auto &field : fieldInfos) {
747         if (name != field.GetFieldName()) {
748             continue;
749         }
750         if (field.GetStorageType() == StorageType::STORAGE_TYPE_REAL) {
751             LOGE("[ConstraintsCheckForCloud] Not support create distributed table with real primary key.");
752             return true;
753         } else if (field.IsAssetType() || field.IsAssetsType()) {
754             LOGE("[ConstraintsCheckForCloud] Not support create distributed table with asset primary key.");
755             return true;
756         } else {
757             return false;
758         }
759     }
760     return false;
761 }
762 
ConstraintsCheckForCloud(const TableInfo & table,const std::string & trimmedSql)763 int CloudStorageUtils::ConstraintsCheckForCloud(const TableInfo &table, const std::string &trimmedSql)
764 {
765     if (DBCommon::HasConstraint(trimmedSql, "UNIQUE", " ,", " ,)(")) {
766         LOGE("[ConstraintsCheckForCloud] Not support create distributed table with 'UNIQUE' constraint.");
767         return -E_NOT_SUPPORT;
768     }
769 
770     const std::map<int, FieldName> &primaryKeys = table.GetPrimaryKey();
771     const std::vector<FieldInfo> &fieldInfos = table.GetFieldInfos();
772     for (const auto &item : primaryKeys) {
773         if (IsViolationOfConstraints(item.second, fieldInfos)) {
774             return -E_NOT_SUPPORT;
775         }
776     }
777     return E_OK;
778 }
779 
CheckAssetStatus(const Assets & assets)780 bool CloudStorageUtils::CheckAssetStatus(const Assets &assets)
781 {
782     for (const Asset &asset: assets) {
783         if (AssetOperationUtils::EraseBitMask(asset.status) > static_cast<uint32_t>(AssetStatus::UPDATE)) {
784             LOGE("assets contain invalid status:[%u]", asset.status);
785             return false;
786         }
787     }
788     return true;
789 }
790 
GetTableRefUpdateSql(const TableInfo & table,OpType opType)791 std::string CloudStorageUtils::GetTableRefUpdateSql(const TableInfo &table, OpType opType)
792 {
793     std::string sql;
794     std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
795     for (const auto &reference : table.GetTableReference()) {
796         if (reference.columns.empty()) {
797             return "";
798         }
799         std::string sourceLogName = DBCommon::GetLogTableName(reference.sourceTableName);
800         sql += " UPDATE " + sourceLogName + " SET timestamp=get_raw_sys_time(), flag=flag|0x02 WHERE ";
801         int index = 0;
802         for (const auto &itCol : reference.columns) {
803             if (opType != OpType::UPDATE) {
804                 continue;
805             }
806             if (index++ != 0) {
807                 sql += " OR ";
808             }
809             sql += " (OLD." + itCol.second + " IS NOT " + " NEW." + itCol.second + ")";
810         }
811         if (opType == OpType::UPDATE) {
812             sql += " AND ";
813         }
814         sql += " (flag&0x08=0x00) AND data_key IN (SELECT " + sourceLogName + ".data_key FROM " + sourceLogName +
815             " LEFT JOIN " + reference.sourceTableName + " ON " + sourceLogName + ".data_key = " +
816             reference.sourceTableName + "." + rowid + " WHERE ";
817         index = 0;
818         for (const auto &itCol : reference.columns) {
819             if (index++ != 0) {
820                 sql += " OR ";
821             }
822             if (opType == OpType::UPDATE) {
823                 sql += itCol.first + "=OLD." + itCol.second + " OR " + itCol.first + "=NEW." + itCol.second;
824             } else if (opType == OpType::INSERT) {
825                 sql += itCol.first + "=NEW." + itCol.second;
826             } else if (opType == OpType::DELETE) {
827                 sql += itCol.first + "=OLD." + itCol.second;
828             }
829         }
830         sql += ");";
831     }
832     return sql;
833 }
834 
GetLeftJoinLogSql(const std::string & tableName,bool logAsTableA)835 std::string CloudStorageUtils::GetLeftJoinLogSql(const std::string &tableName, bool logAsTableA)
836 {
837     std::string sql;
838     if (logAsTableA) {
839         sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b " +
840             " ON (a.data_key = b." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
841     } else {
842         sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS b LEFT JOIN '" + tableName + "' AS a " +
843             " ON (b.data_key = a." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
844     }
845     return sql;
846 }
847 
ChkFillCloudAssetParam(const CloudSyncBatch & data,int errCode)848 bool CloudStorageUtils::ChkFillCloudAssetParam(const CloudSyncBatch &data, int errCode)
849 {
850     if (data.assets.empty()) {
851         errCode = E_OK;
852         return true;
853     }
854     if (data.rowid.empty() || data.timestamp.empty()) {
855         errCode = -E_INVALID_ARGS;
856         LOGE("param is empty when fill cloud Asset. rowidN:%u, timeN:%u", errCode, data.rowid.size(),
857             data.timestamp.size());
858         return true;
859     }
860     if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size() ||
861         data.assets.size() != data.hashKey.size() || data.assets.size() != data.extend.size()) {
862         errCode = -E_INVALID_ARGS;
863         LOGE("the num of param is invalid when fill cloud Asset. assetsN:%u, rowidN:%u, timeN:%u, "
864              "hashKeyN:%u, extendN:%u", data.assets.size(), data.rowid.size(), data.timestamp.size(),
865              data.hashKey.size(), data.extend.size());
866         return true;
867     }
868     return false;
869 }
870 
GetToBeRemoveAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType,std::vector<Asset> & removeAssets)871 void CloudStorageUtils::GetToBeRemoveAssets(const VBucket &vBucket,
872     const AssetOperationUtils::RecordAssetOpType &assetOpType, std::vector<Asset> &removeAssets)
873 {
874     for (const auto &col: assetOpType) {
875         Type itItem;
876         bool isExisted = GetTypeCaseInsensitive(col.first, vBucket, itItem);
877         if (!isExisted) {
878             continue;
879         }
880         if (!CloudStorageUtils::IsAsset(itItem) && !CloudStorageUtils::IsAssets(itItem)) {
881             continue;
882         }
883         if (CloudStorageUtils::IsAsset(itItem)) {
884             Asset delAsset;
885             GetValueFromType(itItem, delAsset);
886             auto itOp = col.second.find(delAsset.name);
887             if (itOp != col.second.end() && itOp->second == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
888                 removeAssets.push_back(delAsset);
889             }
890             continue;
891         }
892         Assets assets;
893         GetValueFromType(itItem, assets);
894         for (const auto &asset: assets) {
895             auto itOp = col.second.find(asset.name);
896             if (itOp == col.second.end() || itOp->second == AssetOperationUtils::AssetOpType::HANDLE) {
897                 continue;
898             }
899             removeAssets.push_back(asset);
900         }
901     }
902 }
903 
FillAssetForUploadFailed(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)904 int CloudStorageUtils::FillAssetForUploadFailed(Asset &asset, Asset &dbAsset,
905     AssetOperationUtils::AssetOpType assetOpType)
906 {
907     dbAsset.assetId = asset.assetId;
908     dbAsset.status &= ~AssetStatus::UPLOADING;
909     return E_OK;
910 }
911 
FillAssetsForUploadFailed(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)912 void CloudStorageUtils::FillAssetsForUploadFailed(Assets &assets, Assets &dbAssets,
913     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
914 {
915     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUploadFailed);
916 }
917 
FillAssetAfterDownloadFail(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)918 int CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset, Asset &dbAsset,
919     AssetOperationUtils::AssetOpType assetOpType)
920 {
921     AssetStatus status = static_cast<AssetStatus>(asset.status);
922     if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
923         return E_OK;
924     }
925     if (status != AssetStatus::ABNORMAL) {
926         return FillAssetAfterDownload(asset, dbAsset, assetOpType);
927     }
928     AssetOpType flag = static_cast<AssetOpType>(asset.flag);
929     dbAsset = asset;
930     switch (flag) {
931         case AssetOpType::INSERT:
932         case AssetOpType::DELETE:
933         case AssetOpType::UPDATE: {
934             dbAsset.hash = std::string("");
935             break;
936         }
937         default:
938             // other flag type do not need to clear hash
939             break;
940     }
941     return E_OK;
942 }
943 
FillAssetsAfterDownloadFail(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)944 void CloudStorageUtils::FillAssetsAfterDownloadFail(Assets &assets, Assets &dbAssets,
945     const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
946 {
947     MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownloadFail);
948 }
949 
MergeAssetWithFillFunc(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset)950 void CloudStorageUtils::MergeAssetWithFillFunc(Assets &assets, Assets &dbAssets, const std::map<std::string,
951     AssetOperationUtils::AssetOpType> &assetOpTypeMap,
952     std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset)
953 {
954     std::map<std::string, size_t> indexMap = GenAssetsIndexMap(assets);
955     for (auto dbAsset = dbAssets.begin(); dbAsset != dbAssets.end();) {
956         Asset cacheAsset;
957         auto it = indexMap.find(dbAsset->name);
958         if (it != indexMap.end()) {
959             cacheAsset = assets[it->second];
960         }
961         AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
962         auto iterOp = assetOpTypeMap.find(dbAsset->name);
963         if (iterOp != assetOpTypeMap.end()) {
964             opType = iterOp->second;
965         }
966         if (fillAsset(cacheAsset, *dbAsset, opType) == -E_NOT_FOUND) {
967             dbAsset = dbAssets.erase(dbAsset);
968         } else {
969             dbAsset++;
970         }
971     }
972 }
973 
GetHashValueWithPrimaryKeyMap(const VBucket & vBucket,const TableSchema & tableSchema,const TableInfo & localTable,const std::map<std::string,Field> & pkMap,bool allowEmpty)974 std::pair<int, std::vector<uint8_t>> CloudStorageUtils::GetHashValueWithPrimaryKeyMap(const VBucket &vBucket,
975     const TableSchema &tableSchema, const TableInfo &localTable, const std::map<std::string, Field> &pkMap,
976     bool allowEmpty)
977 {
978     int errCode = E_OK;
979     std::vector<uint8_t> hashValue;
980     if (pkMap.size() == 0) {
981         LOGE("do not support get hashValue when primaryKey map is empty.");
982         return { -E_INTERNAL_ERROR, {} };
983     } else if (pkMap.size() == 1) {
984         std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
985         FieldInfoMap fieldInfos = localTable.GetFields();
986         if (fieldInfos.find(pkMap.begin()->first) == fieldInfos.end()) {
987             LOGE("localSchema doesn't contain primary key.");
988             return { -E_INTERNAL_ERROR, {} };
989         }
990         CollateType collateType = fieldInfos.at(pkMap.begin()->first).GetCollateType();
991         errCode = CloudStorageUtils::CalculateHashKeyForOneField(
992             pkVec.at(0), vBucket, allowEmpty, collateType, hashValue);
993     } else {
994         std::vector<uint8_t> tempRes;
995         for (const auto &item: pkMap) {
996             FieldInfoMap fieldInfos = localTable.GetFields();
997             if (fieldInfos.find(item.first) == fieldInfos.end()) {
998                 LOGE("localSchema doesn't contain primary key in multi pks.");
999                 return { -E_INTERNAL_ERROR, {} };
1000             }
1001             std::vector<uint8_t> temp;
1002             CollateType collateType = fieldInfos.at(item.first).GetCollateType();
1003             errCode = CloudStorageUtils::CalculateHashKeyForOneField(
1004                 item.second, vBucket, allowEmpty, collateType, temp);
1005             if (errCode != E_OK) {
1006                 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
1007                 return { errCode, {} };
1008             }
1009             tempRes.insert(tempRes.end(), temp.begin(), temp.end());
1010         }
1011         errCode = DBCommon::CalcValueHash(tempRes, hashValue);
1012     }
1013     return { errCode, hashValue };
1014 }
1015 
TransferFieldToLower(VBucket & vBucket)1016 void CloudStorageUtils::TransferFieldToLower(VBucket &vBucket)
1017 {
1018     for (auto it = vBucket.begin(); it != vBucket.end();) {
1019         std::string lowerField(it->first.length(), ' ');
1020         std::transform(it->first.begin(), it->first.end(), lowerField.begin(), tolower);
1021         if (lowerField != it->first) {
1022             vBucket[lowerField] = std::move(vBucket[it->first]);
1023             vBucket.erase(it++);
1024         } else {
1025             it++;
1026         }
1027     }
1028 }
1029 
GetTypeCaseInsensitive(const std::string & fieldName,const VBucket & vBucket,Type & data)1030 bool CloudStorageUtils::GetTypeCaseInsensitive(const std::string &fieldName, const VBucket &vBucket, Type &data)
1031 {
1032     auto tmpFieldName = fieldName;
1033     auto tmpVBucket = vBucket;
1034     std::transform(tmpFieldName.begin(), tmpFieldName.end(), tmpFieldName.begin(), tolower);
1035     TransferFieldToLower(tmpVBucket);
1036     auto it = tmpVBucket.find(tmpFieldName);
1037     if (it == tmpVBucket.end()) {
1038         return false;
1039     }
1040     data = it->second;
1041     return true;
1042 }
1043 
TransferSchemaFieldToLower(TableSchema & tableSchema)1044 void CloudStorageUtils::TransferSchemaFieldToLower(TableSchema &tableSchema)
1045 {
1046     for (auto &field : tableSchema.fields) {
1047         std::transform(field.colName.begin(), field.colName.end(), field.colName.begin(), tolower);
1048     }
1049 }
1050 
CheckCloudSchemaFields(const TableSchema & tableSchema,const TableSchema & oldSchema)1051 bool CloudStorageUtils::CheckCloudSchemaFields(const TableSchema &tableSchema, const TableSchema &oldSchema)
1052 {
1053     if (tableSchema.name != oldSchema.name) {
1054         return true;
1055     }
1056     for (const auto &oldField : oldSchema.fields) {
1057         auto it = std::find_if(tableSchema.fields.begin(), tableSchema.fields.end(),
1058             [&oldField](const std::vector<Field>::value_type &field) {
1059                 return oldField == field;
1060             });
1061         if (it == tableSchema.fields.end()) {
1062             return false;
1063         }
1064     }
1065     return true;
1066 }
1067 }
1068