1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cloud/cloud_storage_utils.h"
17 #include <set>
18
19 #include "cloud/cloud_db_types.h"
20 #include "db_common.h"
21 #include "runtime_context.h"
22
23 namespace DistributedDB {
BindInt64(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)24 int CloudStorageUtils::BindInt64(int index, const VBucket &vBucket, const Field &field,
25 sqlite3_stmt *upsertStmt)
26 {
27 int64_t val = 0;
28 int errCode = GetValueFromVBucket<int64_t>(field.colName, vBucket, val);
29 if (field.nullable && errCode == -E_NOT_FOUND) {
30 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
31 } else {
32 if (errCode != E_OK) {
33 LOGE("get int from vbucket failed, %d", errCode);
34 return -E_CLOUD_ERROR;
35 }
36 errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
37 }
38
39 if (errCode != E_OK) {
40 LOGE("Bind int to insert statement failed, %d", errCode);
41 }
42 return errCode;
43 }
44
BindBool(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)45 int CloudStorageUtils::BindBool(int index, const VBucket &vBucket, const Field &field,
46 sqlite3_stmt *upsertStmt)
47 {
48 bool val = false;
49 int errCode = GetValueFromVBucket<bool>(field.colName, vBucket, val);
50 if (field.nullable && errCode == -E_NOT_FOUND) {
51 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
52 } else {
53 if (errCode != E_OK) {
54 LOGE("get bool from vbucket failed, %d", errCode);
55 return -E_CLOUD_ERROR;
56 }
57 errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
58 }
59
60 if (errCode != E_OK) {
61 LOGE("Bind bool to insert statement failed, %d", errCode);
62 }
63 return errCode;
64 }
65
BindDouble(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)66 int CloudStorageUtils::BindDouble(int index, const VBucket &vBucket, const Field &field,
67 sqlite3_stmt *upsertStmt)
68 {
69 double val = 0.0;
70 int errCode = GetValueFromVBucket<double>(field.colName, vBucket, val);
71 if (field.nullable && errCode == -E_NOT_FOUND) {
72 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
73 } else {
74 if (errCode != E_OK) {
75 LOGE("get double from vbucket failed, %d", errCode);
76 return -E_CLOUD_ERROR;
77 }
78 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(upsertStmt, index, val));
79 }
80
81 if (errCode != E_OK) {
82 LOGE("Bind double to insert statement failed, %d", errCode);
83 }
84 return errCode;
85 }
86
BindText(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)87 int CloudStorageUtils::BindText(int index, const VBucket &vBucket, const Field &field,
88 sqlite3_stmt *upsertStmt)
89 {
90 std::string str;
91 int errCode = GetValueFromVBucket<std::string>(field.colName, vBucket, str);
92 if (field.nullable && errCode == -E_NOT_FOUND) {
93 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
94 } else {
95 if (errCode != E_OK) {
96 LOGE("get string from vbucket failed, %d", errCode);
97 return -E_CLOUD_ERROR;
98 }
99 errCode = SQLiteUtils::BindTextToStatement(upsertStmt, index, str);
100 }
101
102 if (errCode != E_OK) {
103 LOGE("Bind string to insert statement failed, %d", errCode);
104 }
105 return errCode;
106 }
107
BindBlob(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)108 int CloudStorageUtils::BindBlob(int index, const VBucket &vBucket, const Field &field,
109 sqlite3_stmt *upsertStmt)
110 {
111 int errCode = E_OK;
112 Bytes val;
113 if (field.type == TYPE_INDEX<Bytes>) {
114 errCode = GetValueFromVBucket<Bytes>(field.colName, vBucket, val);
115 if (!(IsFieldValid(field, errCode))) {
116 goto ERROR;
117 }
118 } else if (field.type == TYPE_INDEX<Asset>) {
119 Asset asset;
120 errCode = GetValueFromVBucket(field.colName, vBucket, asset);
121 if (!(IsFieldValid(field, errCode))) {
122 goto ERROR;
123 }
124 RuntimeContext::GetInstance()->AssetToBlob(asset, val);
125 } else {
126 Assets assets;
127 errCode = GetValueFromVBucket(field.colName, vBucket, assets);
128 if (!(IsFieldValid(field, errCode))) {
129 goto ERROR;
130 }
131 RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
132 }
133
134 if (errCode == -E_NOT_FOUND) {
135 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
136 } else {
137 errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
138 }
139 if (errCode != E_OK) {
140 LOGE("Bind blob to insert statement failed, %d", errCode);
141 }
142 return errCode;
143 ERROR:
144 LOGE("get blob from vbucket failed, %d", errCode);
145 return -E_CLOUD_ERROR;
146 }
147
BindAsset(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)148 int CloudStorageUtils::BindAsset(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *upsertStmt)
149 {
150 int errCode;
151 Bytes val;
152 auto entry = vBucket.find(field.colName);
153 if (entry == vBucket.end() || entry->second.index() == TYPE_INDEX<Nil>) {
154 if (!field.nullable) {
155 LOGE("field value is not allowed to be null, %d", -E_CLOUD_ERROR);
156 return -E_CLOUD_ERROR;
157 }
158 return SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
159 }
160
161 Type type = entry->second;
162 if (field.type == TYPE_INDEX<Asset>) {
163 Asset asset;
164 errCode = GetValueFromOneField(type, asset);
165 if (errCode != E_OK) {
166 LOGE("can not get asset from vBucket when bind, %d", errCode);
167 return errCode;
168 }
169 errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, val);
170 } else if (field.type == TYPE_INDEX<Assets>) {
171 Assets assets;
172 errCode = GetValueFromOneField(type, assets);
173 if (errCode != E_OK) {
174 LOGE("can not get assets from vBucket when bind, %d", errCode);
175 return errCode;
176 }
177 if (!assets.empty()) {
178 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
179 }
180 } else {
181 LOGE("field type is not asset or assets, %d", -E_CLOUD_ERROR);
182 return -E_CLOUD_ERROR;
183 }
184 if (errCode != E_OK) {
185 LOGE("assets or asset to blob fail, %d", -E_CLOUD_ERROR);
186 return -E_CLOUD_ERROR;
187 }
188 if (val.empty()) {
189 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
190 } else {
191 errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
192 }
193 return errCode;
194 }
195
Int64ToVector(const VBucket & vBucket,const Field & field,std::vector<uint8_t> & value)196 int CloudStorageUtils::Int64ToVector(const VBucket &vBucket, const Field &field, std::vector<uint8_t> &value)
197 {
198 int64_t val = 0;
199 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
200 return -E_CLOUD_ERROR;
201 }
202 DBCommon::StringToVector(std::to_string(val), value);
203 return E_OK;
204 }
205
BoolToVector(const VBucket & vBucket,const Field & field,std::vector<uint8_t> & value)206 int CloudStorageUtils::BoolToVector(const VBucket &vBucket, const Field &field, std::vector<uint8_t> &value)
207 {
208 bool val = false;
209 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
210 return -E_CLOUD_ERROR;
211 }
212 DBCommon::StringToVector(std::to_string(val ? 1 : 0), value);
213 return E_OK;
214 }
215
DoubleToVector(const VBucket & vBucket,const Field & field,std::vector<uint8_t> & value)216 int CloudStorageUtils::DoubleToVector(const VBucket &vBucket, const Field &field, std::vector<uint8_t> &value)
217 {
218 double val = 0.0;
219 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
220 return -E_CLOUD_ERROR;
221 }
222 std::ostringstream s;
223 s << val;
224 DBCommon::StringToVector(s.str(), value);
225 return E_OK;
226 }
227
TextToVector(const VBucket & vBucket,const Field & field,std::vector<uint8_t> & value)228 int CloudStorageUtils::TextToVector(const VBucket &vBucket, const Field &field, std::vector<uint8_t> &value)
229 {
230 std::string val;
231 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
232 return -E_CLOUD_ERROR;
233 }
234 DBCommon::StringToVector(val, value);
235 return E_OK;
236 }
237
BlobToVector(const VBucket & vBucket,const Field & field,std::vector<uint8_t> & value)238 int CloudStorageUtils::BlobToVector(const VBucket &vBucket, const Field &field, std::vector<uint8_t> &value)
239 {
240 if (field.type == TYPE_INDEX<Bytes>) {
241 return CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, value);
242 } else if (field.type == TYPE_INDEX<Asset>) {
243 Asset val;
244 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
245 return -E_CLOUD_ERROR;
246 }
247 int errCode = RuntimeContext::GetInstance()->AssetToBlob(val, value);
248 if (errCode != E_OK) {
249 LOGE("asset to blob fail, %d", errCode);
250 }
251 return errCode;
252 } else {
253 Assets val;
254 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
255 return -E_CLOUD_ERROR;
256 }
257 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(val, value);
258 if (errCode != E_OK) {
259 LOGE("assets to blob fail, %d", errCode);
260 }
261 return errCode;
262 }
263 }
264
GetCloudPrimaryKey(const TableSchema & tableSchema)265 std::set<std::string> CloudStorageUtils::GetCloudPrimaryKey(const TableSchema &tableSchema)
266 {
267 std::set<std::string> pkSet;
268 for (const auto &field : tableSchema.fields) {
269 if (field.primary) {
270 pkSet.insert(field.colName);
271 }
272 }
273 return pkSet;
274 }
275
GetCloudAsset(const TableSchema & tableSchema)276 std::vector<Field> CloudStorageUtils::GetCloudAsset(const TableSchema &tableSchema)
277 {
278 std::vector<Field> assetFields;
279 for (const auto &item: tableSchema.fields) {
280 if (item.type != TYPE_INDEX<Asset> && item.type != TYPE_INDEX<Assets>) {
281 continue;
282 }
283 assetFields.push_back(item);
284 }
285 return assetFields;
286 }
287
GetCloudPrimaryKeyField(const TableSchema & tableSchema)288 std::vector<Field> CloudStorageUtils::GetCloudPrimaryKeyField(const TableSchema &tableSchema)
289 {
290 std::vector<Field> pkVec;
291 for (const auto &field : tableSchema.fields) {
292 if (field.primary) {
293 pkVec.push_back(field);
294 }
295 }
296 return pkVec;
297 }
298
GetCloudPrimaryKeyFieldMap(const TableSchema & tableSchema)299 std::map<std::string, Field> CloudStorageUtils::GetCloudPrimaryKeyFieldMap(const TableSchema &tableSchema)
300 {
301 std::map<std::string, Field> pkMap;
302 for (const auto &field : tableSchema.fields) {
303 if (field.primary) {
304 pkMap[field.colName] = field;
305 }
306 }
307 return pkMap;
308 }
309
GetAssetFieldsFromSchema(const TableSchema & tableSchema,VBucket & vBucket,std::vector<Field> & fields)310 int CloudStorageUtils::GetAssetFieldsFromSchema(const TableSchema &tableSchema, VBucket &vBucket,
311 std::vector<Field> &fields)
312 {
313 for (const auto &field: tableSchema.fields) {
314 auto it = vBucket.find(field.colName);
315 if (it == vBucket.end()) {
316 continue;
317 }
318 if (it->second.index() != TYPE_INDEX<Asset> && it->second.index() != TYPE_INDEX<Assets>) {
319 continue;
320 }
321 fields.push_back(field);
322 }
323 if (fields.empty()) {
324 return -E_CLOUD_ERROR;
325 }
326 return E_OK;
327 }
328
IsContainsPrimaryKey(const TableSchema & tableSchema)329 bool CloudStorageUtils::IsContainsPrimaryKey(const TableSchema &tableSchema)
330 {
331 for (const auto &field : tableSchema.fields) {
332 if (field.primary) {
333 return true;
334 }
335 }
336 return false;
337 }
338
ObtainAssetFromVBucket(const VBucket & vBucket,VBucket & asset)339 void CloudStorageUtils::ObtainAssetFromVBucket(const VBucket &vBucket, VBucket &asset)
340 {
341 for (const auto &item: vBucket) {
342 if (IsAsset(item.second)) {
343 Asset data = std::get<Asset>(item.second);
344 asset.insert_or_assign(item.first, data);
345 } else if (IsAssets(item.second)) {
346 Assets data = std::get<Assets>(item.second);
347 asset.insert_or_assign(item.first, data);
348 }
349 }
350 }
351
StatusToFlag(AssetStatus status)352 AssetOpType CloudStorageUtils::StatusToFlag(AssetStatus status)
353 {
354 switch (status) {
355 case AssetStatus::INSERT:
356 return AssetOpType::INSERT;
357 case AssetStatus::DELETE:
358 return AssetOpType::DELETE;
359 case AssetStatus::UPDATE:
360 return AssetOpType::UPDATE;
361 default:
362 return AssetOpType::NO_CHANGE;
363 }
364 }
365
FlagToStatus(AssetOpType opType)366 AssetStatus CloudStorageUtils::FlagToStatus(AssetOpType opType)
367 {
368 switch (opType) {
369 case AssetOpType::INSERT:
370 return AssetStatus::INSERT;
371 case AssetOpType::DELETE:
372 return AssetStatus::DELETE;
373 case AssetOpType::UPDATE:
374 return AssetStatus::UPDATE;
375 default:
376 return AssetStatus::NORMAL;
377 }
378 }
379
ChangeAssetsOnVBucketToAsset(VBucket & vBucket,std::vector<Field> & fields)380 void CloudStorageUtils::ChangeAssetsOnVBucketToAsset(VBucket &vBucket, std::vector<Field> &fields)
381 {
382 for (const Field &field: fields) {
383 if (field.type == TYPE_INDEX<Asset>) {
384 Type asset = GetAssetFromAssets(vBucket[field.colName]);
385 vBucket[field.colName] = asset;
386 }
387 }
388 }
389
GetAssetFromAssets(Type & value)390 Type CloudStorageUtils::GetAssetFromAssets(Type &value)
391 {
392 Asset assetVal;
393 int errCode = GetValueFromType(value, assetVal);
394 if (errCode == E_OK) {
395 return assetVal;
396 }
397
398 Assets assets;
399 errCode = GetValueFromType(value, assets);
400 if (errCode != E_OK) {
401 return Nil();
402 }
403
404 for (Asset &asset: assets) {
405 if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
406 return std::move(asset);
407 }
408 }
409 return Nil();
410 }
411
FillAssetBeforeDownload(Asset & asset)412 void CloudStorageUtils::FillAssetBeforeDownload(Asset &asset)
413 {
414 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
415 AssetStatus status = static_cast<AssetStatus>(asset.status);
416 switch (flag) {
417 case AssetOpType::INSERT: {
418 if (status != AssetStatus::NORMAL) {
419 asset.hash = std::string("");
420 }
421 break;
422 }
423 default:
424 break;
425 }
426 }
427
FillAssetAfterDownloadFail(Asset & asset)428 void CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset)
429 {
430 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
431 AssetStatus status = static_cast<AssetStatus>(asset.status);
432 switch (flag) {
433 case AssetOpType::INSERT:
434 case AssetOpType::DELETE:
435 case AssetOpType::UPDATE: {
436 if (status != AssetStatus::NORMAL) {
437 asset.hash = std::string("");
438 asset.status = static_cast<uint32_t>(AssetStatus::ABNORMAL);
439 }
440 break;
441 }
442 default:
443 break;
444 }
445 }
446
FillAssetAfterDownload(Asset & asset)447 int CloudStorageUtils::FillAssetAfterDownload(Asset &asset)
448 {
449 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
450 switch (flag) {
451 case AssetOpType::INSERT:
452 case AssetOpType::UPDATE: {
453 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
454 break;
455 }
456 case AssetOpType::DELETE: {
457 return -E_NOT_FOUND;
458 }
459 default:
460 break;
461 }
462 return E_OK;
463 }
464
FillAssetsAfterDownload(Assets & assets)465 void CloudStorageUtils::FillAssetsAfterDownload(Assets &assets)
466 {
467 for (auto asset = assets.begin(); asset != assets.end();) {
468 if (FillAssetAfterDownload(*asset) == -E_NOT_FOUND) {
469 asset = assets.erase(asset);
470 } else {
471 asset++;
472 }
473 }
474 }
475
FillAssetForUpload(Asset & asset)476 int CloudStorageUtils::FillAssetForUpload(Asset &asset)
477 {
478 AssetStatus status = static_cast<AssetStatus>(asset.status);
479 switch (StatusToFlag(status)) {
480 case AssetOpType::INSERT:
481 case AssetOpType::UPDATE: {
482 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
483 break;
484 }
485 case AssetOpType::DELETE: {
486 return -E_NOT_FOUND;
487 }
488 default: {
489 break;
490 }
491 }
492 return E_OK;
493 }
494
FillAssetsForUpload(Assets & assets)495 void CloudStorageUtils::FillAssetsForUpload(Assets &assets)
496 {
497 for (auto asset = assets.begin(); asset != assets.end();) {
498 if (FillAssetForUpload(*asset) == -E_NOT_FOUND) {
499 asset = assets.erase(asset);
500 } else {
501 asset++;
502 }
503 }
504 }
505
PrepareToFillAssetFromVBucket(VBucket & vBucket,std::function<void (Asset &)> fillAsset)506 void CloudStorageUtils::PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function<void(Asset &)> fillAsset)
507 {
508 for (auto &item: vBucket) {
509 if (IsAsset(item.second)) {
510 Asset asset;
511 GetValueFromType(item.second, asset);
512 fillAsset(asset);
513 vBucket[item.first] = asset;
514 } else if (IsAssets(item.second)) {
515 Assets assets;
516 GetValueFromType(item.second, assets);
517 for (auto &asset: assets) {
518 fillAsset(asset);
519 }
520 vBucket[item.first] = assets;
521 }
522 }
523 }
524
FillAssetFromVBucketFinish(VBucket & vBucket,std::function<int (Asset &)> fillAsset,std::function<void (Assets &)> fillAssets)525 void CloudStorageUtils::FillAssetFromVBucketFinish(VBucket &vBucket, std::function<int(Asset &)> fillAsset,
526 std::function<void(Assets &)> fillAssets)
527 {
528 for (auto &item: vBucket) {
529 if (IsAsset(item.second)) {
530 Asset asset;
531 GetValueFromType(item.second, asset);
532 int errCode = fillAsset(asset);
533 if (errCode != E_OK) {
534 vBucket[item.first] = Nil();
535 } else {
536 vBucket[item.first] = asset;
537 }
538 continue;
539 }
540 if (IsAssets(item.second)) {
541 Assets assets;
542 GetValueFromType(item.second, assets);
543 fillAssets(assets);
544 if (assets.empty()) {
545 vBucket[item.first] = Nil();
546 } else {
547 vBucket[item.first] = assets;
548 }
549 }
550 }
551 }
552
IsAsset(const Type & type)553 bool CloudStorageUtils::IsAsset(const Type &type)
554 {
555 return type.index() == TYPE_INDEX<Asset>;
556 }
557
IsAssets(const Type & type)558 bool CloudStorageUtils::IsAssets(const Type &type)
559 {
560 return type.index() == TYPE_INDEX<Assets>;
561 }
562
CalculateHashKeyForOneField(const Field & field,const VBucket & vBucket,bool allowEmpty,std::vector<uint8_t> & hashValue)563 int CloudStorageUtils::CalculateHashKeyForOneField(const Field &field, const VBucket &vBucket, bool allowEmpty,
564 std::vector<uint8_t> &hashValue)
565 {
566 if (allowEmpty && vBucket.find(field.colName) == vBucket.end()) {
567 return E_OK; // if vBucket from cloud doesn't contain primary key and allowEmpty, no need to calculate hash
568 }
569 static std::map<int32_t, std::function<int(const VBucket &, const Field &, std::vector<uint8_t> &)>> toVecFunc = {
570 { TYPE_INDEX<int64_t>, &CloudStorageUtils::Int64ToVector },
571 { TYPE_INDEX<bool>, &CloudStorageUtils::BoolToVector },
572 { TYPE_INDEX<double>, &CloudStorageUtils::DoubleToVector },
573 { TYPE_INDEX<std::string>, &CloudStorageUtils::TextToVector },
574 { TYPE_INDEX<Bytes>, &CloudStorageUtils::BlobToVector },
575 { TYPE_INDEX<Asset>, &CloudStorageUtils::BlobToVector },
576 { TYPE_INDEX<Assets>, &CloudStorageUtils::BlobToVector },
577 };
578 auto it = toVecFunc.find(field.type);
579 if (it == toVecFunc.end()) {
580 LOGE("unknown cloud type when convert field to vector.");
581 return -E_CLOUD_ERROR;
582 }
583 std::vector<uint8_t> value;
584 int errCode = it->second(vBucket, field, value);
585 if (errCode != E_OK) {
586 LOGE("convert cloud field fail, %d", errCode);
587 return errCode;
588 }
589 return DBCommon::CalcValueHash(value, hashValue);
590 }
591
IsAssetsContainDuplicateAsset(Assets & assets)592 bool CloudStorageUtils::IsAssetsContainDuplicateAsset(Assets &assets)
593 {
594 std::set<std::string> set;
595 for (const auto &asset : assets) {
596 if (set.find(asset.name) != set.end()) {
597 LOGE("assets contain duplicate Asset");
598 return true;
599 }
600 set.insert(asset.name);
601 }
602 return false;
603 }
604
EraseNoChangeAsset(std::map<std::string,Assets> & assetsMap)605 void CloudStorageUtils::EraseNoChangeAsset(std::map<std::string, Assets> &assetsMap)
606 {
607 for (auto items = assetsMap.begin(); items != assetsMap.end();) {
608 for (auto item = items->second.begin(); item != items->second.end();) {
609 if (static_cast<AssetOpType>((*item).flag) == AssetOpType::NO_CHANGE) {
610 item = items->second.erase(item);
611 } else {
612 item++;
613 }
614 }
615 if (items->second.empty()) {
616 items = assetsMap.erase(items);
617 } else {
618 items++;
619 }
620 }
621 }
622
MergeDownloadAsset(std::map<std::string,Assets> & downloadAssets,std::map<std::string,Assets> & mergeAssets)623 void CloudStorageUtils::MergeDownloadAsset(std::map<std::string, Assets> &downloadAssets,
624 std::map<std::string, Assets> &mergeAssets)
625 {
626 for (auto &items: mergeAssets) {
627 auto downloadItem = downloadAssets.find(items.first);
628 if (downloadItem == downloadAssets.end()) {
629 continue;
630 }
631 std::map<std::string, size_t> beCoveredAssetsMap = GenAssetsIndexMap(items.second);
632 for (const Asset &asset: downloadItem->second) {
633 auto it = beCoveredAssetsMap.find(asset.name);
634 if (it == beCoveredAssetsMap.end()) {
635 continue;
636 }
637 items.second[it->second] = asset;
638 }
639 }
640 }
641
GenAssetsIndexMap(Assets & assets)642 std::map<std::string, size_t> CloudStorageUtils::GenAssetsIndexMap(Assets &assets)
643 {
644 // key of assetsIndexMap is name of asset, the value of it is index.
645 std::map<std::string, size_t> assetsIndexMap;
646 for (size_t i = 0; i < assets.size(); i++) {
647 assetsIndexMap[assets[i].name] = i;
648 }
649 return assetsIndexMap;
650 }
651
IsVbucketContainsAllPK(const VBucket & vBucket,const std::set<std::string> & pkSet)652 bool CloudStorageUtils::IsVbucketContainsAllPK(const VBucket &vBucket, const std::set<std::string> &pkSet)
653 {
654 if (pkSet.empty()) {
655 return false;
656 }
657 for (const auto &pk : pkSet) {
658 if (vBucket.find(pk) == vBucket.end()) {
659 return false;
660 }
661 }
662 return true;
663 }
664
IsViolationOfConstraints(const std::string & name,const std::vector<FieldInfo> & fieldInfos)665 static bool IsViolationOfConstraints(const std::string &name, const std::vector<FieldInfo> &fieldInfos)
666 {
667 for (const auto &field : fieldInfos) {
668 if (name == field.GetFieldName()) {
669 if (field.GetStorageType() == StorageType::STORAGE_TYPE_REAL) {
670 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with real primary key.");
671 return true;
672 } else if (field.IsAssetType() || field.IsAssetsType()) {
673 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with asset primary key.");
674 return true;
675 } else {
676 return false;
677 }
678 }
679 }
680 return false;
681 }
682
ConstraintsCheckForCloud(const TableInfo & table,const std::string & trimmedSql)683 int CloudStorageUtils::ConstraintsCheckForCloud(const TableInfo &table, const std::string &trimmedSql)
684 {
685 if (DBCommon::HasConstraint(trimmedSql, "UNIQUE", " ,", " ,)(")) {
686 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with 'UNIQUE' constraint.");
687 return -E_NOT_SUPPORT;
688 }
689
690 const std::map<int, FieldName> &primaryKeys = table.GetPrimaryKey();
691 const std::vector<FieldInfo> &fieldInfos = table.GetFieldInfos();
692 for (const auto &item : primaryKeys) {
693 if (IsViolationOfConstraints(item.second, fieldInfos)) {
694 return -E_NOT_SUPPORT;
695 }
696 }
697 return E_OK;
698 }
699
CheckAssetStatus(const Assets & assets)700 bool CloudStorageUtils::CheckAssetStatus(const Assets &assets)
701 {
702 for (const Asset &asset: assets) {
703 if (asset.status > static_cast<uint32_t>(AssetStatus::UPDATE)) {
704 LOGE("assets contain invalid status:[%u]", asset.status);
705 return false;
706 }
707 }
708 return true;
709 }
710 }
711