1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "log_print.h"
21 #include "relational_store_delegate.h"
22 #include "relational_store_manager.h"
23 #include "runtime_config.h"
24 #include "time_helper.h"
25 #include "virtual_asset_loader.h"
26 #include "virtual_cloud_data_translate.h"
27 #include "virtual_cloud_db.h"
28 #include "sqlite_relational_utils.h"
29 #include "cloud/cloud_storage_utils.h"
30
31 namespace {
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 const char *g_createSQL =
36 "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
37 "id TEXT PRIMARY KEY," \
38 "name TEXT," \
39 "height REAL ," \
40 "photo BLOB," \
41 "asset ASSET," \
42 "assets ASSETS," \
43 "age INT);";
44 const int64_t g_syncWaitTime = 60;
45 const int g_assetsNum = 3;
46 const Asset g_localAsset = {
47 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
48 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
49 };
50 SyncProcess lastProcess_;
51
CreateUserDBAndTable(sqlite3 * & db)52 void CreateUserDBAndTable(sqlite3 *&db)
53 {
54 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
55 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
56 }
57
BlockSync(const Query & query,RelationalStoreDelegate * delegate)58 void BlockSync(const Query &query, RelationalStoreDelegate *delegate)
59 {
60 std::mutex dataMutex;
61 std::condition_variable cv;
62 bool finish = false;
63 SyncProcess last;
64 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
65 for (const auto &item: process) {
66 if (item.second.process == DistributedDB::FINISHED) {
67 {
68 std::lock_guard<std::mutex> autoLock(dataMutex);
69 finish = true;
70 }
71 last = item.second;
72 cv.notify_one();
73 }
74 }
75 };
76 LOGW("begin call sync");
77 ASSERT_EQ(delegate->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
78 std::unique_lock<std::mutex> uniqueLock(dataMutex);
79 cv.wait(uniqueLock, [&finish]() {
80 return finish;
81 });
82 lastProcess_ = last;
83 LOGW("end call sync");
84 }
85
86 class DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
87 public:
88 static void SetUpTestCase();
89 static void TearDownTestCase();
90 void SetUp() override;
91 void TearDown() override;
92 void WriteDataWithoutCommitTransaction();
93 protected:
94 void InitTestDir();
95 DataBaseSchema GetSchema();
96 void CloseDb();
97 void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
98 const Assets &templateAsset = {});
99 void CheckAssetsCount(const std::vector<size_t> &expectCount);
100 void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
101 void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
102 void InsertLocalAssetData(const std::string &assetHash);
103 std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
104 std::string testDir_;
105 std::string storePath_;
106 sqlite3 *db_ = nullptr;
107 RelationalStoreDelegate *delegate_ = nullptr;
108 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
109 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
110 std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
111 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
112 std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
113 TrackerSchema trackerSchema = {
114 .tableName = tableName_, .extendColName = "name", .trackerColNames = {"age"}
115 };
116 };
117
SetUpTestCase()118 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
119 {
120 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
121 }
122
// Suite-level teardown: intentionally empty, nothing is released here.
void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
{}
125
SetUp()126 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
127 {
128 DistributedDBToolsUnitTest::PrintTestCaseInfo();
129 InitTestDir();
130 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
131 LOGE("rm test db files error.");
132 }
133 DistributedDBToolsUnitTest::PrintTestCaseInfo();
134 LOGD("Test dir is %s", testDir_.c_str());
135 db_ = RelationalTestUtils::CreateDataBase(storePath_);
136 ASSERT_NE(db_, nullptr);
137 CreateUserDBAndTable(db_);
138 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
139 RelationalStoreDelegate::Option option;
140 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
141 ASSERT_NE(delegate_, nullptr);
142 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
143 ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
144 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
145 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
146 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
147 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
148 virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
149 DataBaseSchema dataBaseSchema = GetSchema();
150 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
151 }
152
TearDown()153 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
154 {
155 CloseDb();
156 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
157 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
158 LOGE("rm test db files error.");
159 }
160 }
161
InitTestDir()162 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
163 {
164 if (!testDir_.empty()) {
165 return;
166 }
167 DistributedDBToolsUnitTest::TestDirInit(testDir_);
168 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
169 LOGI("The test db is:%s", testDir_.c_str());
170 }
171
GetSchema()172 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
173 {
174 DataBaseSchema schema;
175 TableSchema tableSchema;
176 tableSchema.name = tableName_;
177 tableSchema.sharedTableName = tableName_ + "_shared";
178 tableSchema.fields = {
179 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
180 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
181 {"age", TYPE_INDEX<int64_t>}
182 };
183 schema.tables.push_back(tableSchema);
184 return schema;
185 }
186
CloseDb()187 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
188 {
189 virtualCloudDb_->ForkUpload(nullptr);
190 virtualCloudDb_ = nullptr;
191 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
192 delegate_ = nullptr;
193 mgr_ = nullptr;
194 }
195
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)196 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
197 int64_t count, size_t assetCount, const Assets &templateAsset)
198 {
199 std::string photo = "phone";
200 int errCode;
201 std::vector<uint8_t> assetBlob;
202 std::vector<uint8_t> assetsBlob;
203 const int64_t index2 = 2;
204 for (int64_t i = begin; i < begin + count; ++i) {
205 std::string name = g_localAsset.name + std::to_string(i);
206 Asset asset = g_localAsset;
207 asset.name = name;
208 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
209 std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
210 string sql = "INSERT OR REPLACE INTO " + tableName +
211 " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
212 "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
213 sqlite3_stmt *stmt = nullptr;
214 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
215 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
216 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
217 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
218 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
219 SQLiteUtils::ResetStatement(stmt, true, errCode);
220 }
221 }
222
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)223 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
224 const Assets &templateAsset, size_t assetCount)
225 {
226 std::vector<Asset> assets;
227 for (size_t i = 1; i <= assetCount; ++i) {
228 Asset asset;
229 if (i - 1 < templateAsset.size()) {
230 asset = templateAsset[i - 1];
231 } else {
232 asset = g_localAsset;
233 asset.name = baseName + "_" + std::to_string(i);
234 asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
235 }
236 assets.push_back(asset);
237 }
238 return assets;
239 }
240
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)241 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
242 {
243 std::vector<VBucket> record;
244 std::vector<VBucket> extend;
245 Timestamp now = TimeHelper::GetSysCurrentTime();
246 const int assetCount = 2;
247 for (int64_t i = begin; i < (begin + count); ++i) {
248 VBucket data;
249 data.insert_or_assign("id", std::to_string(i));
250 data.insert_or_assign("name", "Cloud" + std::to_string(i));
251 Assets assets;
252 for (int j = 1; j <= assetCount; ++j) {
253 Asset asset;
254 asset.name = "Phone_" + std::to_string(j);
255 asset.assetId = std::to_string(j);
256 asset.status = AssetStatus::UPDATE;
257 assets.push_back(asset);
258 }
259 data.insert_or_assign("assets", assets);
260 record.push_back(data);
261 VBucket log;
262 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
263 now / CloudDbConstant::TEN_THOUSAND));
264 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
265 now / CloudDbConstant::TEN_THOUSAND));
266 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
267 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
268 extend.push_back(log);
269 }
270
271 ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
272 }
273
CheckAssetsCount(const std::vector<size_t> & expectCount)274 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount)
275 {
276 std::vector<VBucket> allData;
277 auto dbSchema = GetSchema();
278 ASSERT_GT(dbSchema.tables.size(), 0u);
279 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
280 int index = 0;
281 ASSERT_EQ(allData.size(), expectCount.size());
282 for (const auto &data : allData) {
283 auto colIter = data.find("assets");
284 EXPECT_NE(colIter, data.end());
285 if (colIter == data.end()) {
286 index++;
287 continue;
288 }
289 Type colValue = data.at("assets");
290 auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
291 auto assets = RelationalTestUtils::GetAssets(colValue, translate);
292 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
293 EXPECT_EQ(assets.size(), expectCount[index]);
294 for (const auto &item : assets) {
295 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
296 item.status);
297 }
298 index++;
299 }
300 }
301
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)302 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
303 int &removeCount)
304 {
305 virtualAssetLoader_->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
306 downLoadCount++;
307 if (downLoadCount == 1) {
308 std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
309 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
310 }
311 });
312 virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
313 EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
314 removeCount++;
315 return removeStatus;
316 });
317 }
318
319 /**
320 * @tc.name: SyncWithAssetOperation001
321 * @tc.desc: Delete Assets When Download
322 * @tc.type: FUNC
323 * @tc.require:
324 * @tc.author: zhangqiquan
325 */
326 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level0)
327 {
328 const int actualCount = 10;
329 const int deleteDataCount = 5;
330 const int deleteAssetsCount = 4;
331 InsertUserTableRecord(tableName_, 0, actualCount);
332 std::string tableName = tableName_;
__anon3504d92b0602(const std::string &, VBucket &) 333 virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
334 for (int64_t i = 0; i < deleteDataCount; i++) {
335 std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
336 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
337 }
338 for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
339 std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
340 std::to_string(i) + ";";
341 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
342 }
343 });
344 Query query = Query::Select().FromTable({ tableName_ });
345 BlockSync(query, delegate_);
346 virtualCloudDb_->ForkUpload(nullptr);
347 std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
348 expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
349 CheckAssetsCount(expectCount);
350 }
351
352 /**
353 * @tc.name: SyncWithAssetOperation002
354 * @tc.desc: Download Assets When local assets was removed
355 * @tc.type: FUNC
356 * @tc.require:
357 * @tc.author: zhangqiquan
358 */
359 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level0)
360 {
361 const int actualCount = 1;
362 InsertUserTableRecord(tableName_, 0, actualCount);
363 Query query = Query::Select().FromTable({ tableName_ });
364 BlockSync(query, delegate_);
365 int downLoadCount = 0;
366 int removeCount = 0;
367 ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
368 UpdateCloudTableRecord(0, actualCount, false);
369 RelationalTestUtils::CloudBlockSync(query, delegate_);
370 EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
371 EXPECT_EQ(removeCount, 1);
372 virtualAssetLoader_->ForkDownload(nullptr);
373 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
374
375 std::vector<size_t> expectCount = { 0 };
376 CheckAssetsCount(expectCount);
377 }
378
379 /**
380 * @tc.name: SyncWithAssetOperation003
381 * @tc.desc: Delete Assets When Download
382 * @tc.type: FUNC
383 * @tc.require:
384 * @tc.author: bty
385 */
386 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level0)
387 {
388 InsertUserTableRecord(tableName_, 0, 1); // 1 is count
389 int uploadCount = 0;
__anon3504d92b0702(const std::string &, VBucket &) 390 virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
391 if (uploadCount > 0) {
392 return;
393 }
394 SqlCondition condition;
395 condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
396 std::vector<VBucket> records;
397 EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
398 uploadCount++;
399 });
400 Query query = Query::Select().FromTable({ tableName_ });
401 BlockSync(query, delegate_);
402 virtualCloudDb_->ForkUpload(nullptr);
403
404 std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
405 sqlite3_stmt *stmt = nullptr;
406 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
407 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
408 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
409 Type cloudValue;
410 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
411 std::vector<uint8_t> assetsBlob;
412 Assets assets;
413 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
414 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
415 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
416 for (size_t i = 0; i < assets.size(); ++i) {
417 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
418 }
419 }
420 int errCode;
421 SQLiteUtils::ResetStatement(stmt, true, errCode);
422 }
423
424 /**
425 * @tc.name: SyncWithAssetOperation004
426 * @tc.desc: Download Assets When local assets was removed
427 * @tc.type: FUNC
428 * @tc.require:
429 * @tc.author: zhangqiquan
430 */
431 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level0)
432 {
433 const int actualCount = 5; // 5 record
434 InsertUserTableRecord(tableName_, 0, actualCount);
435 Query query = Query::Select().FromTable({ tableName_ });
436 BlockSync(query, delegate_);
437 int downLoadCount = 0;
438 int removeCount = 0;
439 ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
440 UpdateCloudTableRecord(0, actualCount, false);
441 RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOTE_ASSETS_FAIL);
442 EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
443 EXPECT_EQ(removeCount, 1);
444 virtualAssetLoader_->ForkDownload(nullptr);
445 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
446
447 std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
448 CheckAssetsCount(expectCount);
449 }
450
InsertLocalAssetData(const std::string & assetHash)451 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
452 {
453 Assets assets;
454 std::string assetNameBegin = "Phone";
455 for (int j = 1; j <= g_assetsNum; ++j) {
456 Asset asset;
457 asset.name = assetNameBegin + "_" + std::to_string(j);
458 asset.status = AssetStatus::NORMAL;
459 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
460 asset.hash = assetHash + "_" + std::to_string(j);
461 asset.assetId = std::to_string(j);
462 assets.push_back(asset);
463 }
464 string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
465 sqlite3_stmt *stmt = nullptr;
466 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
467 std::vector<uint8_t> assetBlob;
468 std::vector<uint8_t> assetsBlob;
469 RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
470 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
471 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
472 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
473 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
474 int errCode;
475 SQLiteUtils::ResetStatement(stmt, true, errCode);
476 }
477
WriteDataWithoutCommitTransaction()478 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
479 {
480 ASSERT_NE(db_, nullptr);
481 SQLiteUtils::BeginTransaction(db_);
482 InsertLocalAssetData("localAsset");
483 constexpr int kSleepDurationSeconds = 3;
484 std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
485 }
486
487 /**
488 * @tc.name: TestOpenDatabaseBusy001
489 * @tc.desc: Test open database when the database is busy.
490 * @tc.type: FUNC
491 * @tc.require:
492 * @tc.author: liufuchenxing
493 */
494 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
495 {
496 /**
497 * @tc.steps:step1. close store.
498 * @tc.expected:step1. check ok.
499 */
500 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
501 delegate_ = nullptr;
502 /**
503 * @tc.steps:step2. Another thread write data into database into database without commit.
504 * @tc.expected:step2. check ok.
505 */
506 std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
507 std::this_thread::sleep_for(std::chrono::seconds(1));
508 /**
509 * @tc.steps:step3. open relational delegate.
510 * @tc.expected:step3. open success.
511 */
512 RelationalStoreDelegate::Option option;
513 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
514 thread.join();
515 }
516
517 /**
518 * @tc.name: IgnoreRecord001
519 * @tc.desc: Download Assets When local assets was removed
520 * @tc.type: FUNC
521 * @tc.require:
522 * @tc.author: zhangqiquan
523 */
524 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level0)
525 {
526 const int actualCount = 1;
527 InsertUserTableRecord(tableName_, 0, actualCount);
528 Query query = Query::Select().FromTable({ tableName_ });
529 BlockSync(query, delegate_);
530 std::vector<size_t> expectCount = { 2 };
531 CheckAssetsCount(expectCount);
532
533 VBucket record;
534 record["id"] = std::to_string(0);
535 record["assets"] = Assets();
536 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
537 record["id"] = std::to_string(1);
538 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
539 expectCount = { 0, 0 };
540 CheckAssetsCount(expectCount);
541
542 std::vector<VBucket> logs;
543 EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
544 for (const auto &log : logs) {
545 int64_t cursor = std::get<int64_t>(log.at("cursor"));
546 EXPECT_GE(cursor, 0);
547 }
548 }
549
550 /**
551 * @tc.name: IgnoreRecord002
552 * @tc.desc: Ignore Assets When Download
553 * @tc.type: FUNC
554 * @tc.require:
555 * @tc.author: zhangqiquan
556 */
557 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level0)
558 {
559 const int actualCount = 1;
560 InsertUserTableRecord(tableName_, 0, actualCount);
561 Query query = Query::Select().FromTable({ tableName_ });
562 RelationalTestUtils::CloudBlockSync(query, delegate_);
563 UpdateCloudTableRecord(0, actualCount, false);
564
565 virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
566 RelationalTestUtils::CloudBlockSync(query, delegate_);
567 virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
568 std::vector<size_t> expectCount = { 4 };
569 CheckAssetsCount(expectCount);
570 RelationalTestUtils::CloudBlockSync(query, delegate_);
571 }
572
573 /**
574 * @tc.name: IgnoreRecord003
575 * @tc.desc: Ignore Assets When Upload
576 * @tc.type: FUNC
577 * @tc.require:
578 * @tc.author: zhangqiquan
579 */
580 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level0)
581 {
582 const int actualCount = 1;
583 InsertUserTableRecord(tableName_, 0, actualCount);
584 Query query = Query::Select().FromTable({ tableName_ });
585 virtualCloudDb_->SetConflictInUpload(true);
586 RelationalTestUtils::CloudBlockSync(query, delegate_);
587 virtualCloudDb_->SetConflictInUpload(false);
588 std::vector<size_t> expectCount = { 2 };
589 CheckAssetsCount(expectCount);
590 RelationalTestUtils::CloudBlockSync(query, delegate_);
591 }
592
593 /**
594 * @tc.name: UpsertData001
595 * @tc.desc: Upsert data after delete it
596 * @tc.type: FUNC
597 * @tc.require:
598 * @tc.author: zhangqiquan
599 */
600 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level0)
601 {
602 // insert id 0 to local
603 const int actualCount = 1;
604 InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
605 std::vector<std::map<std::string, std::string>> conditions;
606 std::map<std::string, std::string> entries;
607 entries["id"] = "0";
608 conditions.push_back(entries);
609 // delete id 0 in local
610 RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
611 // upsert id 0 to local
612 VBucket record;
613 record["id"] = std::to_string(0);
614 record["assets"] = Assets();
615 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
616 // check id 0 exist
617 CheckAssetsCount({ 0 });
618 }
619
620 /**
621 * @tc.name: UpsertData002
622 * @tc.desc: Test sync after Upsert.
623 * @tc.type: FUNC
624 * @tc.require:
625 * @tc.author: liaoyonghuang
626 */
627 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level0)
628 {
629 /**
630 * @tc.steps:step1. Insert 5 records and sync.
631 * @tc.expected: step1. ok.
632 */
633 const int actualCount = 5;
634 InsertUserTableRecord(tableName_, 0, actualCount);
635 Query query = Query::Select().FromTable({ tableName_ });
636 BlockSync(query, delegate_);
637
638 /**
639 * @tc.steps:step2. UpsertData and sync.
640 * @tc.expected: step2. ok.
641 */
642 vector<VBucket> records;
643 for (int i = 0; i < actualCount; i++) {
644 VBucket record;
645 record["id"] = std::to_string(i);
646 record["name"] = std::string("UpsertName");
647 records.push_back(record);
648 }
649 EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
650 BlockSync(query, delegate_);
651
652 /**
653 * @tc.steps:step3. Check local data.
654 * @tc.expected: step3. All local data has been merged by the cloud.
655 */
656 std::vector<VBucket> allData;
657 auto dbSchema = GetSchema();
658 ASSERT_GT(dbSchema.tables.size(), 0u);
659 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
660 for (const auto &data : allData) {
661 ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
662 }
663 }
664
665 /**
666 * @tc.name: SyncWithAssetConflict001
667 * @tc.desc: Upload with asset no change
668 * @tc.type: FUNC
669 * @tc.require:
670 * @tc.author: zhangqiquan
671 */
672 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level0)
673 {
674 // cloud and local insert same data
675 const int actualCount = 1;
676 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
677 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
678 InsertUserTableRecord(tableName_, 0, actualCount);
679 // sync and local asset's status are normal
680 Query query = Query::Select().FromTable({ tableName_ });
681 RelationalTestUtils::CloudBlockSync(query, delegate_);
682 auto dbSchema = GetSchema();
683 ASSERT_GT(dbSchema.tables.size(), 0u);
684 auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
685 for (const auto &oneRow : assets) {
686 for (const auto &asset : oneRow) {
687 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
688 }
689 }
690 }
691
692 /**
693 * @tc.name: UpsertDataInvalid001
694 * @tc.desc: Upsert invalid data
695 * @tc.type: FUNC
696 * @tc.require:
697 * @tc.author: wangxiangdong
698 */
699 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
700 {
701 VBucket record;
702 record["id"] = std::to_string(0);
703 record["assets"] = Assets();
704 /**
705 * @tc.steps:step1. UpsertData to empty table.
706 * @tc.expected: step1. INVALID_ARGS.
707 */
708 EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
709 /**
710 * @tc.steps:step2. UpsertData to shared table.
711 * @tc.expected: step2. INVALID_ARGS.
712 */
713 EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
714 /**
715 * @tc.steps:step3. UpsertData to not device table and shared table.
716 * @tc.expected: step3. NOT_FOUND.
717 */
718 const char *createSQL =
719 "CREATE TABLE IF NOT EXISTS testing(" \
720 "id TEXT PRIMARY KEY," \
721 "name TEXT," \
722 "height REAL ," \
723 "photo BLOB," \
724 "asset ASSET," \
725 "assets ASSETS," \
726 "age INT);";
727 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
728 EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
729 /**
730 * @tc.steps:step4. UpsertData to not exist table.
731 * @tc.expected: step4. NOT_FOUND.
732 */
733 EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
734 }
735
736 /**
737 * @tc.name: UpsertDataInvalid002
738 * @tc.desc: Upsert device data
739 * @tc.type: FUNC
740 * @tc.require:
741 * @tc.author: wangxiangdong
742 */
743 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level0)
744 {
745 VBucket record;
746 record["id"] = std::to_string(0);
747 record["assets"] = Assets();
748 /**
749 * @tc.steps:step1. create user table.
750 * @tc.expected: step1. INVALID_ARGS.
751 */
752 const char *createSQL =
753 "CREATE TABLE IF NOT EXISTS devTable(" \
754 "id TEXT PRIMARY KEY," \
755 "name TEXT," \
756 "height REAL ," \
757 "photo BLOB," \
758 "asset ASSET," \
759 "assets ASSETS," \
760 "age INT);";
761 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
762 /**
763 * @tc.steps:step2. create device table.
764 * @tc.expected: step2. OK.
765 */
766 RelationalStoreDelegate *delegate1 = nullptr;
767 std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
768 RelationalStoreDelegate::Option option;
769 ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
770 ASSERT_NE(delegate1, nullptr);
771 std::string deviceTableName = "devTable";
772 ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
773 DataBaseSchema dataBaseSchema;
774 TableSchema tableSchema;
775 tableSchema.name = deviceTableName;
776 tableSchema.sharedTableName = deviceTableName + "_shared";
777 tableSchema.fields = {
778 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
779 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
780 {"age", TYPE_INDEX<int64_t>}
781 };
782 dataBaseSchema.tables.push_back(tableSchema);
783 ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
784 /**
785 * @tc.steps:step3. UpsertData to device table.
786 * @tc.expected: step3. NOT_FOUND.
787 */
788 EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
789 EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
790 delegate1 = nullptr;
791 mgr1 = nullptr;
792 }
793
794 /**
795 * @tc.name: UploadAssetsTest001
796 * @tc.desc: Test upload asset with error.
797 * @tc.type: FUNC
798 * @tc.require:
799 * @tc.author: liaoyonghuang
800 */
801 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
802 {
803 /**
804 * @tc.steps:step1. Insert 10 records.
805 * @tc.expected: step1. ok.
806 */
807 const int actualCount = 10;
808 InsertUserTableRecord(tableName_, 0, actualCount);
809 /**
810 * @tc.steps:step2. Set callback function to cause some upstream data to fail.
811 * @tc.expected: step2. ok.
812 */
813 int recordIndex = 0;
814 Asset tempAsset = {
815 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
816 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
817 };
__anon3504d92b0802(const std::string &tableName, VBucket &extend) 818 virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
819 Asset asset;
820 Assets assets;
821 switch (recordIndex) {
822 case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
823 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
824 break;
825 case 1: // record[1] is considered successful because it is a conflict.
826 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
827 static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
828 break;
829 case 2: // record[2] fail because of empty gid.
830 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
831 break;
832 case 3: // record[3] fail because of empty assetId.
833 asset = tempAsset;
834 asset.assetId = "";
835 extend[std::string(CloudDbConstant::ASSET)] = asset;
836 break;
837 case 4: // record[4] fail because of empty assetId.
838 assets.push_back(tempAsset);
839 assets[0].assetId = "";
840 extend[std::string(CloudDbConstant::ASSETS)] = assets;
841 break;
842 case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
843 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
844 break;
845 default:
846 break;
847 }
848 recordIndex++;
849 });
850 /**
851 * @tc.steps:step3. Sync and check upLoadInfo.
852 * @tc.expected: step3. failCount is 5 and successCount is 5.
853 */
854 Query query = Query::Select().FromTable({ tableName_ });
855 BlockSync(query, delegate_);
856 for (const auto &table : lastProcess_.tableProcess) {
857 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
858 EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
859 EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
860 }
861 virtualCloudDb_->ForkUpload(nullptr);
862 }
863
864 /**
865 * @tc.name: UploadAssetsTest002
 * @tc.desc: Test that uploading local deletions still succeeds when the upload callback returns an empty gid.
867 * @tc.type: FUNC
868 * @tc.require:
869 * @tc.author: liaoyonghuang
870 */
871 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
872 {
873 /**
874 * @tc.steps:step1. Insert 10 records.
875 * @tc.expected: step1. ok.
876 */
877 const int actualCount = 10;
878 InsertUserTableRecord(tableName_, 0, actualCount);
879 Query query = Query::Select().FromTable({ tableName_ });
880 BlockSync(query, delegate_);
881 /**
882 * @tc.steps:step2. Delete local data.
883 * @tc.expected: step2. OK.
884 */
885 std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
886 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
887 /**
888 * @tc.steps:step3. Set callback function to cause some upstream data to fail.
889 * @tc.expected: step3. ok.
890 */
__anon3504d92b0902(const std::string &tableName, VBucket &extend) 891 virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
892 extend[std::string(CloudDbConstant::GID_FIELD)] = "";
893 });
894 BlockSync(query, delegate_);
895 for (const auto &table : lastProcess_.tableProcess) {
896 EXPECT_EQ(table.second.upLoadInfo.total, 5u);
897 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
898 EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
899 }
900 virtualCloudDb_->ForkUpload(nullptr);
901 }
902
903 /**
904 * @tc.name: UploadAssetsTest003
905 * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
906 * @tc.type: FUNC
907 * @tc.require:
908 * @tc.author: liaoyonghuang
909 */
910 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level0)
911 {
912 /**
913 * @tc.steps:step1. Insert 100 records.
914 * @tc.expected: step1. ok.
915 */
916 const int actualCount = 100;
917 InsertUserTableRecord(tableName_, 0, actualCount);
918 /**
919 * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
920 * @tc.expected: step2. ok.
921 */
922 int uploadCount = 0;
__anon3504d92b0a02(const std::string &tableName, VBucket &extend) 923 virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
924 if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
925 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
926 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
927 }
928 uploadCount++;
929 });
930 Query query = Query::Select().FromTable({ tableName_ });
931 BlockSync(query, delegate_);
932 for (const auto &table : lastProcess_.tableProcess) {
933 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
934 EXPECT_EQ(table.second.upLoadInfo.total, 70u);
935 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
936 EXPECT_EQ(table.second.upLoadInfo.successCount, 70u);
937 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
938 }
939 virtualCloudDb_->ForkUpload(nullptr);
940 }
941 }
942 #endif
943