1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cloud/cloud_storage_utils.h"
17 #include <set>
18
19 #include "cloud/asset_operation_utils.h"
20 #include "cloud/cloud_db_types.h"
21 #include "db_common.h"
22 #include "runtime_context.h"
23
24 namespace DistributedDB {
BindInt64(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)25 int CloudStorageUtils::BindInt64(int index, const VBucket &vBucket, const Field &field,
26 sqlite3_stmt *upsertStmt)
27 {
28 int64_t val = 0;
29 int errCode = GetValueFromVBucket<int64_t>(field.colName, vBucket, val);
30 if (field.nullable && errCode == -E_NOT_FOUND) {
31 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
32 } else {
33 if (errCode != E_OK) {
34 LOGE("get int from vbucket failed, %d", errCode);
35 return -E_CLOUD_ERROR;
36 }
37 errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
38 }
39
40 if (errCode != E_OK) {
41 LOGE("Bind int to insert statement failed, %d", errCode);
42 }
43 return errCode;
44 }
45
BindBool(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)46 int CloudStorageUtils::BindBool(int index, const VBucket &vBucket, const Field &field,
47 sqlite3_stmt *upsertStmt)
48 {
49 bool val = false;
50 int errCode = GetValueFromVBucket<bool>(field.colName, vBucket, val);
51 if (field.nullable && errCode == -E_NOT_FOUND) {
52 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
53 } else {
54 if (errCode != E_OK) {
55 LOGE("get bool from vbucket failed, %d", errCode);
56 return -E_CLOUD_ERROR;
57 }
58 errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
59 }
60
61 if (errCode != E_OK) {
62 LOGE("Bind bool to insert statement failed, %d", errCode);
63 }
64 return errCode;
65 }
66
BindDouble(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)67 int CloudStorageUtils::BindDouble(int index, const VBucket &vBucket, const Field &field,
68 sqlite3_stmt *upsertStmt)
69 {
70 double val = 0.0;
71 int errCode = GetValueFromVBucket<double>(field.colName, vBucket, val);
72 if (field.nullable && errCode == -E_NOT_FOUND) {
73 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
74 } else {
75 if (errCode != E_OK) {
76 LOGE("get double from vbucket failed, %d", errCode);
77 return -E_CLOUD_ERROR;
78 }
79 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(upsertStmt, index, val));
80 }
81
82 if (errCode != E_OK) {
83 LOGE("Bind double to insert statement failed, %d", errCode);
84 }
85 return errCode;
86 }
87
BindText(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)88 int CloudStorageUtils::BindText(int index, const VBucket &vBucket, const Field &field,
89 sqlite3_stmt *upsertStmt)
90 {
91 std::string str;
92 int errCode = GetValueFromVBucket<std::string>(field.colName, vBucket, str);
93 if (field.nullable && errCode == -E_NOT_FOUND) {
94 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
95 } else {
96 if (errCode != E_OK) {
97 LOGE("get string from vbucket failed, %d", errCode);
98 return -E_CLOUD_ERROR;
99 }
100 errCode = SQLiteUtils::BindTextToStatement(upsertStmt, index, str);
101 }
102
103 if (errCode != E_OK) {
104 LOGE("Bind string to insert statement failed, %d", errCode);
105 }
106 return errCode;
107 }
108
BindBlob(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)109 int CloudStorageUtils::BindBlob(int index, const VBucket &vBucket, const Field &field,
110 sqlite3_stmt *upsertStmt)
111 {
112 int errCode = E_OK;
113 Bytes val;
114 if (field.type == TYPE_INDEX<Bytes>) {
115 errCode = GetValueFromVBucket<Bytes>(field.colName, vBucket, val);
116 if (!(IsFieldValid(field, errCode))) {
117 goto ERROR;
118 }
119 } else if (field.type == TYPE_INDEX<Asset>) {
120 Asset asset;
121 errCode = GetValueFromVBucket(field.colName, vBucket, asset);
122 if (!(IsFieldValid(field, errCode))) {
123 goto ERROR;
124 }
125 RuntimeContext::GetInstance()->AssetToBlob(asset, val);
126 } else {
127 Assets assets;
128 errCode = GetValueFromVBucket(field.colName, vBucket, assets);
129 if (!(IsFieldValid(field, errCode))) {
130 goto ERROR;
131 }
132 RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
133 }
134
135 if (errCode == -E_NOT_FOUND) {
136 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
137 } else {
138 errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
139 }
140 if (errCode != E_OK) {
141 LOGE("Bind blob to insert statement failed, %d", errCode);
142 }
143 return errCode;
144 ERROR:
145 LOGE("get blob from vbucket failed, %d", errCode);
146 return -E_CLOUD_ERROR;
147 }
148
BindAsset(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)149 int CloudStorageUtils::BindAsset(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *upsertStmt)
150 {
151 int errCode;
152 Bytes val;
153 Type entry;
154 bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, entry);
155 if (!isExisted || entry.index() == TYPE_INDEX<Nil>) {
156 if (!field.nullable) {
157 LOGE("field value is not allowed to be null, %d", -E_CLOUD_ERROR);
158 return -E_CLOUD_ERROR;
159 }
160 return SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
161 }
162
163 Type type = entry;
164 if (field.type == TYPE_INDEX<Asset>) {
165 Asset asset;
166 errCode = GetValueFromOneField(type, asset);
167 if (errCode != E_OK) {
168 LOGE("can not get asset from vBucket when bind, %d", errCode);
169 return errCode;
170 }
171 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
172 errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, val);
173 } else if (field.type == TYPE_INDEX<Assets>) {
174 Assets assets;
175 errCode = GetValueFromOneField(type, assets);
176 if (errCode != E_OK) {
177 LOGE("can not get assets from vBucket when bind, %d", errCode);
178 return errCode;
179 }
180 if (!assets.empty()) {
181 for (auto &asset: assets) {
182 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
183 }
184 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
185 }
186 } else {
187 LOGE("field type is not asset or assets, %d", -E_CLOUD_ERROR);
188 return -E_CLOUD_ERROR;
189 }
190 if (errCode != E_OK) {
191 LOGE("assets or asset to blob fail, %d", -E_CLOUD_ERROR);
192 return -E_CLOUD_ERROR;
193 }
194 if (val.empty()) {
195 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
196 } else {
197 errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
198 }
199 return errCode;
200 }
201
Int64ToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)202 int CloudStorageUtils::Int64ToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
203 std::vector<uint8_t> &value)
204 {
205 (void)collateType;
206 int64_t val = 0;
207 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
208 return -E_CLOUD_ERROR;
209 }
210 DBCommon::StringToVector(std::to_string(val), value);
211 return E_OK;
212 }
213
BoolToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)214 int CloudStorageUtils::BoolToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
215 std::vector<uint8_t> &value)
216 {
217 (void)collateType;
218 bool val = false;
219 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
220 return -E_CLOUD_ERROR;
221 }
222 DBCommon::StringToVector(std::to_string(val ? 1 : 0), value);
223 return E_OK;
224 }
225
DoubleToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)226 int CloudStorageUtils::DoubleToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
227 std::vector<uint8_t> &value)
228 {
229 (void)collateType;
230 double val = 0.0;
231 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
232 return -E_CLOUD_ERROR;
233 }
234 std::ostringstream s;
235 s << val;
236 DBCommon::StringToVector(s.str(), value);
237 return E_OK;
238 }
239
TextToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)240 int CloudStorageUtils::TextToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
241 std::vector<uint8_t> &value)
242 {
243 std::string val;
244 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
245 return -E_CLOUD_ERROR;
246 }
247 if (collateType == CollateType::COLLATE_NOCASE) {
248 std::transform(val.begin(), val.end(), val.begin(), ::toupper);
249 } else if (collateType == CollateType::COLLATE_RTRIM) {
250 DBCommon::RTrim(val);
251 }
252
253 DBCommon::StringToVector(val, value);
254 return E_OK;
255 }
256
BlobToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)257 int CloudStorageUtils::BlobToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
258 std::vector<uint8_t> &value)
259 {
260 (void)collateType;
261 if (field.type == TYPE_INDEX<Bytes>) {
262 return CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, value);
263 } else if (field.type == TYPE_INDEX<Asset>) {
264 Asset val;
265 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
266 return -E_CLOUD_ERROR;
267 }
268 int errCode = RuntimeContext::GetInstance()->AssetToBlob(val, value);
269 if (errCode != E_OK) {
270 LOGE("asset to blob fail, %d", errCode);
271 }
272 return errCode;
273 } else {
274 Assets val;
275 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
276 return -E_CLOUD_ERROR;
277 }
278 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(val, value);
279 if (errCode != E_OK) {
280 LOGE("assets to blob fail, %d", errCode);
281 }
282 return errCode;
283 }
284 }
285
GetCloudPrimaryKey(const TableSchema & tableSchema)286 std::set<std::string> CloudStorageUtils::GetCloudPrimaryKey(const TableSchema &tableSchema)
287 {
288 std::set<std::string> pkSet;
289 for (const auto &field : tableSchema.fields) {
290 if (field.primary) {
291 pkSet.insert(field.colName);
292 }
293 }
294 return pkSet;
295 }
296
GetCloudAsset(const TableSchema & tableSchema)297 std::vector<Field> CloudStorageUtils::GetCloudAsset(const TableSchema &tableSchema)
298 {
299 std::vector<Field> assetFields;
300 for (const auto &item: tableSchema.fields) {
301 if (item.type != TYPE_INDEX<Asset> && item.type != TYPE_INDEX<Assets>) {
302 continue;
303 }
304 assetFields.push_back(item);
305 }
306 return assetFields;
307 }
308
GetCloudPrimaryKeyField(const TableSchema & tableSchema,bool sortByName)309 std::vector<Field> CloudStorageUtils::GetCloudPrimaryKeyField(const TableSchema &tableSchema, bool sortByName)
310 {
311 std::vector<Field> pkVec;
312 for (const auto &field : tableSchema.fields) {
313 if (field.primary) {
314 pkVec.push_back(field);
315 }
316 }
317 if (sortByName) {
318 std::sort(pkVec.begin(), pkVec.end(), [](const Field &a, const Field &b) {
319 return a.colName < b.colName;
320 });
321 }
322 return pkVec;
323 }
324
GetCloudPrimaryKeyFieldMap(const TableSchema & tableSchema,bool sortByUpper)325 std::map<std::string, Field> CloudStorageUtils::GetCloudPrimaryKeyFieldMap(const TableSchema &tableSchema,
326 bool sortByUpper)
327 {
328 std::map<std::string, Field> pkMap;
329 for (const auto &field : tableSchema.fields) {
330 if (field.primary) {
331 if (sortByUpper) {
332 pkMap[DBCommon::ToUpperCase(field.colName)] = field;
333 } else {
334 pkMap[field.colName] = field;
335 }
336 }
337 }
338 return pkMap;
339 }
340
GetAssetFieldsFromSchema(const TableSchema & tableSchema,const VBucket & vBucket,std::vector<Field> & fields)341 int CloudStorageUtils::GetAssetFieldsFromSchema(const TableSchema &tableSchema, const VBucket &vBucket,
342 std::vector<Field> &fields)
343 {
344 for (const auto &field: tableSchema.fields) {
345 Type type;
346 bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
347 if (!isExisted) {
348 continue;
349 }
350 if (type.index() != TYPE_INDEX<Asset> && type.index() != TYPE_INDEX<Assets>) {
351 continue;
352 }
353 fields.push_back(field);
354 }
355 if (fields.empty()) {
356 return -E_CLOUD_ERROR;
357 }
358 return E_OK;
359 }
360
IsContainsPrimaryKey(const TableSchema & tableSchema)361 bool CloudStorageUtils::IsContainsPrimaryKey(const TableSchema &tableSchema)
362 {
363 for (const auto &field : tableSchema.fields) {
364 if (field.primary) {
365 return true;
366 }
367 }
368 return false;
369 }
370
ObtainAssetFromVBucket(const VBucket & vBucket,VBucket & asset)371 void CloudStorageUtils::ObtainAssetFromVBucket(const VBucket &vBucket, VBucket &asset)
372 {
373 for (const auto &item: vBucket) {
374 if (IsAsset(item.second)) {
375 Asset data = std::get<Asset>(item.second);
376 asset.insert_or_assign(item.first, data);
377 } else if (IsAssets(item.second)) {
378 Assets data = std::get<Assets>(item.second);
379 asset.insert_or_assign(item.first, data);
380 }
381 }
382 }
383
StatusToFlag(AssetStatus status)384 AssetOpType CloudStorageUtils::StatusToFlag(AssetStatus status)
385 {
386 switch (AssetOperationUtils::EraseBitMask(status)) {
387 case AssetStatus::INSERT:
388 return AssetOpType::INSERT;
389 case AssetStatus::DELETE:
390 return AssetOpType::DELETE;
391 case AssetStatus::UPDATE:
392 return AssetOpType::UPDATE;
393 default:
394 return AssetOpType::NO_CHANGE;
395 }
396 }
397
FlagToStatus(AssetOpType opType)398 AssetStatus CloudStorageUtils::FlagToStatus(AssetOpType opType)
399 {
400 switch (opType) {
401 case AssetOpType::INSERT:
402 return AssetStatus::INSERT;
403 case AssetOpType::DELETE:
404 return AssetStatus::DELETE;
405 case AssetOpType::UPDATE:
406 return AssetStatus::UPDATE;
407 default:
408 return AssetStatus::NORMAL;
409 }
410 }
411
ChangeAssetsOnVBucketToAsset(VBucket & vBucket,std::vector<Field> & fields)412 void CloudStorageUtils::ChangeAssetsOnVBucketToAsset(VBucket &vBucket, std::vector<Field> &fields)
413 {
414 for (const Field &field: fields) {
415 if (field.type == TYPE_INDEX<Asset>) {
416 Type asset = GetAssetFromAssets(vBucket[field.colName]);
417 vBucket[field.colName] = asset;
418 }
419 }
420 }
421
GetAssetFromAssets(Type & value)422 Type CloudStorageUtils::GetAssetFromAssets(Type &value)
423 {
424 Asset assetVal;
425 int errCode = GetValueFromType(value, assetVal);
426 if (errCode == E_OK) {
427 return assetVal;
428 }
429
430 Assets assets;
431 errCode = GetValueFromType(value, assets);
432 if (errCode != E_OK) {
433 return Nil();
434 }
435
436 for (Asset &asset: assets) {
437 if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
438 return std::move(asset);
439 }
440 }
441 return Nil();
442 }
443
FillAssetBeforeDownload(Asset & asset)444 int CloudStorageUtils::FillAssetBeforeDownload(Asset &asset)
445 {
446 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
447 AssetStatus status = static_cast<AssetStatus>(asset.status);
448 uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
449 switch (flag) {
450 case AssetOpType::DELETE: {
451 // these asset no need to download, just remove before download
452 if (lowStatus == static_cast<uint32_t>(AssetStatus::DELETE) ||
453 lowStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
454 (asset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL))) {
455 return -E_NOT_FOUND;
456 }
457 break;
458 }
459 case AssetOpType::INSERT:
460 case AssetOpType::UPDATE: {
461 if (status != AssetStatus::NORMAL) {
462 asset.hash = std::string("");
463 }
464 break;
465 }
466 default:
467 break;
468 }
469 return E_OK;
470 }
471
FillAssetAfterDownload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)472 int CloudStorageUtils::FillAssetAfterDownload(Asset &asset, Asset &dbAsset,
473 AssetOperationUtils::AssetOpType assetOpType)
474 {
475 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
476 return E_OK;
477 }
478 dbAsset = asset;
479 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
480 if (asset.status != AssetStatus::NORMAL) {
481 return E_OK;
482 }
483 switch (flag) {
484 case AssetOpType::DELETE: {
485 return -E_NOT_FOUND;
486 }
487 default:
488 break;
489 }
490 return E_OK;
491 }
492
FillAssetsAfterDownload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)493 void CloudStorageUtils::FillAssetsAfterDownload(Assets &assets, Assets &dbAssets,
494 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
495 {
496 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownload);
497 }
498
FillAssetForUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)499 int CloudStorageUtils::FillAssetForUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
500 {
501 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
502 // db assetId may be empty, need to be based on cache
503 dbAsset.assetId = asset.assetId;
504 return E_OK;
505 }
506 AssetStatus status = static_cast<AssetStatus>(dbAsset.status);
507 dbAsset = asset;
508 switch (StatusToFlag(status)) {
509 case AssetOpType::INSERT:
510 case AssetOpType::UPDATE:
511 case AssetOpType::NO_CHANGE: {
512 dbAsset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
513 break;
514 }
515 case AssetOpType::DELETE: {
516 return -E_NOT_FOUND;
517 }
518 default: {
519 break;
520 }
521 }
522 dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
523 return E_OK;
524 }
525
FillAssetsForUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)526 void CloudStorageUtils::FillAssetsForUpload(Assets &assets, Assets &dbAssets,
527 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
528 {
529 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUpload);
530 }
531
FillAssetBeforeUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)532 int CloudStorageUtils::FillAssetBeforeUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
533 {
534 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
535 return E_OK;
536 }
537 dbAsset = asset;
538 switch (static_cast<AssetOpType>(asset.flag)) {
539 case AssetOpType::INSERT:
540 case AssetOpType::UPDATE:
541 case AssetOpType::DELETE:
542 case AssetOpType::NO_CHANGE:
543 dbAsset.status |= static_cast<uint32_t>(AssetStatus::UPLOADING);
544 break;
545 default:
546 break;
547 }
548 dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
549 return E_OK;
550 }
551
FillAssetsBeforeUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)552 void CloudStorageUtils::FillAssetsBeforeUpload(Assets &assets, Assets &dbAssets, const std::map<std::string,
553 AssetOperationUtils::AssetOpType> &assetOpTypeMap)
554 {
555 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetBeforeUpload);
556 }
557
PrepareToFillAssetFromVBucket(VBucket & vBucket,std::function<int (Asset &)> fillAsset)558 void CloudStorageUtils::PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function<int(Asset &)> fillAsset)
559 {
560 for (auto &item: vBucket) {
561 if (IsAsset(item.second)) {
562 Asset asset;
563 GetValueFromType(item.second, asset);
564 fillAsset(asset);
565 vBucket[item.first] = asset;
566 } else if (IsAssets(item.second)) {
567 Assets assets;
568 GetValueFromType(item.second, assets);
569 for (auto it = assets.begin(); it != assets.end();) {
570 fillAsset(*it) == -E_NOT_FOUND ? it = assets.erase(it) : ++it;
571 }
572 vBucket[item.first] = assets;
573 }
574 }
575 }
576
FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType & assetOpType,VBucket & vBucket,VBucket & dbAssets,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset,std::function<void (Assets &,Assets &,const std::map<std::string,AssetOperationUtils::AssetOpType> &)> fillAssets)577 void CloudStorageUtils::FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType &assetOpType,
578 VBucket &vBucket, VBucket &dbAssets,
579 std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset,
580 std::function<void(Assets &, Assets &,
581 const std::map<std::string, AssetOperationUtils::AssetOpType> &)> fillAssets)
582 {
583 for (auto &item: dbAssets) {
584 if (IsAsset(item.second)) {
585 Asset cacheItem;
586 GetValueFromType(vBucket[item.first], cacheItem);
587 Asset dbItem;
588 GetValueFromType(item.second, dbItem);
589 AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
590 auto iterCol = assetOpType.find(item.first);
591 if (iterCol != assetOpType.end() && iterCol->second.find(dbItem.name) != iterCol->second.end()) {
592 opType = iterCol->second.at(dbItem.name);
593 }
594 int errCode = fillAsset(cacheItem, dbItem, opType);
595 if (errCode != E_OK) {
596 dbAssets[item.first] = Nil();
597 } else {
598 dbAssets[item.first] = dbItem;
599 }
600 continue;
601 }
602 if (IsAssets(item.second)) {
603 Assets cacheItems;
604 GetValueFromType(vBucket[item.first], cacheItems);
605 Assets dbItems;
606 GetValueFromType(item.second, dbItems);
607 auto iterCol = assetOpType.find(item.first);
608 if (iterCol == assetOpType.end()) {
609 fillAssets(cacheItems, dbItems, {});
610 } else {
611 fillAssets(cacheItems, dbItems, iterCol->second);
612 }
613 if (dbItems.empty()) {
614 dbAssets[item.first] = Nil();
615 } else {
616 dbAssets[item.first] = dbItems;
617 }
618 }
619 }
620 }
621
IsAsset(const Type & type)622 bool CloudStorageUtils::IsAsset(const Type &type)
623 {
624 return type.index() == TYPE_INDEX<Asset>;
625 }
626
IsAssets(const Type & type)627 bool CloudStorageUtils::IsAssets(const Type &type)
628 {
629 return type.index() == TYPE_INDEX<Assets>;
630 }
631
CalculateHashKeyForOneField(const Field & field,const VBucket & vBucket,bool allowEmpty,CollateType collateType,std::vector<uint8_t> & hashValue)632 int CloudStorageUtils::CalculateHashKeyForOneField(const Field &field, const VBucket &vBucket, bool allowEmpty,
633 CollateType collateType, std::vector<uint8_t> &hashValue)
634 {
635 Type type;
636 bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
637 if (allowEmpty && !isExisted) {
638 return E_OK; // if vBucket from cloud doesn't contain primary key and allowEmpty, no need to calculate hash
639 }
640 static std::map<int32_t, std::function<int(const VBucket &, const Field &, CollateType,
641 std::vector<uint8_t> &)>> toVecFunc = {
642 { TYPE_INDEX<int64_t>, &CloudStorageUtils::Int64ToVector },
643 { TYPE_INDEX<bool>, &CloudStorageUtils::BoolToVector },
644 { TYPE_INDEX<double>, &CloudStorageUtils::DoubleToVector },
645 { TYPE_INDEX<std::string>, &CloudStorageUtils::TextToVector },
646 { TYPE_INDEX<Bytes>, &CloudStorageUtils::BlobToVector },
647 { TYPE_INDEX<Asset>, &CloudStorageUtils::BlobToVector },
648 { TYPE_INDEX<Assets>, &CloudStorageUtils::BlobToVector },
649 };
650 auto it = toVecFunc.find(field.type);
651 if (it == toVecFunc.end()) {
652 LOGE("unknown cloud type when convert field to vector.");
653 return -E_CLOUD_ERROR;
654 }
655 std::vector<uint8_t> value;
656 int errCode = it->second(vBucket, field, collateType, value);
657 if (errCode != E_OK) {
658 LOGE("convert cloud field fail, %d", errCode);
659 return errCode;
660 }
661 return DBCommon::CalcValueHash(value, hashValue);
662 }
663
IsAssetsContainDuplicateAsset(Assets & assets)664 bool CloudStorageUtils::IsAssetsContainDuplicateAsset(Assets &assets)
665 {
666 std::set<std::string> set;
667 for (const auto &asset : assets) {
668 if (set.find(asset.name) != set.end()) {
669 LOGE("assets contain duplicate Asset");
670 return true;
671 }
672 set.insert(asset.name);
673 }
674 return false;
675 }
676
EraseNoChangeAsset(std::map<std::string,Assets> & assetsMap)677 void CloudStorageUtils::EraseNoChangeAsset(std::map<std::string, Assets> &assetsMap)
678 {
679 for (auto items = assetsMap.begin(); items != assetsMap.end();) {
680 for (auto item = items->second.begin(); item != items->second.end();) {
681 if (static_cast<AssetOpType>((*item).flag) == AssetOpType::NO_CHANGE) {
682 item = items->second.erase(item);
683 } else {
684 item++;
685 }
686 }
687 if (items->second.empty()) {
688 items = assetsMap.erase(items);
689 } else {
690 items++;
691 }
692 }
693 }
694
MergeDownloadAsset(std::map<std::string,Assets> & downloadAssets,std::map<std::string,Assets> & mergeAssets)695 void CloudStorageUtils::MergeDownloadAsset(std::map<std::string, Assets> &downloadAssets,
696 std::map<std::string, Assets> &mergeAssets)
697 {
698 for (auto &items: mergeAssets) {
699 auto downloadItem = downloadAssets.find(items.first);
700 if (downloadItem == downloadAssets.end()) {
701 continue;
702 }
703 std::map<std::string, size_t> beCoveredAssetsMap = GenAssetsIndexMap(items.second);
704 for (const Asset &asset: downloadItem->second) {
705 auto it = beCoveredAssetsMap.find(asset.name);
706 if (it == beCoveredAssetsMap.end()) {
707 continue;
708 }
709 items.second[it->second] = asset;
710 }
711 }
712 }
713
GenAssetsIndexMap(Assets & assets)714 std::map<std::string, size_t> CloudStorageUtils::GenAssetsIndexMap(Assets &assets)
715 {
716 // key of assetsIndexMap is name of asset, the value of it is index.
717 std::map<std::string, size_t> assetsIndexMap;
718 for (size_t i = 0; i < assets.size(); i++) {
719 assetsIndexMap[assets[i].name] = i;
720 }
721 return assetsIndexMap;
722 }
723
IsVbucketContainsAllPK(const VBucket & vBucket,const std::set<std::string> & pkSet)724 bool CloudStorageUtils::IsVbucketContainsAllPK(const VBucket &vBucket, const std::set<std::string> &pkSet)
725 {
726 if (pkSet.empty()) {
727 return false;
728 }
729 for (const auto &pk : pkSet) {
730 Type type;
731 bool isExisted = GetTypeCaseInsensitive(pk, vBucket, type);
732 if (!isExisted) {
733 return false;
734 }
735 }
736 return true;
737 }
738
IsSharedTable(const TableSchema & tableSchema)739 bool CloudStorageUtils::IsSharedTable(const TableSchema &tableSchema)
740 {
741 return tableSchema.sharedTableName == tableSchema.name;
742 }
743
IsViolationOfConstraints(const std::string & name,const std::vector<FieldInfo> & fieldInfos)744 static bool IsViolationOfConstraints(const std::string &name, const std::vector<FieldInfo> &fieldInfos)
745 {
746 for (const auto &field : fieldInfos) {
747 if (name != field.GetFieldName()) {
748 continue;
749 }
750 if (field.GetStorageType() == StorageType::STORAGE_TYPE_REAL) {
751 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with real primary key.");
752 return true;
753 } else if (field.IsAssetType() || field.IsAssetsType()) {
754 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with asset primary key.");
755 return true;
756 } else {
757 return false;
758 }
759 }
760 return false;
761 }
762
ConstraintsCheckForCloud(const TableInfo & table,const std::string & trimmedSql)763 int CloudStorageUtils::ConstraintsCheckForCloud(const TableInfo &table, const std::string &trimmedSql)
764 {
765 if (DBCommon::HasConstraint(trimmedSql, "UNIQUE", " ,", " ,)(")) {
766 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with 'UNIQUE' constraint.");
767 return -E_NOT_SUPPORT;
768 }
769
770 const std::map<int, FieldName> &primaryKeys = table.GetPrimaryKey();
771 const std::vector<FieldInfo> &fieldInfos = table.GetFieldInfos();
772 for (const auto &item : primaryKeys) {
773 if (IsViolationOfConstraints(item.second, fieldInfos)) {
774 return -E_NOT_SUPPORT;
775 }
776 }
777 return E_OK;
778 }
779
CheckAssetStatus(const Assets & assets)780 bool CloudStorageUtils::CheckAssetStatus(const Assets &assets)
781 {
782 for (const Asset &asset: assets) {
783 if (AssetOperationUtils::EraseBitMask(asset.status) > static_cast<uint32_t>(AssetStatus::UPDATE)) {
784 LOGE("assets contain invalid status:[%u]", asset.status);
785 return false;
786 }
787 }
788 return true;
789 }
790
GetTableRefUpdateSql(const TableInfo & table,OpType opType)791 std::string CloudStorageUtils::GetTableRefUpdateSql(const TableInfo &table, OpType opType)
792 {
793 std::string sql;
794 std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
795 for (const auto &reference : table.GetTableReference()) {
796 if (reference.columns.empty()) {
797 return "";
798 }
799 std::string sourceLogName = DBCommon::GetLogTableName(reference.sourceTableName);
800 sql += " UPDATE " + sourceLogName + " SET timestamp=get_raw_sys_time(), flag=flag|0x02 WHERE ";
801 int index = 0;
802 for (const auto &itCol : reference.columns) {
803 if (opType != OpType::UPDATE) {
804 continue;
805 }
806 if (index++ != 0) {
807 sql += " OR ";
808 }
809 sql += " (OLD." + itCol.second + " IS NOT " + " NEW." + itCol.second + ")";
810 }
811 if (opType == OpType::UPDATE) {
812 sql += " AND ";
813 }
814 sql += " (flag&0x08=0x00) AND data_key IN (SELECT " + sourceLogName + ".data_key FROM " + sourceLogName +
815 " LEFT JOIN " + reference.sourceTableName + " ON " + sourceLogName + ".data_key = " +
816 reference.sourceTableName + "." + rowid + " WHERE ";
817 index = 0;
818 for (const auto &itCol : reference.columns) {
819 if (index++ != 0) {
820 sql += " OR ";
821 }
822 if (opType == OpType::UPDATE) {
823 sql += itCol.first + "=OLD." + itCol.second + " OR " + itCol.first + "=NEW." + itCol.second;
824 } else if (opType == OpType::INSERT) {
825 sql += itCol.first + "=NEW." + itCol.second;
826 } else if (opType == OpType::DELETE) {
827 sql += itCol.first + "=OLD." + itCol.second;
828 }
829 }
830 sql += ");";
831 }
832 return sql;
833 }
834
GetLeftJoinLogSql(const std::string & tableName,bool logAsTableA)835 std::string CloudStorageUtils::GetLeftJoinLogSql(const std::string &tableName, bool logAsTableA)
836 {
837 std::string sql;
838 if (logAsTableA) {
839 sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b " +
840 " ON (a.data_key = b." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
841 } else {
842 sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS b LEFT JOIN '" + tableName + "' AS a " +
843 " ON (b.data_key = a." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
844 }
845 return sql;
846 }
847
ChkFillCloudAssetParam(const CloudSyncBatch & data,int errCode)848 bool CloudStorageUtils::ChkFillCloudAssetParam(const CloudSyncBatch &data, int errCode)
849 {
850 if (data.assets.empty()) {
851 errCode = E_OK;
852 return true;
853 }
854 if (data.rowid.empty() || data.timestamp.empty()) {
855 errCode = -E_INVALID_ARGS;
856 LOGE("param is empty when fill cloud Asset. rowidN:%u, timeN:%u", errCode, data.rowid.size(),
857 data.timestamp.size());
858 return true;
859 }
860 if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size() ||
861 data.assets.size() != data.hashKey.size() || data.assets.size() != data.extend.size()) {
862 errCode = -E_INVALID_ARGS;
863 LOGE("the num of param is invalid when fill cloud Asset. assetsN:%u, rowidN:%u, timeN:%u, "
864 "hashKeyN:%u, extendN:%u", data.assets.size(), data.rowid.size(), data.timestamp.size(),
865 data.hashKey.size(), data.extend.size());
866 return true;
867 }
868 return false;
869 }
870
GetToBeRemoveAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType,std::vector<Asset> & removeAssets)871 void CloudStorageUtils::GetToBeRemoveAssets(const VBucket &vBucket,
872 const AssetOperationUtils::RecordAssetOpType &assetOpType, std::vector<Asset> &removeAssets)
873 {
874 for (const auto &col: assetOpType) {
875 Type itItem;
876 bool isExisted = GetTypeCaseInsensitive(col.first, vBucket, itItem);
877 if (!isExisted) {
878 continue;
879 }
880 if (!CloudStorageUtils::IsAsset(itItem) && !CloudStorageUtils::IsAssets(itItem)) {
881 continue;
882 }
883 if (CloudStorageUtils::IsAsset(itItem)) {
884 Asset delAsset;
885 GetValueFromType(itItem, delAsset);
886 auto itOp = col.second.find(delAsset.name);
887 if (itOp != col.second.end() && itOp->second == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
888 removeAssets.push_back(delAsset);
889 }
890 continue;
891 }
892 Assets assets;
893 GetValueFromType(itItem, assets);
894 for (const auto &asset: assets) {
895 auto itOp = col.second.find(asset.name);
896 if (itOp == col.second.end() || itOp->second == AssetOperationUtils::AssetOpType::HANDLE) {
897 continue;
898 }
899 removeAssets.push_back(asset);
900 }
901 }
902 }
903
FillAssetForUploadFailed(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)904 int CloudStorageUtils::FillAssetForUploadFailed(Asset &asset, Asset &dbAsset,
905 AssetOperationUtils::AssetOpType assetOpType)
906 {
907 dbAsset.assetId = asset.assetId;
908 dbAsset.status &= ~AssetStatus::UPLOADING;
909 return E_OK;
910 }
911
FillAssetsForUploadFailed(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)912 void CloudStorageUtils::FillAssetsForUploadFailed(Assets &assets, Assets &dbAssets,
913 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
914 {
915 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUploadFailed);
916 }
917
FillAssetAfterDownloadFail(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)918 int CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset, Asset &dbAsset,
919 AssetOperationUtils::AssetOpType assetOpType)
920 {
921 AssetStatus status = static_cast<AssetStatus>(asset.status);
922 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
923 return E_OK;
924 }
925 if (status != AssetStatus::ABNORMAL) {
926 return FillAssetAfterDownload(asset, dbAsset, assetOpType);
927 }
928 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
929 dbAsset = asset;
930 switch (flag) {
931 case AssetOpType::INSERT:
932 case AssetOpType::DELETE:
933 case AssetOpType::UPDATE: {
934 dbAsset.hash = std::string("");
935 break;
936 }
937 default:
938 // other flag type do not need to clear hash
939 break;
940 }
941 return E_OK;
942 }
943
FillAssetsAfterDownloadFail(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)944 void CloudStorageUtils::FillAssetsAfterDownloadFail(Assets &assets, Assets &dbAssets,
945 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
946 {
947 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownloadFail);
948 }
949
MergeAssetWithFillFunc(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset)950 void CloudStorageUtils::MergeAssetWithFillFunc(Assets &assets, Assets &dbAssets, const std::map<std::string,
951 AssetOperationUtils::AssetOpType> &assetOpTypeMap,
952 std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset)
953 {
954 std::map<std::string, size_t> indexMap = GenAssetsIndexMap(assets);
955 for (auto dbAsset = dbAssets.begin(); dbAsset != dbAssets.end();) {
956 Asset cacheAsset;
957 auto it = indexMap.find(dbAsset->name);
958 if (it != indexMap.end()) {
959 cacheAsset = assets[it->second];
960 }
961 AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
962 auto iterOp = assetOpTypeMap.find(dbAsset->name);
963 if (iterOp != assetOpTypeMap.end()) {
964 opType = iterOp->second;
965 }
966 if (fillAsset(cacheAsset, *dbAsset, opType) == -E_NOT_FOUND) {
967 dbAsset = dbAssets.erase(dbAsset);
968 } else {
969 dbAsset++;
970 }
971 }
972 }
973
GetHashValueWithPrimaryKeyMap(const VBucket & vBucket,const TableSchema & tableSchema,const TableInfo & localTable,const std::map<std::string,Field> & pkMap,bool allowEmpty)974 std::pair<int, std::vector<uint8_t>> CloudStorageUtils::GetHashValueWithPrimaryKeyMap(const VBucket &vBucket,
975 const TableSchema &tableSchema, const TableInfo &localTable, const std::map<std::string, Field> &pkMap,
976 bool allowEmpty)
977 {
978 int errCode = E_OK;
979 std::vector<uint8_t> hashValue;
980 if (pkMap.size() == 0) {
981 LOGE("do not support get hashValue when primaryKey map is empty.");
982 return { -E_INTERNAL_ERROR, {} };
983 } else if (pkMap.size() == 1) {
984 std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
985 FieldInfoMap fieldInfos = localTable.GetFields();
986 if (fieldInfos.find(pkMap.begin()->first) == fieldInfos.end()) {
987 LOGE("localSchema doesn't contain primary key.");
988 return { -E_INTERNAL_ERROR, {} };
989 }
990 CollateType collateType = fieldInfos.at(pkMap.begin()->first).GetCollateType();
991 errCode = CloudStorageUtils::CalculateHashKeyForOneField(
992 pkVec.at(0), vBucket, allowEmpty, collateType, hashValue);
993 } else {
994 std::vector<uint8_t> tempRes;
995 for (const auto &item: pkMap) {
996 FieldInfoMap fieldInfos = localTable.GetFields();
997 if (fieldInfos.find(item.first) == fieldInfos.end()) {
998 LOGE("localSchema doesn't contain primary key in multi pks.");
999 return { -E_INTERNAL_ERROR, {} };
1000 }
1001 std::vector<uint8_t> temp;
1002 CollateType collateType = fieldInfos.at(item.first).GetCollateType();
1003 errCode = CloudStorageUtils::CalculateHashKeyForOneField(
1004 item.second, vBucket, allowEmpty, collateType, temp);
1005 if (errCode != E_OK) {
1006 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
1007 return { errCode, {} };
1008 }
1009 tempRes.insert(tempRes.end(), temp.begin(), temp.end());
1010 }
1011 errCode = DBCommon::CalcValueHash(tempRes, hashValue);
1012 }
1013 return { errCode, hashValue };
1014 }
1015
TransferFieldToLower(VBucket & vBucket)1016 void CloudStorageUtils::TransferFieldToLower(VBucket &vBucket)
1017 {
1018 for (auto it = vBucket.begin(); it != vBucket.end();) {
1019 std::string lowerField(it->first.length(), ' ');
1020 std::transform(it->first.begin(), it->first.end(), lowerField.begin(), tolower);
1021 if (lowerField != it->first) {
1022 vBucket[lowerField] = std::move(vBucket[it->first]);
1023 vBucket.erase(it++);
1024 } else {
1025 it++;
1026 }
1027 }
1028 }
1029
GetTypeCaseInsensitive(const std::string & fieldName,const VBucket & vBucket,Type & data)1030 bool CloudStorageUtils::GetTypeCaseInsensitive(const std::string &fieldName, const VBucket &vBucket, Type &data)
1031 {
1032 auto tmpFieldName = fieldName;
1033 auto tmpVBucket = vBucket;
1034 std::transform(tmpFieldName.begin(), tmpFieldName.end(), tmpFieldName.begin(), tolower);
1035 TransferFieldToLower(tmpVBucket);
1036 auto it = tmpVBucket.find(tmpFieldName);
1037 if (it == tmpVBucket.end()) {
1038 return false;
1039 }
1040 data = it->second;
1041 return true;
1042 }
1043
TransferSchemaFieldToLower(TableSchema & tableSchema)1044 void CloudStorageUtils::TransferSchemaFieldToLower(TableSchema &tableSchema)
1045 {
1046 for (auto &field : tableSchema.fields) {
1047 std::transform(field.colName.begin(), field.colName.end(), field.colName.begin(), tolower);
1048 }
1049 }
1050
CheckCloudSchemaFields(const TableSchema & tableSchema,const TableSchema & oldSchema)1051 bool CloudStorageUtils::CheckCloudSchemaFields(const TableSchema &tableSchema, const TableSchema &oldSchema)
1052 {
1053 if (tableSchema.name != oldSchema.name) {
1054 return true;
1055 }
1056 for (const auto &oldField : oldSchema.fields) {
1057 auto it = std::find_if(tableSchema.fields.begin(), tableSchema.fields.end(),
1058 [&oldField](const std::vector<Field>::value_type &field) {
1059 return oldField == field;
1060 });
1061 if (it == tableSchema.fields.end()) {
1062 return false;
1063 }
1064 }
1065 return true;
1066 }
1067 }
1068