1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "log_print.h"
21 #include "relational_store_delegate.h"
22 #include "relational_store_manager.h"
23 #include "runtime_config.h"
24 #include "time_helper.h"
25 #include "virtual_asset_loader.h"
26 #include "virtual_cloud_data_translate.h"
27 #include "virtual_cloud_db.h"
28 #include "virtual_communicator_aggregator.h"
29 #include "sqlite_relational_utils.h"
30 #include "cloud/cloud_storage_utils.h"
31 #include "cloud_db_sync_utils_test.h"
32
33 namespace {
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 const char *g_createSQL =
38 "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
39 "id TEXT PRIMARY KEY," \
40 "name TEXT," \
41 "height REAL ," \
42 "photo BLOB," \
43 "asset ASSET," \
44 "assets ASSETS," \
45 "age INT);";
46 const int64_t g_syncWaitTime = 60;
47 const int g_assetsNum = 3;
48 const Asset g_localAsset = {
49 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
50 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
51 };
52 SyncProcess lastProcess_;
53
CreateUserDBAndTable(sqlite3 * & db)54 void CreateUserDBAndTable(sqlite3 *&db)
55 {
56 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
57 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
58 }
59
BlockSync(const Query & query,RelationalStoreDelegate * delegate,SyncMode syncMode=SYNC_MODE_CLOUD_MERGE)60 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, SyncMode syncMode = SYNC_MODE_CLOUD_MERGE)
61 {
62 std::mutex dataMutex;
63 std::condition_variable cv;
64 bool finish = false;
65 SyncProcess last;
66 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
67 for (const auto &item: process) {
68 if (item.second.process == DistributedDB::FINISHED) {
69 {
70 std::lock_guard<std::mutex> autoLock(dataMutex);
71 finish = true;
72 }
73 last = item.second;
74 cv.notify_one();
75 }
76 }
77 };
78 LOGW("begin call sync");
79 ASSERT_EQ(delegate->Sync({ "CLOUD" }, syncMode, query, callback, g_syncWaitTime), OK);
80 std::unique_lock<std::mutex> uniqueLock(dataMutex);
81 cv.wait(uniqueLock, [&finish]() {
82 return finish;
83 });
84 lastProcess_ = last;
85 LOGW("end call sync");
86 }
87
88 class DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
89 public:
90 static void SetUpTestCase();
91 static void TearDownTestCase();
92 void SetUp() override;
93 void TearDown() override;
94 void WriteDataWithoutCommitTransaction();
95 protected:
96 void InitTestDir();
97 DataBaseSchema GetSchema();
98 void CloseDb();
99 void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
100 const Assets &templateAsset = {});
101 void UpdateLocalTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
102 bool updateAssets = true);
103 void UpdateLocalAssetRecord(const std::string &tableName, int64_t begin, int64_t count);
104 void CheckAssetsCount(const std::vector<size_t> &expectCount, bool checkAsset = false);
105 void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
106 void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
107 void InsertLocalAssetData(const std::string &assetHash);
108 void InsertCloudAssetData(const std::string &assetHash);
109 void PrepareForAssetOperation010();
110 void UpdateAssetWhenSyncUpload();
111 DBStatus InsertRecordToCloud(const std::vector<VBucket> &record);
112 void PrepareDataInCloud();
113 void LocalAssetRemoveTest();
114
115 static std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
116 static std::vector<VBucket> GenerateAssetsRecords(const std::map<std::string, int32_t> &colType,
117 const Asset &templateAsset, int assetsCount, int rowCount);
118 static Asset GenerateAsset(const Asset &templateAsset, int id);
119 static Assets GenerateAssets(const Asset &templateAsset, int id, int assetsCount);
120 std::string testDir_;
121 std::string storePath_;
122 sqlite3 *db_ = nullptr;
123 RelationalStoreDelegate *delegate_ = nullptr;
124 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
125 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
126 std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
127 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
128 std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
129 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
130 TrackerSchema trackerSchema = {
131 .tableName = tableName_, .extendColNames = {"name"}, .trackerColNames = {"age"}
132 };
133 };
134
SetUpTestCase()135 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
136 {
137 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
138 }
139
TearDownTestCase()140 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
141 {}
142
SetUp()143 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
144 {
145 DistributedDBToolsUnitTest::PrintTestCaseInfo();
146 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
147 InitTestDir();
148 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
149 LOGE("rm test db files error.");
150 }
151 DistributedDBToolsUnitTest::PrintTestCaseInfo();
152 LOGD("Test dir is %s", testDir_.c_str());
153 db_ = RelationalTestUtils::CreateDataBase(storePath_);
154 ASSERT_NE(db_, nullptr);
155 CreateUserDBAndTable(db_);
156 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
157 RelationalStoreDelegate::Option option;
158 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
159 ASSERT_NE(delegate_, nullptr);
160 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
161 ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
162 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
163 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
164 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
165 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
166 virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
167 DataBaseSchema dataBaseSchema = GetSchema();
168 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
169 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
170 ASSERT_TRUE(communicatorAggregator_ != nullptr);
171 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
172 }
173
TearDown()174 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
175 {
176 CloseDb();
177 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
178 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
179 LOGE("rm test db files error.");
180 }
181 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
182 communicatorAggregator_ = nullptr;
183 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
184 }
185
InitTestDir()186 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
187 {
188 if (!testDir_.empty()) {
189 return;
190 }
191 DistributedDBToolsUnitTest::TestDirInit(testDir_);
192 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
193 LOGI("The test db is:%s", testDir_.c_str());
194 }
195
GetSchema()196 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
197 {
198 DataBaseSchema schema;
199 TableSchema tableSchema;
200 tableSchema.name = tableName_;
201 tableSchema.sharedTableName = tableName_ + "_shared";
202 tableSchema.fields = {
203 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
204 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
205 {"age", TYPE_INDEX<int64_t>}
206 };
207 schema.tables.push_back(tableSchema);
208 return schema;
209 }
210
CloseDb()211 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
212 {
213 virtualCloudDb_->ForkUpload(nullptr);
214 virtualCloudDb_ = nullptr;
215 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
216 delegate_ = nullptr;
217 mgr_ = nullptr;
218 }
219
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)220 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
221 int64_t count, size_t assetCount, const Assets &templateAsset)
222 {
223 std::string photo = "phone";
224 int errCode;
225 std::vector<uint8_t> assetBlob;
226 std::vector<uint8_t> assetsBlob;
227 const int64_t index2 = 2;
228 for (int64_t i = begin; i < begin + count; ++i) {
229 std::string name = g_localAsset.name + std::to_string(i);
230 Asset asset = g_localAsset;
231 asset.name = name;
232 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
233 std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
234 string sql = "INSERT OR REPLACE INTO " + tableName +
235 " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
236 "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
237 sqlite3_stmt *stmt = nullptr;
238 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
239 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
240 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
241 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
242 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
243 SQLiteUtils::ResetStatement(stmt, true, errCode);
244 }
245 }
246
UpdateLocalTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,bool updateAssets)247 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalTableRecord(const std::string &tableName, int64_t begin,
248 int64_t count, size_t assetCount, bool updateAssets)
249 {
250 int errCode;
251 std::vector<uint8_t> assetBlob;
252 std::vector<uint8_t> assetsBlob;
253 std::string hash = updateAssets ? "new_hash" : g_localAsset.hash;
254 for (int64_t i = begin; i < begin + count; ++i) {
255 std::string name = g_localAsset.name + std::to_string(i);
256 Asset asset = g_localAsset;
257 asset.name = name;
258 asset.hash = hash;
259 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
260 std::vector<Asset> assets = GetAssets(name, {}, assetCount);
261 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
262 std::string dataName = "new_name_" + std::to_string(i);
263 std::string sql = "UPDATE " + tableName + " SET name = ?, asset = ?, assets = ? where id = " +
264 std::to_string(i);
265 sqlite3_stmt *stmt = nullptr;
266 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
267 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 1, dataName), E_OK); // 1st bind
268 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetBlob, false), E_OK); // 2nd bind
269 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetsBlob, false), E_OK); // 3rd bind
270 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
271 SQLiteUtils::ResetStatement(stmt, true, errCode);
272 }
273 }
274
InsertRecordToCloud(const std::vector<VBucket> & record)275 DBStatus DistributedDBCloudAssetsOperationSyncTest::InsertRecordToCloud(const std::vector<VBucket> &record)
276 {
277 std::vector<VBucket> extend;
278 for (size_t i = 0; i < record.size(); ++i) {
279 VBucket log;
280 Timestamp now = TimeHelper::GetSysCurrentTime();
281 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
282 now / CloudDbConstant::TEN_THOUSAND));
283 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
284 now / CloudDbConstant::TEN_THOUSAND));
285 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
286 extend.push_back(log);
287 }
288 std::vector<VBucket> copyRecord = record;
289 return virtualCloudDb_->BatchInsert(tableName_, std::move(copyRecord), extend);
290 }
291
PrepareDataInCloud()292 void DistributedDBCloudAssetsOperationSyncTest::PrepareDataInCloud()
293 {
294 std::map<std::string, int32_t> colType;
295 colType["asset"] = TYPE_INDEX<Asset>;
296 colType["assets"] = TYPE_INDEX<Assets>;
297 Asset templateAsset = g_localAsset;
298 const int assetsCount = 10;
299 const int rowCount = 200;
300 auto recordAssets = GenerateAssetsRecords(colType, templateAsset, assetsCount, rowCount);
301 EXPECT_EQ(InsertRecordToCloud(recordAssets), OK);
302 }
303
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)304 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
305 const Assets &templateAsset, size_t assetCount)
306 {
307 std::vector<Asset> assets;
308 for (size_t i = 1; i <= assetCount; ++i) {
309 Asset asset;
310 if (i - 1 < templateAsset.size()) {
311 asset = templateAsset[i - 1];
312 } else {
313 asset = g_localAsset;
314 asset.name = baseName + "_" + std::to_string(i);
315 asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
316 }
317 assets.push_back(asset);
318 }
319 return assets;
320 }
321
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)322 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
323 {
324 std::vector<VBucket> record;
325 std::vector<VBucket> extend;
326 Timestamp now = TimeHelper::GetSysCurrentTime();
327 const int assetCount = 2;
328 for (int64_t i = begin; i < (begin + count); ++i) {
329 VBucket data;
330 data.insert_or_assign("id", std::to_string(i));
331 data.insert_or_assign("name", "Cloud" + std::to_string(i));
332 Assets assets;
333 for (int j = 1; j <= assetCount; ++j) {
334 Asset asset;
335 asset.name = "Phone_" + std::to_string(j);
336 asset.assetId = std::to_string(j);
337 asset.status = AssetStatus::UPDATE;
338 assets.push_back(asset);
339 }
340 assetIsNull ? data.insert_or_assign("assets", Nil()) : data.insert_or_assign("assets", assets);
341 record.push_back(data);
342 VBucket log;
343 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
344 now / CloudDbConstant::TEN_THOUSAND));
345 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
346 now / CloudDbConstant::TEN_THOUSAND));
347 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
348 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
349 extend.push_back(log);
350 }
351
352 ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
353 }
354
CheckAssetsCount(const std::vector<size_t> & expectCount,bool checkAsset)355 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount,
356 bool checkAsset)
357 {
358 std::vector<VBucket> allData;
359 auto dbSchema = GetSchema();
360 ASSERT_GT(dbSchema.tables.size(), 0u);
361 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
362 int index = 0;
363 ASSERT_EQ(allData.size(), expectCount.size());
364 for (const auto &data : allData) {
365 auto colIter = data.find("assets");
366 EXPECT_NE(colIter, data.end());
367 if (colIter == data.end()) {
368 index++;
369 continue;
370 }
371 Type colValue = data.at("assets");
372 auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
373 auto assets = RelationalTestUtils::GetAssets(colValue, translate);
374 size_t size = assets.size();
375 if (checkAsset) {
376 Type colValue1 = data.at("asset");
377 auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
378 size += assets1.size();
379 }
380 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
381 EXPECT_EQ(static_cast<size_t>(size), expectCount[index]);
382 for (const auto &item : assets) {
383 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
384 item.status);
385 }
386 index++;
387 }
388 }
389
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)390 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
391 int &removeCount)
392 {
393 virtualAssetLoader_->ForkDownload([this, &downLoadCount](const std::string &tableName,
394 std::map<std::string, Assets> &assets) {
395 downLoadCount++;
396 if (downLoadCount == 1) {
397 std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
398 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
399 }
400 });
401 virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
402 EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
403 removeCount++;
404 return removeStatus;
405 });
406 }
407
GenerateAssetsRecords(const std::map<std::string,int32_t> & colType,const Asset & templateAsset,int assetsCount,int rowCount)408 std::vector<VBucket> DistributedDBCloudAssetsOperationSyncTest::GenerateAssetsRecords(
409 const std::map<std::string, int32_t> &colType, const Asset &templateAsset, int assetsCount, int rowCount)
410 {
411 std::vector<VBucket> res;
412 for (int i = 0; i < rowCount; ++i) {
413 VBucket record;
414 record["id"] = std::to_string(i);
415 for (const auto &[col, type] : colType) {
416 if (type == TYPE_INDEX<Asset>) {
417 record[col] = GenerateAsset(templateAsset, i);
418 } else if (type == TYPE_INDEX<Assets>) {
419 record[col] = GenerateAssets(templateAsset, i, assetsCount);
420 }
421 }
422 res.push_back(record);
423 }
424 return res;
425 }
426
GenerateAsset(const Asset & templateAsset,int id)427 Asset DistributedDBCloudAssetsOperationSyncTest::GenerateAsset(const Asset &templateAsset, int id)
428 {
429 Asset res = templateAsset;
430 res.name.append("_").append(std::to_string(id));
431 res.hash.append("_").append(std::to_string(id));
432 return res;
433 }
434
GenerateAssets(const Asset & templateAsset,int id,int assetsCount)435 Assets DistributedDBCloudAssetsOperationSyncTest::GenerateAssets(const Asset &templateAsset, int id, int assetsCount)
436 {
437 Assets assets;
438 auto baseAsset = GenerateAsset(templateAsset, id);
439 for (int i = 0; i < assetsCount; ++i) {
440 assets.push_back(GenerateAsset(baseAsset, i));
441 }
442 return assets;
443 }
444
445 /**
446 * @tc.name: SyncWithAssetOperation001
447 * @tc.desc: Delete Assets When Download
448 * @tc.type: FUNC
449 * @tc.require:
450 * @tc.author: zhangqiquan
451 */
452 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level0)
453 {
454 const int actualCount = 10;
455 const int deleteDataCount = 5;
456 const int deleteAssetsCount = 4;
457 InsertUserTableRecord(tableName_, 0, actualCount);
458 std::string tableName = tableName_;
__anonde3e756b0602(const std::string &, VBucket &) 459 virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
460 for (int64_t i = 0; i < deleteDataCount; i++) {
461 std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
462 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
463 }
464 for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
465 std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
466 std::to_string(i) + ";";
467 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
468 }
469 });
470 Query query = Query::Select().FromTable({ tableName_ });
471 BlockSync(query, delegate_);
472 virtualCloudDb_->ForkUpload(nullptr);
473 std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
474 expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
475 CheckAssetsCount(expectCount);
476 }
477
478 /**
479 * @tc.name: SyncWithAssetOperation002
480 * @tc.desc: Download Assets When local assets was removed
481 * @tc.type: FUNC
482 * @tc.require:
483 * @tc.author: zhangqiquan
484 */
485 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level0)
486 {
487 const int actualCount = 1;
488 InsertUserTableRecord(tableName_, 0, actualCount);
489 Query query = Query::Select().FromTable({ tableName_ });
490 BlockSync(query, delegate_);
491 int downLoadCount = 0;
492 int removeCount = 0;
493 ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
494 UpdateCloudTableRecord(0, actualCount, false);
495 RelationalTestUtils::CloudBlockSync(query, delegate_);
496 EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
497 EXPECT_EQ(removeCount, 1);
498 virtualAssetLoader_->ForkDownload(nullptr);
499 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
500
501 std::vector<size_t> expectCount = { 0 };
502 CheckAssetsCount(expectCount);
503 }
504
505 /**
506 * @tc.name: SyncWithAssetOperation003
507 * @tc.desc: Delete Assets When Download
508 * @tc.type: FUNC
509 * @tc.require:
510 * @tc.author: bty
511 */
512 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level0)
513 {
514 InsertUserTableRecord(tableName_, 0, 1); // 1 is count
515 int uploadCount = 0;
__anonde3e756b0702(const std::string &, VBucket &) 516 virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
517 if (uploadCount > 0) {
518 return;
519 }
520 SqlCondition condition;
521 condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
522 std::vector<VBucket> records;
523 EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
524 uploadCount++;
525 });
526 Query query = Query::Select().FromTable({ tableName_ });
527 BlockSync(query, delegate_);
528 virtualCloudDb_->ForkUpload(nullptr);
529
530 std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
531 sqlite3_stmt *stmt = nullptr;
532 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
533 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
534 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
535 Type cloudValue;
536 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
537 std::vector<uint8_t> assetsBlob;
538 Assets assets;
539 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
540 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
541 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
542 for (size_t i = 0; i < assets.size(); ++i) {
543 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
544 }
545 }
546 int errCode;
547 SQLiteUtils::ResetStatement(stmt, true, errCode);
548 }
549
LocalAssetRemoveTest()550 void DistributedDBCloudAssetsOperationSyncTest::LocalAssetRemoveTest()
551 {
552 const int actualCount = 5; // 5 record
553 InsertUserTableRecord(tableName_, 0, actualCount);
554 Query query = Query::Select().FromTable({ tableName_ });
555 BlockSync(query, delegate_);
556 int downLoadCount = 0;
557 int removeCount = 0;
558 ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
559 UpdateCloudTableRecord(0, actualCount, false);
560 RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOVE_ASSETS_FAIL);
561 EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
562 EXPECT_EQ(removeCount, 1);
563 virtualAssetLoader_->ForkDownload(nullptr);
564 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
565
566 std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
567 CheckAssetsCount(expectCount);
568 }
569
570 /**
571 * @tc.name: SyncWithAssetOperation004
572 * @tc.desc: Download Assets When local assets was removed
573 * @tc.type: FUNC
574 * @tc.require:
575 * @tc.author: zhangqiquan
576 */
577 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level0)
578 {
579 LocalAssetRemoveTest();
580 }
581
UpdateAssetWhenSyncUpload()582 void DistributedDBCloudAssetsOperationSyncTest::UpdateAssetWhenSyncUpload()
583 {
584 string sql = "UPDATE " + tableName_ + " SET asset = ? WHERE id = '54';";
585 Asset asset = g_localAsset;
586 asset.hash = "123";
587 const int assetId = 54;
588 asset.name = g_localAsset.name + std::to_string(assetId);
589 std::vector<uint8_t> assetBlob;
590 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
591 sqlite3_stmt *stmt = nullptr;
592 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
593 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
594 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
595 int errCode;
596 SQLiteUtils::ResetStatement(stmt, true, errCode);
597 }
598
599 /**
600 * @tc.name: SyncWithAssetOperation005
601 * @tc.desc: check asset when update in fill before upload sync process
602 * @tc.type: FUNC
603 * @tc.require:
604 * @tc.author: luoguo
605 */
606 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation005, TestSize.Level0)
607 {
608 /**
609 * @tc.steps:step1. Insert 60 records.
610 * @tc.expected: step1. ok.
611 */
612 InsertUserTableRecord(tableName_, 0, 60);
613
614 /**
615 * @tc.steps:step2. Sync to cloud and wait in upload.
616 * @tc.expected: step2. ok.
617 */
618 bool isUpload = false;
__anonde3e756b0802(const std::string &, VBucket &) 619 virtualCloudDb_->ForkUpload([&isUpload](const std::string &, VBucket &) {
620 if (isUpload == true) {
621 return;
622 }
623 isUpload = true;
624 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
625 });
626 Query query = Query::Select().FromTable({tableName_});
627
628 bool finish = false;
__anonde3e756b0902(const std::map<std::string, SyncProcess> &process) 629 auto callback = [&finish](const std::map<std::string, SyncProcess> &process) {
630 for (const auto &item : process) {
631 if (item.second.process == DistributedDB::FINISHED) {
632 {
633 finish = true;
634 }
635 }
636 }
637 };
638 ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
639
640 while (isUpload == false) {
641 std::this_thread::sleep_for(std::chrono::milliseconds(50));
642 }
643
644 /**
645 * @tc.steps:step3. update asset when sync upload.
646 * @tc.expected: step3. ok.
647 */
648 UpdateAssetWhenSyncUpload();
649
650 /**
651 * @tc.steps:step4. check asset data.
652 * @tc.expected: step4. ok.
653 */
654 while (finish == false) {
655 std::this_thread::sleep_for(std::chrono::milliseconds(50));
656 }
657 virtualCloudDb_->ForkUpload(nullptr);
658 std::vector<VBucket> allData;
659 auto dbSchema = GetSchema();
660 ASSERT_GT(dbSchema.tables.size(), 0u);
661 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
662 ASSERT_EQ(allData.size(), 60ul);
663 auto data = allData[54]; // update data
664 auto data1 = allData[55]; // no update data
665
666 Type colValue = data.at("asset");
667 auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
668 auto assets = RelationalTestUtils::GetAssets(colValue, translate, true);
669 ASSERT_EQ(assets[0].hash, std::string("123"));
670
671 Type colValue1 = data1.at("asset");
672 auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
673 ASSERT_EQ(assets1[0].hash, std::string("DEC"));
674 }
675
676 /**
677 * @tc.name: SyncWithAssetOperation006
678 * @tc.desc: Remove Local Datas When local assets was empty
679 * @tc.type: FUNC
680 * @tc.require:
681 * @tc.author: lijun
682 */
683 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation006, TestSize.Level0)
684 {
685 const int actualCount = 5;
686 InsertUserTableRecord(tableName_, 0, actualCount);
687 Query query = Query::Select().FromTable({ tableName_ });
688 BlockSync(query, delegate_);
689
690 UpdateCloudTableRecord(0, 2, true);
691 BlockSync(query, delegate_);
692
693 int removeCount = 0;
__anonde3e756b0a02(const std::vector<Asset> &assets) 694 virtualAssetLoader_->ForkRemoveLocalAssets([&removeCount](const std::vector<Asset> &assets) {
695 removeCount = assets.size();
696 return DBStatus::OK;
697 });
698 std::string device = "";
699 ASSERT_EQ(delegate_->RemoveDeviceData(device, FLAG_AND_DATA), DBStatus::OK);
700 ASSERT_EQ(9, removeCount);
701 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
702 }
703
704 /**
705 * @tc.name: SyncWithAssetOperation007
706 * @tc.desc: Test assetId fill when assetId changed
707 * @tc.type: FUNC
708 * @tc.require:
709 * @tc.author: wangxiangdong
710 */
711 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation007, TestSize.Level0)
712 {
713 /**
714 * @tc.steps:step1. Insert 5 records and sync.
715 * @tc.expected: step1. ok.
716 */
717 const int actualCount = 5;
718 std::string name = g_localAsset.name + std::to_string(0);
719 Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
720 expectAssets[0].hash.append("change"); // modify first asset
721 InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
722 Query query = Query::Select().FromTable({ tableName_ });
723 BlockSync(query, delegate_);
724 /**
725 * @tc.steps:step2. modify data and sync.
726 * @tc.expected: step2. ok.
727 */
728 UpdateCloudTableRecord(0, 1, true);
729 BlockSync(query, delegate_);
730 /**
731 * @tc.steps:step3. check modified data cursor.
732 * @tc.expected: step3. ok.
733 */
734 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
735 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
736 reinterpret_cast<void *>(7), nullptr), SQLITE_OK);
737 sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=5";
738 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
739 reinterpret_cast<void *>(5), nullptr), SQLITE_OK);
740 }
741
742 /**
743 * @tc.name: SyncWithAssetOperation008
744 * @tc.desc: Test assetId fill when assetId changed
745 * @tc.type: FUNC
746 * @tc.require:
747 * @tc.author: wangxiangdong
748 */
749 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation008, TestSize.Level0)
750 {
751 /**
752 * @tc.steps:step1. Insert 5 records and sync.
753 * @tc.expected: step1. ok.
754 */
755 const int actualCount = 5;
756 InsertUserTableRecord(tableName_, 0, actualCount);
757 Query query = Query::Select().FromTable({ tableName_ });
758 BlockSync(query, delegate_);
759 /**
760 * @tc.steps:step2. modify data and sync.
761 * @tc.expected: step2. ok.
762 */
763 UpdateCloudTableRecord(0, 1, true);
764 int removeCount = 0;
__anonde3e756b0b02(std::map<std::string, Assets> &assets) 765 virtualAssetLoader_->SetRemoveLocalAssetsCallback([&removeCount](std::map<std::string, Assets> &assets) {
766 removeCount = assets["asset"].size() + assets["assets"].size();
767 return LOCAL_ASSET_NOT_FOUND;
768 });
769 BlockSync(query, delegate_);
770 EXPECT_EQ(removeCount, 3); // one record has 3 asset
771 virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
772 /**
773 * @tc.steps:step3. check asset number.
774 * @tc.expected: step3. ok.
775 */
776 std::vector<size_t> expectCount = { 3, 3, 3, 3, 3 };
777 CheckAssetsCount(expectCount, true);
778 /**
779 * @tc.steps:step4. sync and check.
780 * @tc.expected: step4. ok.
781 */
782 BlockSync(query, delegate_);
783 expectCount = { 0, 3, 3, 3, 3 };
784 CheckAssetsCount(expectCount, true);
785 }
786
787 /**
788 * @tc.name: SyncWithAssetOperation009
789 * @tc.desc: Test asset remove local and check db asset empty finally.
790 * @tc.type: FUNC
791 * @tc.require:
792 * @tc.author: wangxiangdong
793 */
794 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation009, TestSize.Level0)
795 {
796 /**
797 * @tc.steps:step1. Insert 5 records and sync.
798 * @tc.expected: step1. ok.
799 */
800 const int actualCount = 5;
801 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
802 InsertUserTableRecord(tableName_, 0, actualCount);
803 /**
804 * @tc.steps:step2. modify data and sync.
805 * @tc.expected: step2. ok.
806 */
807 UpdateCloudTableRecord(0, 1, true);
808 Query query = Query::Select().FromTable({ tableName_ });
809 BlockSync(query, delegate_);
810 /**
811 * @tc.steps:step3. check asset number.
812 * @tc.expected: step3. ok.
813 */
814 std::vector<size_t> expectCount = { 0, 3, 3, 3, 3 };
815 CheckAssetsCount(expectCount, true);
816 }
817
InsertLocalAssetData(const std::string & assetHash)818 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
819 {
820 Assets assets;
821 std::string assetNameBegin = "Phone";
822 for (int j = 1; j <= g_assetsNum; ++j) {
823 Asset asset;
824 asset.name = assetNameBegin + "_" + std::to_string(j);
825 asset.status = AssetStatus::NORMAL;
826 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
827 asset.hash = assetHash + "_" + std::to_string(j);
828 asset.assetId = std::to_string(j);
829 assets.push_back(asset);
830 }
831 string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
832 sqlite3_stmt *stmt = nullptr;
833 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
834 std::vector<uint8_t> assetBlob;
835 std::vector<uint8_t> assetsBlob;
836 RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
837 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
838 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
839 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
840 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
841 int errCode;
842 SQLiteUtils::ResetStatement(stmt, true, errCode);
843 }
844
InsertCloudAssetData(const std::string & assetHash)845 void DistributedDBCloudAssetsOperationSyncTest::InsertCloudAssetData(const std::string &assetHash)
846 {
847 std::vector<VBucket> record;
848 std::vector<VBucket> extend;
849 Timestamp now = DistributedDB::TimeHelper::GetSysCurrentTime();
850 VBucket data;
851 data.insert_or_assign("id", "0");
852 data.insert_or_assign("name", "CloudTest0");
853 Asset asset = g_localAsset;
854 data.insert_or_assign("asset", asset);
855 Assets assets;
856 std::string assetNameBegin = "Phone";
857 for (int j = 1; j <= g_assetsNum; ++j) {
858 Asset assetTmp;
859 assetTmp.name = assetNameBegin + "_" + std::to_string(j);
860 assetTmp.status = AssetStatus::NORMAL;
861 assetTmp.hash = assetHash + "_" + std::to_string(j);
862 assetTmp.assetId = std::to_string(j);
863 assets.push_back(assetTmp);
864 }
865 data.insert_or_assign("assets", assets);
866 record.push_back(data);
867 VBucket log;
868 log.insert_or_assign(DistributedDB::CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
869 now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
870 log.insert_or_assign(DistributedDB::CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
871 now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
872 log.insert_or_assign(DistributedDB::CloudDbConstant::DELETE_FIELD, false);
873 extend.push_back(log);
874 virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend);
875 }
876
PrepareForAssetOperation010()877 void DistributedDBCloudAssetsOperationSyncTest::PrepareForAssetOperation010()
878 {
879 InsertCloudAssetData("cloudAsset");
880 InsertLocalAssetData("localAsset");
881 }
882
883 /**
884 * @tc.name: SyncWithAssetOperation010
885 * @tc.desc: Test check status of asset, when the hash of asset is different.
886 * @tc.type: FUNC
887 * @tc.require:
888 * @tc.author: liufuchenxing
889 */
890 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation010, TestSize.Level0)
891 {
892 /**
893 * @tc.steps:step1. prepare local and cloud asset data.
894 * @tc.expected: step1. ok.
895 */
896 PrepareForAssetOperation010();
897
898 /**
899 * @tc.steps:step2. sync and check the status of assets.
900 * @tc.expected: step2. ok.
901 */
902 virtualCloudDb_->ForkBeforeBatchUpdate([](const std::string &, std::vector<VBucket> &record,
__anonde3e756b0c02(const std::string &, std::vector<VBucket> &record, std::vector<VBucket> &extend, bool) 903 std::vector<VBucket> &extend, bool) {
904 ASSERT_EQ(static_cast<int>(record.size()), 1);
905 VBucket &bucket = record[0];
906 ASSERT_TRUE(bucket.find("assets") != bucket.end());
907 Assets assets = std::get<Assets>(bucket["assets"]);
908 ASSERT_EQ(static_cast<int>(assets.size()), 3);
909 for (size_t i = 0; i < assets.size(); i++) {
910 ASSERT_EQ(assets[i].status, AssetStatus::UPDATE);
911 }
912 });
913
914 Query query = Query::Select().FromTable({ tableName_ });
915 BlockSync(query, delegate_, SYNC_MODE_CLOUD_FORCE_PUSH);
916 }
917
WriteDataWithoutCommitTransaction()918 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
919 {
920 ASSERT_NE(db_, nullptr);
921 SQLiteUtils::BeginTransaction(db_);
922 InsertLocalAssetData("localAsset");
923 constexpr int kSleepDurationSeconds = 3;
924 std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
925 }
926
927 /**
928 * @tc.name: TestOpenDatabaseBusy001
929 * @tc.desc: Test open database when the database is busy.
930 * @tc.type: FUNC
931 * @tc.require:
932 * @tc.author: liufuchenxing
933 */
934 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
935 {
936 /**
937 * @tc.steps:step1. close store.
938 * @tc.expected:step1. check ok.
939 */
940 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
941 delegate_ = nullptr;
942 /**
943 * @tc.steps:step2. Another thread write data into database into database without commit.
944 * @tc.expected:step2. check ok.
945 */
946 std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
947 std::this_thread::sleep_for(std::chrono::seconds(1));
948 /**
949 * @tc.steps:step3. open relational delegate.
950 * @tc.expected:step3. open success.
951 */
952 RelationalStoreDelegate::Option option;
953 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
954 thread.join();
955 }
956
957 /**
958 * @tc.name: SyncWithAssetOperation011
959 * @tc.desc: Test change assets between download and remove
960 * @tc.type: FUNC
961 * @tc.require:
962 * @tc.author: liaoyonghuang
963 */
964 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation011, TestSize.Level0)
965 {
966 /**
967 * @tc.steps:step1. Insert 5 records and sync.
968 * @tc.expected: step1. ok.
969 */
970 const int actualCount = 5;
971 InsertUserTableRecord(tableName_, 0, actualCount);
972 Query query = Query::Select().FromTable({ tableName_ });
973 BlockSync(query, delegate_);
974 /**
975 * @tc.steps:step2. modify assets of cloud data. update 1st asset and delete 2nd asset.
976 * @tc.expected: step2. ok.
977 */
978 std::vector<VBucket> record;
979 std::vector<VBucket> extend;
980 int dataNum = 0;
981 Timestamp now = TimeHelper::GetSysCurrentTime();
982 VBucket data;
983 data.insert_or_assign("id", std::to_string(dataNum));
984 data.insert_or_assign("name", "Cloud" + std::to_string(dataNum));
985 Asset cloudAsset = g_localAsset;
986 cloudAsset.name += std::to_string(dataNum);
987 cloudAsset.hash = "new_hash";
988 Assets cloudAssets = {cloudAsset};
989 data.insert_or_assign("assets", cloudAssets);
990 record.push_back(data);
991 VBucket log;
992 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
993 now / CloudDbConstant::TEN_THOUSAND));
994 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
995 now / CloudDbConstant::TEN_THOUSAND));
996 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
997 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(dataNum));
998 extend.push_back(log);
999 ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
1000 /**
1001 * @tc.steps:step3. Update local assets between remove and download, sync and check whether download is invoked.
1002 * @tc.expected: step3. ok.
1003 */
__anonde3e756b0d02(std::map<std::string, Assets> &assets) 1004 virtualAssetLoader_->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
1005 UpdateLocalTableRecord(tableName_, 0, 1);
1006 return OK;
1007 });
__anonde3e756b0e02(const std::string &tableName, std::map<std::string, Assets> &assets) 1008 virtualAssetLoader_->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
1009 EXPECT_TRUE(false);
1010 });
1011 BlockSync(query, delegate_);
1012
1013 virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
1014 virtualAssetLoader_->ForkDownload(nullptr);
1015 }
1016
1017 /**
1018 * @tc.name: SyncWithAssetOperation012
1019 * @tc.desc: Batch download Assets When local assets was removed
1020 * @tc.type: FUNC
1021 * @tc.require:
1022 * @tc.author: liaoyonghuang
1023 */
1024 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation012, TestSize.Level0)
1025 {
1026 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1027 LocalAssetRemoveTest();
1028 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1029 }
1030
UpdateLocalAssetRecord(const std::string & tableName,int64_t begin,int64_t count)1031 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalAssetRecord(const std::string &tableName, int64_t begin,
1032 int64_t count)
1033 {
1034 int errCode;
1035 std::vector<uint8_t> assetBlob;
1036 std::vector<uint8_t> assetsBlob;
1037 for (int64_t i = begin; i < begin + count; ++i) {
1038 Asset asset = g_localAsset;
1039 asset.hash = "new_hash";
1040 asset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
1041 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
1042 std::vector<Asset> assets;
1043 assets.push_back(asset);
1044 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
1045 std::string sql = "UPDATE " + tableName + " SET height = '175.0', asset = ?, assets = ? where id = " +
1046 std::to_string(i);
1047 sqlite3_stmt *stmt = nullptr;
1048 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
1049 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK); // 1st bind
1050 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2nd bind
1051 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
1052 SQLiteUtils::ResetStatement(stmt, true, errCode);
1053 }
1054 }
1055
1056 /**
1057 * @tc.name: SyncWithAssetOperation013
1058 * @tc.desc: Test device modify data and then sync cursor will not changes
1059 * @tc.type: FUNC
1060 * @tc.require:
1061 * @tc.author: caihaoting
1062 */
1063 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation013, TestSize.Level0)
1064 {
1065 /**
1066 * @tc.steps:step1. Insert local and cloud asset data and sync.
1067 * @tc.expected: step1. ok.
1068 */
1069 InsertCloudAssetData("assetHash");
1070 InsertLocalAssetData("assetHash");
1071 Query query = Query::Select().FromTable({ tableName_ });
1072 BlockSync(query, delegate_);
1073 /**
1074 * @tc.steps:step2. modify data of asset and sync.
1075 * @tc.expected: step2. ok.
1076 */
1077 UpdateLocalAssetRecord(tableName_, 0, 1);
1078 const int cursor = 2;
1079 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1080 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1081 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1082 BlockSync(query, delegate_);
1083 /**
1084 * @tc.steps:step3. check modified data cursor and cursor is not changed.
1085 * @tc.expected: step3. ok.
1086 */
1087 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1088 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1089 }
1090
1091 /**
1092 * @tc.name: SyncWithAssetOperation014
1093 * @tc.desc: Test device data does not change while sync and cursor will not changes
1094 * @tc.type: FUNC
1095 * @tc.require:
1096 * @tc.author: caihaoting
1097 */
1098 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation014, TestSize.Level0)
1099 {
1100 /**
1101 * @tc.steps:step1. Insert 5 records and sync.
1102 * @tc.expected: step1. ok.
1103 */
1104 const int actualCount = 5;
1105 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1106 InsertUserTableRecord(tableName_, 0, actualCount);
1107 Query query = Query::Select().FromTable({ tableName_ });
1108 BlockSync(query, delegate_);
1109 /**
1110 * @tc.steps:step2. modify data and sync.
1111 * @tc.expected: step2. ok.
1112 */
1113 UpdateLocalAssetRecord(tableName_, 0, 1);
1114 const int cursor = 6;
1115 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1116 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1117 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1118 BlockSync(query, delegate_);
1119 /**
1120 * @tc.steps:step3. check modified data cursor and cursor is not changed.
1121 * @tc.expected: step3. ok.
1122 */
1123 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1124 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1125 }
1126
1127 /**
1128 * @tc.name: IgnoreRecord001
1129 * @tc.desc: Download Assets When local assets was removed
1130 * @tc.type: FUNC
1131 * @tc.require:
1132 * @tc.author: zhangqiquan
1133 */
1134 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level0)
1135 {
1136 const int actualCount = 1;
1137 InsertUserTableRecord(tableName_, 0, actualCount);
1138 Query query = Query::Select().FromTable({ tableName_ });
1139 BlockSync(query, delegate_);
1140 std::vector<size_t> expectCount = { 2 };
1141 CheckAssetsCount(expectCount);
1142
1143 VBucket record;
1144 record["id"] = std::to_string(0);
1145 record["assets"] = Assets();
1146 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1147 record["id"] = std::to_string(1);
1148 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1149 expectCount = { 0, 0 };
1150 CheckAssetsCount(expectCount);
1151
1152 std::vector<VBucket> logs;
1153 EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
1154 for (const auto &log : logs) {
1155 int64_t cursor = std::get<int64_t>(log.at("cursor"));
1156 EXPECT_GE(cursor, 0);
1157 }
1158 }
1159
1160 /**
1161 * @tc.name: IgnoreRecord002
1162 * @tc.desc: Ignore Assets When Download
1163 * @tc.type: FUNC
1164 * @tc.require:
1165 * @tc.author: zhangqiquan
1166 */
1167 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level0)
1168 {
1169 const int actualCount = 1;
1170 InsertUserTableRecord(tableName_, 0, actualCount);
1171 Query query = Query::Select().FromTable({ tableName_ });
1172 RelationalTestUtils::CloudBlockSync(query, delegate_);
1173 UpdateCloudTableRecord(0, actualCount, false);
1174
1175 virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1176 RelationalTestUtils::CloudBlockSync(query, delegate_);
1177 virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1178 std::vector<size_t> expectCount = { 4 };
1179 CheckAssetsCount(expectCount);
1180 RelationalTestUtils::CloudBlockSync(query, delegate_);
1181 }
1182
1183 /**
1184 * @tc.name: IgnoreRecord003
1185 * @tc.desc: Ignore Assets When Upload
1186 * @tc.type: FUNC
1187 * @tc.require:
1188 * @tc.author: zhangqiquan
1189 */
1190 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level0)
1191 {
1192 const int actualCount = 1;
1193 InsertUserTableRecord(tableName_, 0, actualCount);
1194 Query query = Query::Select().FromTable({ tableName_ });
1195 virtualCloudDb_->SetConflictInUpload(true);
1196 RelationalTestUtils::CloudBlockSync(query, delegate_);
1197 virtualCloudDb_->SetConflictInUpload(false);
1198 std::vector<size_t> expectCount = { 2 };
1199 CheckAssetsCount(expectCount);
1200 RelationalTestUtils::CloudBlockSync(query, delegate_);
1201 }
1202
1203 /**
1204 * @tc.name: IgnoreRecord004
1205 * @tc.desc: Ignore Assets When Btch Download
1206 * @tc.type: FUNC
1207 * @tc.require:
1208 * @tc.author: luoguo
1209 */
1210 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord004, TestSize.Level0)
1211 {
1212 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1213 const int actualCount = 10;
1214 InsertUserTableRecord(tableName_, 0, actualCount);
1215 Query query = Query::Select().FromTable({ tableName_ });
1216 RelationalTestUtils::CloudBlockSync(query, delegate_);
1217 UpdateCloudTableRecord(0, actualCount, false);
1218
1219 virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1220 RelationalTestUtils::CloudBlockSync(query, delegate_);
1221 virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1222 std::vector<size_t> expectCount(10, 4);
1223 CheckAssetsCount(expectCount);
1224 RelationalTestUtils::CloudBlockSync(query, delegate_);
1225 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1226 }
1227
1228 /**
1229 * @tc.name: UpsertData001
1230 * @tc.desc: Upsert data after delete it
1231 * @tc.type: FUNC
1232 * @tc.require:
1233 * @tc.author: zhangqiquan
1234 */
1235 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level0)
1236 {
1237 // insert id 0 to local
1238 const int actualCount = 1;
1239 InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
1240 std::vector<std::map<std::string, std::string>> conditions;
1241 std::map<std::string, std::string> entries;
1242 entries["id"] = "0";
1243 conditions.push_back(entries);
1244 // delete id 0 in local
1245 RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
1246 // upsert id 0 to local
1247 VBucket record;
1248 record["id"] = std::to_string(0);
1249 record["assets"] = Assets();
1250 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1251 // check id 0 exist
1252 CheckAssetsCount({ 0 });
1253 }
1254
1255 /**
1256 * @tc.name: UpsertData002
1257 * @tc.desc: Test sync after Upsert.
1258 * @tc.type: FUNC
1259 * @tc.require:
1260 * @tc.author: liaoyonghuang
1261 */
1262 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level0)
1263 {
1264 /**
1265 * @tc.steps:step1. Insert 5 records and sync.
1266 * @tc.expected: step1. ok.
1267 */
1268 const int actualCount = 5;
1269 InsertUserTableRecord(tableName_, 0, actualCount);
1270 Query query = Query::Select().FromTable({ tableName_ });
1271 BlockSync(query, delegate_);
1272
1273 /**
1274 * @tc.steps:step2. UpsertData and sync.
1275 * @tc.expected: step2. ok.
1276 */
1277 int dataCnt = -1;
1278 std::string checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 5";
__anonde3e756b0f02(sqlite3_stmt *stmt) 1279 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1280 dataCnt = sqlite3_column_int(stmt, 0);
1281 return E_OK;
1282 });
1283 EXPECT_EQ(dataCnt, 1);
1284 vector<VBucket> records;
1285 for (int i = 0; i < actualCount; i++) {
1286 VBucket record;
1287 record["id"] = std::to_string(i);
1288 record["name"] = std::string("UpsertName");
1289 records.push_back(record);
1290 }
1291 EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
1292 // check cursor has been increased
1293 checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 10";
__anonde3e756b1002(sqlite3_stmt *stmt) 1294 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1295 dataCnt = sqlite3_column_int(stmt, 0);
1296 return E_OK;
1297 });
1298 EXPECT_EQ(dataCnt, 1);
1299 BlockSync(query, delegate_);
1300
1301 /**
1302 * @tc.steps:step3. Check local data.
1303 * @tc.expected: step3. All local data has been merged by the cloud.
1304 */
1305 std::vector<VBucket> allData;
1306 auto dbSchema = GetSchema();
1307 ASSERT_GT(dbSchema.tables.size(), 0u);
1308 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
1309 for (const auto &data : allData) {
1310 ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
1311 }
1312 }
1313
1314 /**
1315 * @tc.name: SyncWithAssetConflict001
1316 * @tc.desc: Upload with asset no change
1317 * @tc.type: FUNC
1318 * @tc.require:
1319 * @tc.author: zhangqiquan
1320 */
1321 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level0)
1322 {
1323 // cloud and local insert same data
1324 const int actualCount = 1;
1325 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1326 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1327 InsertUserTableRecord(tableName_, 0, actualCount);
1328 // sync and local asset's status are normal
1329 Query query = Query::Select().FromTable({ tableName_ });
1330 RelationalTestUtils::CloudBlockSync(query, delegate_);
1331 auto dbSchema = GetSchema();
1332 ASSERT_GT(dbSchema.tables.size(), 0u);
1333 auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1334 for (const auto &oneRow : assets) {
1335 for (const auto &asset : oneRow) {
1336 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1337 }
1338 }
1339 }
1340
1341 /**
1342 * @tc.name: UpsertDataInvalid001
1343 * @tc.desc: Upsert invalid data
1344 * @tc.type: FUNC
1345 * @tc.require:
1346 * @tc.author: wangxiangdong
1347 */
1348 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
1349 {
1350 VBucket record;
1351 record["id"] = std::to_string(0);
1352 record["assets"] = Assets();
1353 /**
1354 * @tc.steps:step1. UpsertData to empty table.
1355 * @tc.expected: step1. INVALID_ARGS.
1356 */
1357 EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
1358 /**
1359 * @tc.steps:step2. UpsertData to shared table.
1360 * @tc.expected: step2. INVALID_ARGS.
1361 */
1362 EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
1363 /**
1364 * @tc.steps:step3. UpsertData to not device table and shared table.
1365 * @tc.expected: step3. NOT_FOUND.
1366 */
1367 const char *createSQL =
1368 "CREATE TABLE IF NOT EXISTS testing(" \
1369 "id TEXT PRIMARY KEY," \
1370 "name TEXT," \
1371 "height REAL ," \
1372 "photo BLOB," \
1373 "asset ASSET," \
1374 "assets ASSETS," \
1375 "age INT);";
1376 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1377 EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
1378 /**
1379 * @tc.steps:step4. UpsertData to not exist table.
1380 * @tc.expected: step4. NOT_FOUND.
1381 */
1382 EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
1383 }
1384
1385 /**
1386 * @tc.name: UpsertDataInvalid002
1387 * @tc.desc: Upsert device data
1388 * @tc.type: FUNC
1389 * @tc.require:
1390 * @tc.author: wangxiangdong
1391 */
1392 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level0)
1393 {
1394 VBucket record;
1395 record["id"] = std::to_string(0);
1396 record["assets"] = Assets();
1397 /**
1398 * @tc.steps:step1. create user table.
1399 * @tc.expected: step1. INVALID_ARGS.
1400 */
1401 const char *createSQL =
1402 "CREATE TABLE IF NOT EXISTS deviceTable(" \
1403 "id TEXT PRIMARY KEY," \
1404 "name TEXT," \
1405 "height REAL ," \
1406 "photo BLOB," \
1407 "asset ASSET," \
1408 "assets ASSETS," \
1409 "age INT);";
1410 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1411 /**
1412 * @tc.steps:step2. create device table.
1413 * @tc.expected: step2. OK.
1414 */
1415 RelationalStoreDelegate *delegate1 = nullptr;
1416 std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
1417 RelationalStoreDelegate::Option option;
1418 ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1419 ASSERT_NE(delegate1, nullptr);
1420 std::string deviceTableName = "deviceTable";
1421 ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
1422 DataBaseSchema dataBaseSchema;
1423 TableSchema tableSchema;
1424 tableSchema.name = deviceTableName;
1425 tableSchema.sharedTableName = deviceTableName + "_shared";
1426 tableSchema.fields = {
1427 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
1428 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
1429 {"age", TYPE_INDEX<int64_t>}
1430 };
1431 dataBaseSchema.tables.push_back(tableSchema);
1432 ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1433 /**
1434 * @tc.steps:step3. UpsertData to device table.
1435 * @tc.expected: step3. NOT_FOUND.
1436 */
1437 EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
1438 EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
1439 delegate1 = nullptr;
1440 mgr1 = nullptr;
1441 }
1442
1443 /**
1444 * @tc.name: DownloadAssetStatusTest004
1445 * @tc.desc: Test upload asset status
1446 * @tc.type: FUNC
1447 * @tc.require:
1448 * @tc.author: zhangqiquan
1449 */
1450 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, DownloadAssetStatusTest004, TestSize.Level0)
1451 {
1452 /**
1453 * @tc.steps:step1. cloud assets {0, 1}
1454 * @tc.expected: step1. OK.
1455 */
1456 // cloud and local insert same data
1457 // cloud assets {0, 1} local assets {0, 1, 2}
1458 const int actualCount = 1;
1459 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_, 2);
1460 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms for data conflict
1461 /**
1462 * @tc.steps:step2. local assets {0, 1, 2}, and change assert {0}
1463 * @tc.expected: step2. OK.
1464 */
1465 std::string name = g_localAsset.name + std::to_string(0);
1466 Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
1467 expectAssets[0].hash.append("change"); // modify first asset
1468 InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
1469 /**
1470 * @tc.steps:step3. sync
1471 * @tc.expected: step3. upload status is {UPDATE, NORMAL, INSERT}
1472 */
1473 std::vector<AssetStatus> expectStatus = {
1474 AssetStatus::UPDATE, AssetStatus::NORMAL, AssetStatus::INSERT
1475 };
1476 // sync and local asset's status are normal
1477 Query query = Query::Select().FromTable({ tableName_ });
1478 RelationalTestUtils::CloudBlockSync(query, delegate_);
1479 auto dbSchema = GetSchema();
1480 ASSERT_GT(dbSchema.tables.size(), 0u);
1481 // cloud asset status is update normal insert
1482 VBucket extend;
1483 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
1484 std::vector<VBucket> data;
1485 ASSERT_EQ(virtualCloudDb_->Query(tableName_, extend, data), QUERY_END);
1486 ASSERT_EQ(data.size(), static_cast<size_t>(actualCount));
1487 Assets actualAssets;
1488 ASSERT_EQ(CloudStorageUtils::GetValueFromType(data[0]["assets"], actualAssets), E_OK);
1489 ASSERT_EQ(actualAssets.size(), expectStatus.size());
1490 for (size_t i = 0; i < actualAssets.size(); ++i) {
1491 EXPECT_EQ(actualAssets[i].status, expectStatus[i]);
1492 }
1493 /**
1494 * @tc.steps:step4. check local assets status.
1495 * @tc.expected: step4. all assets status is NORMAL.
1496 */
1497 auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1498 for (const auto &oneRow : assets) {
1499 for (const auto &asset : oneRow) {
1500 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1501 }
1502 }
1503 }
1504
1505 /**
1506 * @tc.name: UploadAssetsTest001
1507 * @tc.desc: Test upload asset with error.
1508 * @tc.type: FUNC
1509 * @tc.require:
1510 * @tc.author: liaoyonghuang
1511 */
1512 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
1513 {
1514 /**
1515 * @tc.steps:step1. Insert 10 records.
1516 * @tc.expected: step1. ok.
1517 */
1518 const int actualCount = 10;
1519 InsertUserTableRecord(tableName_, 0, actualCount);
1520 /**
1521 * @tc.steps:step2. Set callback function to cause some upstream data to fail.
1522 * @tc.expected: step2. ok.
1523 */
1524 int recordIndex = 0;
1525 Asset tempAsset = {
1526 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
1527 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
1528 };
__anonde3e756b1102(const std::string &tableName, VBucket &extend) 1529 virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
1530 Asset asset;
1531 Assets assets;
1532 switch (recordIndex) {
1533 case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1534 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1535 break;
1536 case 1: // record[1] is considered successful because it is a conflict.
1537 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1538 static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1539 break;
1540 case 2: // record[2] fail because of empty gid.
1541 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
1542 break;
1543 case 3: // record[3] fail because of empty assetId.
1544 asset = tempAsset;
1545 asset.assetId = "";
1546 extend[std::string(CloudDbConstant::ASSET)] = asset;
1547 break;
1548 case 4: // record[4] fail because of empty assetId.
1549 assets.push_back(tempAsset);
1550 assets[0].assetId = "";
1551 extend[std::string(CloudDbConstant::ASSETS)] = assets;
1552 break;
1553 case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1554 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
1555 break;
1556 default:
1557 break;
1558 }
1559 recordIndex++;
1560 });
1561 /**
1562 * @tc.steps:step3. Sync and check upLoadInfo.
1563 * @tc.expected: step3. failCount is 5 and successCount is 5.
1564 */
1565 Query query = Query::Select().FromTable({ tableName_ });
1566 BlockSync(query, delegate_);
1567 for (const auto &table : lastProcess_.tableProcess) {
1568 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1569 EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
1570 EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
1571 }
1572 virtualCloudDb_->ForkUpload(nullptr);
1573 }
1574
1575 /**
1576 * @tc.name: UploadAssetsTest002
1577 * @tc.desc: Test upload asset with error.
1578 * @tc.type: FUNC
1579 * @tc.require:
1580 * @tc.author: liaoyonghuang
1581 */
1582 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
1583 {
1584 /**
1585 * @tc.steps:step1. Insert 10 records.
1586 * @tc.expected: step1. ok.
1587 */
1588 const int actualCount = 10;
1589 InsertUserTableRecord(tableName_, 0, actualCount);
1590 Query query = Query::Select().FromTable({ tableName_ });
1591 BlockSync(query, delegate_);
1592 /**
1593 * @tc.steps:step2. Delete local data.
1594 * @tc.expected: step2. OK.
1595 */
1596 std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
1597 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1598 /**
1599 * @tc.steps:step3. Set callback function to cause some upstream data to fail.
1600 * @tc.expected: step3. ok.
1601 */
__anonde3e756b1202(const std::string &tableName, VBucket &extend) 1602 virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
1603 extend[std::string(CloudDbConstant::GID_FIELD)] = "";
1604 });
1605 BlockSync(query, delegate_);
1606 for (const auto &table : lastProcess_.tableProcess) {
1607 EXPECT_EQ(table.second.upLoadInfo.total, 5u);
1608 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1609 EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
1610 }
1611 virtualCloudDb_->ForkUpload(nullptr);
1612 }
1613
1614 /**
1615 * @tc.name: UploadAssetsTest003
1616 * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
1617 * @tc.type: FUNC
1618 * @tc.require:
1619 * @tc.author: liaoyonghuang
1620 */
1621 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level0)
1622 {
1623 /**
1624 * @tc.steps:step1. Insert 100 records.
1625 * @tc.expected: step1. ok.
1626 */
1627 const int actualCount = 100;
1628 InsertUserTableRecord(tableName_, 0, actualCount);
1629 /**
1630 * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
1631 * @tc.expected: step2. ok.
1632 */
1633 int uploadCount = 0;
__anonde3e756b1302(const std::string &tableName, VBucket &extend) 1634 virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
1635 if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
1636 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1637 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1638 }
1639 uploadCount++;
1640 });
1641 Query query = Query::Select().FromTable({ tableName_ });
1642 BlockSync(query, delegate_);
1643 for (const auto &table : lastProcess_.tableProcess) {
1644 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
1645 EXPECT_EQ(table.second.upLoadInfo.total, 100u);
1646 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1647 EXPECT_EQ(table.second.upLoadInfo.successCount, 100u);
1648 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
1649 }
1650 virtualCloudDb_->ForkUpload(nullptr);
1651 }
1652
1653 /**
1654 * @tc.name: UploadAssetsTest004
1655 * @tc.desc: Test batch delete return error CLOUD_RECORD_NOT_FOUND.
1656 * @tc.type: FUNC
1657 * @tc.require:
1658 * @tc.author: liaoyonghuang
1659 */
1660 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest004, TestSize.Level0)
1661 {
1662 /**
1663 * @tc.steps:step1. Insert 100 records and sync to cloud.
1664 * @tc.expected: step1. ok.
1665 */
1666 const int actualCount = 100;
1667 InsertUserTableRecord(tableName_, 0, actualCount);
1668 Query query = Query::Select().FromTable({ tableName_ });
1669 BlockSync(query, delegate_);
1670 /**
1671 * @tc.steps:step2. delete 50 records in local.
1672 * @tc.expected: step2. ok.
1673 */
1674 std::string sql = "delete from " + tableName_ + " where CAST(id AS INTEGER) >= " + std::to_string(actualCount / 2);
1675 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1676 /**
1677 * @tc.steps:step3. set return error CLOUD_RECORD_NOT_FOUND in batch delete.
1678 * @tc.expected: step3. ok.
1679 */
1680 int index = 0;
__anonde3e756b1402(const std::string &tableName, VBucket &extend) 1681 virtualCloudDb_->ForkUpload([&index](const std::string &tableName, VBucket &extend) {
1682 if (extend.count(CloudDbConstant::DELETE_FIELD) != 0 && index % 2 == 0 &&
1683 std::get<bool>(extend.at(CloudDbConstant::DELETE_FIELD))) {
1684 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1685 }
1686 index++;
1687 });
1688 /**
1689 * @tc.steps:step4. sync and check result.
1690 * @tc.expected: step4. ok.
1691 */
1692 BlockSync(query, delegate_);
1693 for (const auto &table : lastProcess_.tableProcess) {
1694 EXPECT_EQ(table.second.upLoadInfo.total, 50u);
1695 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1696 EXPECT_EQ(table.second.upLoadInfo.successCount, 50u);
1697 }
1698 virtualCloudDb_->ForkUpload(nullptr);
1699 }
1700
1701 /**
1702 * @tc.name: BatchNormalDownloadAsset001
1703 * @tc.desc: Test batch download asset in two batch.
1704 * @tc.type: FUNC
1705 * @tc.require:
1706 * @tc.author: zqq
1707 */
1708 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchNormalDownloadAsset001, TestSize.Level0)
1709 {
1710 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1711 PrepareDataInCloud();
1712
1713 Query query = Query::Select().FromTable({ tableName_ });
1714 BlockSync(query, delegate_);
1715 EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1716 EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u); // remove 0 times
1717 virtualAssetLoader_->Reset();
1718 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1719 }
1720
1721 /**
1722 * @tc.name: BatchAbnormalDownloadAsset001
1723 * @tc.desc: Test batch download asset failed.
1724 * @tc.type: FUNC
1725 * @tc.require:
1726 * @tc.author: zqq
1727 */
1728 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchAbnormalDownloadAsset001, TestSize.Level0)
1729 {
1730 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1731 PrepareDataInCloud();
__anonde3e756b1502(int rowIndex, std::map<std::string, Assets> &assets) 1732 virtualAssetLoader_->ForkBatchDownload([](int rowIndex, std::map<std::string, Assets> &assets) {
1733 if (rowIndex > 50) { // 50 record failed
1734 for (auto &asset : assets) {
1735 for (auto &item : asset.second) {
1736 item.status = AssetStatus::ABNORMAL;
1737 }
1738 }
1739 return DB_ERROR;
1740 }
1741 return OK;
1742 });
1743
1744 Query query = Query::Select().FromTable({ tableName_ });
1745 BlockSync(query, delegate_);
1746 EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 1u); // download 1 times
1747 EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u); // remove 0 times
1748 virtualAssetLoader_->Reset();
1749
1750 virtualAssetLoader_->ForkBatchDownload(nullptr);
1751 BlockSync(query, delegate_);
1752 EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1753 EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u); // remove 0 times
1754 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1755 }
1756 }
1757 #endif
1758