• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "log_print.h"
21 #include "relational_store_delegate.h"
22 #include "relational_store_manager.h"
23 #include "runtime_config.h"
24 #include "time_helper.h"
25 #include "virtual_asset_loader.h"
26 #include "virtual_cloud_data_translate.h"
27 #include "virtual_cloud_db.h"
28 #include "sqlite_relational_utils.h"
29 #include "cloud/cloud_storage_utils.h"
30 
31 namespace {
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 const char *g_createSQL =
36     "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
37     "id TEXT PRIMARY KEY," \
38     "name TEXT," \
39     "height REAL ," \
40     "photo BLOB," \
41     "asset ASSET," \
42     "assets ASSETS," \
43     "age INT);";
44 const int64_t g_syncWaitTime = 60;
45 const int g_assetsNum = 3;
46 const Asset g_localAsset = {
47     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
48     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
49 };
50 SyncProcess lastProcess_;
51 
CreateUserDBAndTable(sqlite3 * & db)52 void CreateUserDBAndTable(sqlite3 *&db)
53 {
54     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
55     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
56 }
57 
BlockSync(const Query & query,RelationalStoreDelegate * delegate)58 void BlockSync(const Query &query, RelationalStoreDelegate *delegate)
59 {
60     std::mutex dataMutex;
61     std::condition_variable cv;
62     bool finish = false;
63     SyncProcess last;
64     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
65         for (const auto &item: process) {
66             if (item.second.process == DistributedDB::FINISHED) {
67                 {
68                     std::lock_guard<std::mutex> autoLock(dataMutex);
69                     finish = true;
70                 }
71                 last = item.second;
72                 cv.notify_one();
73             }
74         }
75     };
76     LOGW("begin call sync");
77     ASSERT_EQ(delegate->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
78     std::unique_lock<std::mutex> uniqueLock(dataMutex);
79     cv.wait(uniqueLock, [&finish]() {
80         return finish;
81     });
82     lastProcess_ = last;
83     LOGW("end call sync");
84 }
85 
86 class DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
87 public:
88     static void SetUpTestCase();
89     static void TearDownTestCase();
90     void SetUp() override;
91     void TearDown() override;
92     void WriteDataWithoutCommitTransaction();
93 protected:
94     void InitTestDir();
95     DataBaseSchema GetSchema();
96     void CloseDb();
97     void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
98         const Assets &templateAsset = {});
99     void CheckAssetsCount(const std::vector<size_t> &expectCount);
100     void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
101     void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
102     void InsertLocalAssetData(const std::string &assetHash);
103     std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
104     std::string testDir_;
105     std::string storePath_;
106     sqlite3 *db_ = nullptr;
107     RelationalStoreDelegate *delegate_ = nullptr;
108     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
109     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
110     std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
111     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
112     std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
113     TrackerSchema trackerSchema = {
114         .tableName = tableName_, .extendColName = "name", .trackerColNames = {"age"}
115     };
116 };
117 
SetUpTestCase()118 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
119 {
120     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
121 }
122 
TearDownTestCase()123 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
124 {}
125 
SetUp()126 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
127 {
128     DistributedDBToolsUnitTest::PrintTestCaseInfo();
129     InitTestDir();
130     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
131         LOGE("rm test db files error.");
132     }
133     DistributedDBToolsUnitTest::PrintTestCaseInfo();
134     LOGD("Test dir is %s", testDir_.c_str());
135     db_ = RelationalTestUtils::CreateDataBase(storePath_);
136     ASSERT_NE(db_, nullptr);
137     CreateUserDBAndTable(db_);
138     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
139     RelationalStoreDelegate::Option option;
140     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
141     ASSERT_NE(delegate_, nullptr);
142     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
143     ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
144     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
145     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
146     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
147     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
148     virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
149     DataBaseSchema dataBaseSchema = GetSchema();
150     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
151 }
152 
TearDown()153 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
154 {
155     CloseDb();
156     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
157     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
158         LOGE("rm test db files error.");
159     }
160 }
161 
InitTestDir()162 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
163 {
164     if (!testDir_.empty()) {
165         return;
166     }
167     DistributedDBToolsUnitTest::TestDirInit(testDir_);
168     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
169     LOGI("The test db is:%s", testDir_.c_str());
170 }
171 
GetSchema()172 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
173 {
174     DataBaseSchema schema;
175     TableSchema tableSchema;
176     tableSchema.name = tableName_;
177     tableSchema.sharedTableName = tableName_ + "_shared";
178     tableSchema.fields = {
179         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
180         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
181         {"age", TYPE_INDEX<int64_t>}
182     };
183     schema.tables.push_back(tableSchema);
184     return schema;
185 }
186 
CloseDb()187 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
188 {
189     virtualCloudDb_->ForkUpload(nullptr);
190     virtualCloudDb_ = nullptr;
191     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
192     delegate_ = nullptr;
193     mgr_ = nullptr;
194 }
195 
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)196 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
197     int64_t count, size_t assetCount, const Assets &templateAsset)
198 {
199     std::string photo = "phone";
200     int errCode;
201     std::vector<uint8_t> assetBlob;
202     std::vector<uint8_t> assetsBlob;
203     const int64_t index2 = 2;
204     for (int64_t i = begin; i < begin + count; ++i) {
205         std::string name = g_localAsset.name + std::to_string(i);
206         Asset asset = g_localAsset;
207         asset.name = name;
208         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
209         std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
210         string sql = "INSERT OR REPLACE INTO " + tableName +
211             " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
212             "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
213         sqlite3_stmt *stmt = nullptr;
214         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
215         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
216         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
217         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
218         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
219         SQLiteUtils::ResetStatement(stmt, true, errCode);
220     }
221 }
222 
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)223 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
224     const Assets &templateAsset, size_t assetCount)
225 {
226     std::vector<Asset> assets;
227     for (size_t i = 1; i <= assetCount; ++i) {
228         Asset asset;
229         if (i - 1 < templateAsset.size()) {
230             asset = templateAsset[i - 1];
231         } else {
232             asset = g_localAsset;
233             asset.name = baseName + "_" + std::to_string(i);
234             asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
235         }
236         assets.push_back(asset);
237     }
238     return assets;
239 }
240 
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)241 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
242 {
243     std::vector<VBucket> record;
244     std::vector<VBucket> extend;
245     Timestamp now = TimeHelper::GetSysCurrentTime();
246     const int assetCount = 2;
247     for (int64_t i = begin; i < (begin + count); ++i) {
248         VBucket data;
249         data.insert_or_assign("id", std::to_string(i));
250         data.insert_or_assign("name", "Cloud" + std::to_string(i));
251         Assets assets;
252         for (int j = 1; j <= assetCount; ++j) {
253             Asset asset;
254             asset.name = "Phone_" + std::to_string(j);
255             asset.assetId = std::to_string(j);
256             asset.status = AssetStatus::UPDATE;
257             assets.push_back(asset);
258         }
259         data.insert_or_assign("assets", assets);
260         record.push_back(data);
261         VBucket log;
262         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
263             now / CloudDbConstant::TEN_THOUSAND));
264         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
265             now / CloudDbConstant::TEN_THOUSAND));
266         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
267         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
268         extend.push_back(log);
269     }
270 
271     ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
272 }
273 
CheckAssetsCount(const std::vector<size_t> & expectCount)274 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount)
275 {
276     std::vector<VBucket> allData;
277     auto dbSchema = GetSchema();
278     ASSERT_GT(dbSchema.tables.size(), 0u);
279     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
280     int index = 0;
281     ASSERT_EQ(allData.size(), expectCount.size());
282     for (const auto &data : allData) {
283         auto colIter = data.find("assets");
284         EXPECT_NE(colIter, data.end());
285         if (colIter == data.end()) {
286             index++;
287             continue;
288         }
289         Type colValue = data.at("assets");
290         auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
291         auto assets = RelationalTestUtils::GetAssets(colValue, translate);
292         LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
293         EXPECT_EQ(assets.size(), expectCount[index]);
294         for (const auto &item : assets) {
295             LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
296                 item.status);
297         }
298         index++;
299     }
300 }
301 
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)302 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
303     int &removeCount)
304 {
305     virtualAssetLoader_->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
306         downLoadCount++;
307         if (downLoadCount == 1) {
308             std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
309             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
310         }
311     });
312     virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
313         EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
314         removeCount++;
315         return removeStatus;
316     });
317 }
318 
319 /**
320  * @tc.name: SyncWithAssetOperation001
321  * @tc.desc: Delete Assets When Download
322  * @tc.type: FUNC
323  * @tc.require:
324  * @tc.author: zhangqiquan
325  */
326 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level0)
327 {
328     const int actualCount = 10;
329     const int deleteDataCount = 5;
330     const int deleteAssetsCount = 4;
331     InsertUserTableRecord(tableName_, 0, actualCount);
332     std::string tableName = tableName_;
__anon3504d92b0602(const std::string &, VBucket &) 333     virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
334         for (int64_t i = 0; i < deleteDataCount; i++) {
335             std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
336             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
337         }
338         for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
339             std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
340                 std::to_string(i) + ";";
341             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
342         }
343     });
344     Query query = Query::Select().FromTable({ tableName_ });
345     BlockSync(query, delegate_);
346     virtualCloudDb_->ForkUpload(nullptr);
347     std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
348     expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
349     CheckAssetsCount(expectCount);
350 }
351 
352 /**
353  * @tc.name: SyncWithAssetOperation002
354  * @tc.desc: Download Assets When local assets was removed
355  * @tc.type: FUNC
356  * @tc.require:
357  * @tc.author: zhangqiquan
358  */
359 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level0)
360 {
361     const int actualCount = 1;
362     InsertUserTableRecord(tableName_, 0, actualCount);
363     Query query = Query::Select().FromTable({ tableName_ });
364     BlockSync(query, delegate_);
365     int downLoadCount = 0;
366     int removeCount = 0;
367     ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
368     UpdateCloudTableRecord(0, actualCount, false);
369     RelationalTestUtils::CloudBlockSync(query, delegate_);
370     EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
371     EXPECT_EQ(removeCount, 1);
372     virtualAssetLoader_->ForkDownload(nullptr);
373     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
374 
375     std::vector<size_t> expectCount = { 0 };
376     CheckAssetsCount(expectCount);
377 }
378 
379 /**
380  * @tc.name: SyncWithAssetOperation003
381  * @tc.desc: Delete Assets When Download
382  * @tc.type: FUNC
383  * @tc.require:
384  * @tc.author: bty
385  */
386 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level0)
387 {
388     InsertUserTableRecord(tableName_, 0, 1); // 1 is count
389     int uploadCount = 0;
__anon3504d92b0702(const std::string &, VBucket &) 390     virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
391         if (uploadCount > 0) {
392             return;
393         }
394         SqlCondition condition;
395         condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
396         std::vector<VBucket> records;
397         EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
398         uploadCount++;
399     });
400     Query query = Query::Select().FromTable({ tableName_ });
401     BlockSync(query, delegate_);
402     virtualCloudDb_->ForkUpload(nullptr);
403 
404     std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
405     sqlite3_stmt *stmt = nullptr;
406     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
407     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
408         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
409         Type cloudValue;
410         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
411         std::vector<uint8_t> assetsBlob;
412         Assets assets;
413         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
414         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
415         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
416         for (size_t i = 0; i < assets.size(); ++i) {
417             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
418         }
419     }
420     int errCode;
421     SQLiteUtils::ResetStatement(stmt, true, errCode);
422 }
423 
424 /**
425  * @tc.name: SyncWithAssetOperation004
426  * @tc.desc: Download Assets When local assets was removed
427  * @tc.type: FUNC
428  * @tc.require:
429  * @tc.author: zhangqiquan
430  */
431 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level0)
432 {
433     const int actualCount = 5; // 5 record
434     InsertUserTableRecord(tableName_, 0, actualCount);
435     Query query = Query::Select().FromTable({ tableName_ });
436     BlockSync(query, delegate_);
437     int downLoadCount = 0;
438     int removeCount = 0;
439     ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
440     UpdateCloudTableRecord(0, actualCount, false);
441     RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOTE_ASSETS_FAIL);
442     EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
443     EXPECT_EQ(removeCount, 1);
444     virtualAssetLoader_->ForkDownload(nullptr);
445     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
446 
447     std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
448     CheckAssetsCount(expectCount);
449 }
450 
InsertLocalAssetData(const std::string & assetHash)451 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
452 {
453     Assets assets;
454     std::string assetNameBegin = "Phone";
455     for (int j = 1; j <= g_assetsNum; ++j) {
456         Asset asset;
457         asset.name = assetNameBegin + "_" + std::to_string(j);
458         asset.status = AssetStatus::NORMAL;
459         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
460         asset.hash = assetHash + "_" + std::to_string(j);
461         asset.assetId = std::to_string(j);
462         assets.push_back(asset);
463     }
464     string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
465     sqlite3_stmt *stmt = nullptr;
466     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
467     std::vector<uint8_t> assetBlob;
468     std::vector<uint8_t> assetsBlob;
469     RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
470     RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
471     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
472     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
473     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
474     int errCode;
475     SQLiteUtils::ResetStatement(stmt, true, errCode);
476 }
477 
WriteDataWithoutCommitTransaction()478 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
479 {
480     ASSERT_NE(db_, nullptr);
481     SQLiteUtils::BeginTransaction(db_);
482     InsertLocalAssetData("localAsset");
483     constexpr int kSleepDurationSeconds = 3;
484     std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
485 }
486 
487 /**
488  * @tc.name: TestOpenDatabaseBusy001
489  * @tc.desc: Test open database when the database is busy.
490  * @tc.type: FUNC
491  * @tc.require:
492  * @tc.author: liufuchenxing
493  */
494 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
495 {
496     /**
497      * @tc.steps:step1. close store.
498      * @tc.expected:step1. check ok.
499      */
500     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
501     delegate_ = nullptr;
502     /**
503      * @tc.steps:step2. Another thread write data into database into database without commit.
504      * @tc.expected:step2. check ok.
505      */
506     std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
507     std::this_thread::sleep_for(std::chrono::seconds(1));
508     /**
509      * @tc.steps:step3. open relational delegate.
510      * @tc.expected:step3. open success.
511      */
512     RelationalStoreDelegate::Option option;
513     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
514     thread.join();
515 }
516 
517 /**
518  * @tc.name: IgnoreRecord001
519  * @tc.desc: Download Assets When local assets was removed
520  * @tc.type: FUNC
521  * @tc.require:
522  * @tc.author: zhangqiquan
523  */
524 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level0)
525 {
526     const int actualCount = 1;
527     InsertUserTableRecord(tableName_, 0, actualCount);
528     Query query = Query::Select().FromTable({ tableName_ });
529     BlockSync(query, delegate_);
530     std::vector<size_t> expectCount = { 2 };
531     CheckAssetsCount(expectCount);
532 
533     VBucket record;
534     record["id"] = std::to_string(0);
535     record["assets"] = Assets();
536     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
537     record["id"] = std::to_string(1);
538     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
539     expectCount = { 0, 0 };
540     CheckAssetsCount(expectCount);
541 
542     std::vector<VBucket> logs;
543     EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
544     for (const auto &log : logs) {
545         int64_t cursor = std::get<int64_t>(log.at("cursor"));
546         EXPECT_GE(cursor, 0);
547     }
548 }
549 
550 /**
551  * @tc.name: IgnoreRecord002
552  * @tc.desc: Ignore Assets When Download
553  * @tc.type: FUNC
554  * @tc.require:
555  * @tc.author: zhangqiquan
556  */
557 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level0)
558 {
559     const int actualCount = 1;
560     InsertUserTableRecord(tableName_, 0, actualCount);
561     Query query = Query::Select().FromTable({ tableName_ });
562     RelationalTestUtils::CloudBlockSync(query, delegate_);
563     UpdateCloudTableRecord(0, actualCount, false);
564 
565     virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
566     RelationalTestUtils::CloudBlockSync(query, delegate_);
567     virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
568     std::vector<size_t> expectCount = { 4 };
569     CheckAssetsCount(expectCount);
570     RelationalTestUtils::CloudBlockSync(query, delegate_);
571 }
572 
573 /**
574  * @tc.name: IgnoreRecord003
575  * @tc.desc: Ignore Assets When Upload
576  * @tc.type: FUNC
577  * @tc.require:
578  * @tc.author: zhangqiquan
579  */
580 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level0)
581 {
582     const int actualCount = 1;
583     InsertUserTableRecord(tableName_, 0, actualCount);
584     Query query = Query::Select().FromTable({ tableName_ });
585     virtualCloudDb_->SetConflictInUpload(true);
586     RelationalTestUtils::CloudBlockSync(query, delegate_);
587     virtualCloudDb_->SetConflictInUpload(false);
588     std::vector<size_t> expectCount = { 2 };
589     CheckAssetsCount(expectCount);
590     RelationalTestUtils::CloudBlockSync(query, delegate_);
591 }
592 
593 /**
594  * @tc.name: UpsertData001
595  * @tc.desc: Upsert data after delete it
596  * @tc.type: FUNC
597  * @tc.require:
598  * @tc.author: zhangqiquan
599  */
600 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level0)
601 {
602     // insert id 0 to local
603     const int actualCount = 1;
604     InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
605     std::vector<std::map<std::string, std::string>> conditions;
606     std::map<std::string, std::string> entries;
607     entries["id"] = "0";
608     conditions.push_back(entries);
609     // delete id 0 in local
610     RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
611     // upsert id 0 to local
612     VBucket record;
613     record["id"] = std::to_string(0);
614     record["assets"] = Assets();
615     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
616     // check id 0 exist
617     CheckAssetsCount({ 0 });
618 }
619 
620 /**
621  * @tc.name: UpsertData002
622  * @tc.desc: Test sync after Upsert.
623  * @tc.type: FUNC
624  * @tc.require:
625  * @tc.author: liaoyonghuang
626  */
627 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level0)
628 {
629     /**
630      * @tc.steps:step1. Insert 5 records and sync.
631      * @tc.expected: step1. ok.
632      */
633     const int actualCount = 5;
634     InsertUserTableRecord(tableName_, 0, actualCount);
635     Query query = Query::Select().FromTable({ tableName_ });
636     BlockSync(query, delegate_);
637 
638     /**
639      * @tc.steps:step2. UpsertData and sync.
640      * @tc.expected: step2. ok.
641      */
642     vector<VBucket> records;
643     for (int i = 0; i < actualCount; i++) {
644         VBucket record;
645         record["id"] = std::to_string(i);
646         record["name"] = std::string("UpsertName");
647         records.push_back(record);
648     }
649     EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
650     BlockSync(query, delegate_);
651 
652     /**
653      * @tc.steps:step3. Check local data.
654      * @tc.expected: step3. All local data has been merged by the cloud.
655      */
656     std::vector<VBucket> allData;
657     auto dbSchema = GetSchema();
658     ASSERT_GT(dbSchema.tables.size(), 0u);
659     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
660     for (const auto &data : allData) {
661         ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
662     }
663 }
664 
665 /**
666  * @tc.name: SyncWithAssetConflict001
667  * @tc.desc: Upload with asset no change
668  * @tc.type: FUNC
669  * @tc.require:
670  * @tc.author: zhangqiquan
671  */
672 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level0)
673 {
674     // cloud and local insert same data
675     const int actualCount = 1;
676     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
677     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
678     InsertUserTableRecord(tableName_, 0, actualCount);
679     // sync and local asset's status are normal
680     Query query = Query::Select().FromTable({ tableName_ });
681     RelationalTestUtils::CloudBlockSync(query, delegate_);
682     auto dbSchema = GetSchema();
683     ASSERT_GT(dbSchema.tables.size(), 0u);
684     auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
685     for (const auto &oneRow : assets) {
686         for (const auto &asset : oneRow) {
687             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
688         }
689     }
690 }
691 
692 /**
693  * @tc.name: UpsertDataInvalid001
694  * @tc.desc: Upsert invalid data
695  * @tc.type: FUNC
696  * @tc.require:
697  * @tc.author: wangxiangdong
698  */
699 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
700 {
701     VBucket record;
702     record["id"] = std::to_string(0);
703     record["assets"] = Assets();
704     /**
705      * @tc.steps:step1. UpsertData to empty table.
706      * @tc.expected: step1. INVALID_ARGS.
707      */
708     EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
709     /**
710      * @tc.steps:step2. UpsertData to shared table.
711      * @tc.expected: step2. INVALID_ARGS.
712      */
713     EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
714     /**
715      * @tc.steps:step3. UpsertData to not device table and shared table.
716      * @tc.expected: step3. NOT_FOUND.
717      */
718     const char *createSQL =
719         "CREATE TABLE IF NOT EXISTS testing(" \
720         "id TEXT PRIMARY KEY," \
721         "name TEXT," \
722         "height REAL ," \
723         "photo BLOB," \
724         "asset ASSET," \
725         "assets ASSETS," \
726         "age INT);";
727     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
728     EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
729     /**
730      * @tc.steps:step4. UpsertData to not exist table.
731      * @tc.expected: step4. NOT_FOUND.
732      */
733     EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
734 }
735 
736 /**
737  * @tc.name: UpsertDataInvalid002
738  * @tc.desc: Upsert device data
739  * @tc.type: FUNC
740  * @tc.require:
741  * @tc.author: wangxiangdong
742  */
743 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level0)
744 {
745     VBucket record;
746     record["id"] = std::to_string(0);
747     record["assets"] = Assets();
748     /**
749      * @tc.steps:step1. create user table.
750      * @tc.expected: step1. INVALID_ARGS.
751      */
752     const char *createSQL =
753         "CREATE TABLE IF NOT EXISTS devTable(" \
754         "id TEXT PRIMARY KEY," \
755         "name TEXT," \
756         "height REAL ," \
757         "photo BLOB," \
758         "asset ASSET," \
759         "assets ASSETS," \
760         "age INT);";
761     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
762     /**
763      * @tc.steps:step2. create device table.
764      * @tc.expected: step2. OK.
765      */
766     RelationalStoreDelegate *delegate1 = nullptr;
767     std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
768     RelationalStoreDelegate::Option option;
769     ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
770     ASSERT_NE(delegate1, nullptr);
771     std::string deviceTableName = "devTable";
772     ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
773     DataBaseSchema dataBaseSchema;
774     TableSchema tableSchema;
775     tableSchema.name = deviceTableName;
776     tableSchema.sharedTableName = deviceTableName + "_shared";
777     tableSchema.fields = {
778         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
779         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
780         {"age", TYPE_INDEX<int64_t>}
781     };
782     dataBaseSchema.tables.push_back(tableSchema);
783     ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
784     /**
785      * @tc.steps:step3. UpsertData to device table.
786      * @tc.expected: step3. NOT_FOUND.
787      */
788     EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
789     EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
790     delegate1 = nullptr;
791     mgr1 = nullptr;
792 }
793 
794 /**
795  * @tc.name: UploadAssetsTest001
796  * @tc.desc: Test upload asset with error.
797  * @tc.type: FUNC
798  * @tc.require:
799  * @tc.author: liaoyonghuang
800  */
801 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
802 {
803     /**
804      * @tc.steps:step1. Insert 10 records.
805      * @tc.expected: step1. ok.
806      */
807     const int actualCount = 10;
808     InsertUserTableRecord(tableName_, 0, actualCount);
809     /**
810      * @tc.steps:step2. Set callback function to cause some upstream data to fail.
811      * @tc.expected: step2. ok.
812      */
813     int recordIndex = 0;
814     Asset tempAsset = {
815             .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
816             .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
817     };
__anon3504d92b0802(const std::string &tableName, VBucket &extend) 818     virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
819         Asset asset;
820         Assets assets;
821         switch (recordIndex) {
822             case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
823                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
824                 break;
825             case 1: // record[1] is considered successful because it is a conflict.
826                 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
827                     static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
828                 break;
829             case 2: // record[2] fail because of empty gid.
830                 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
831                 break;
832             case 3: // record[3] fail because of empty assetId.
833                 asset = tempAsset;
834                 asset.assetId = "";
835                 extend[std::string(CloudDbConstant::ASSET)] = asset;
836                 break;
837             case 4: // record[4] fail because of empty assetId.
838                 assets.push_back(tempAsset);
839                 assets[0].assetId = "";
840                 extend[std::string(CloudDbConstant::ASSETS)] = assets;
841                 break;
842             case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
843                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
844                 break;
845             default:
846                 break;
847         }
848         recordIndex++;
849     });
850     /**
851      * @tc.steps:step3. Sync and check upLoadInfo.
852      * @tc.expected: step3. failCount is 5 and successCount is 5.
853      */
854     Query query = Query::Select().FromTable({ tableName_ });
855     BlockSync(query, delegate_);
856     for (const auto &table : lastProcess_.tableProcess) {
857         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
858         EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
859         EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
860     }
861     virtualCloudDb_->ForkUpload(nullptr);
862 }
863 
864 /**
865  * @tc.name: UploadAssetsTest002
866  * @tc.desc: Test upload asset with error.
867  * @tc.type: FUNC
868  * @tc.require:
869  * @tc.author: liaoyonghuang
870  */
871 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
872 {
873     /**
874      * @tc.steps:step1. Insert 10 records.
875      * @tc.expected: step1. ok.
876      */
877     const int actualCount = 10;
878     InsertUserTableRecord(tableName_, 0, actualCount);
879     Query query = Query::Select().FromTable({ tableName_ });
880     BlockSync(query, delegate_);
881     /**
882      * @tc.steps:step2. Delete local data.
883      * @tc.expected: step2. OK.
884      */
885     std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
886     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
887     /**
888      * @tc.steps:step3. Set callback function to cause some upstream data to fail.
889      * @tc.expected: step3. ok.
890      */
__anon3504d92b0902(const std::string &tableName, VBucket &extend) 891     virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
892         extend[std::string(CloudDbConstant::GID_FIELD)] = "";
893     });
894     BlockSync(query, delegate_);
895     for (const auto &table : lastProcess_.tableProcess) {
896         EXPECT_EQ(table.second.upLoadInfo.total, 5u);
897         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
898         EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
899     }
900     virtualCloudDb_->ForkUpload(nullptr);
901 }
902 
903 /**
904  * @tc.name: UploadAssetsTest003
905  * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
906  * @tc.type: FUNC
907  * @tc.require:
908  * @tc.author: liaoyonghuang
909  */
910 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level0)
911 {
912     /**
913      * @tc.steps:step1. Insert 100 records.
914      * @tc.expected: step1. ok.
915      */
916     const int actualCount = 100;
917     InsertUserTableRecord(tableName_, 0, actualCount);
918     /**
919      * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
920      * @tc.expected: step2. ok.
921      */
922     int uploadCount = 0;
__anon3504d92b0a02(const std::string &tableName, VBucket &extend) 923     virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
924         if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
925             extend[std::string(CloudDbConstant::ERROR_FIELD)] =
926                 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
927         }
928         uploadCount++;
929     });
930     Query query = Query::Select().FromTable({ tableName_ });
931     BlockSync(query, delegate_);
932     for (const auto &table : lastProcess_.tableProcess) {
933         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
934         EXPECT_EQ(table.second.upLoadInfo.total, 70u);
935         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
936         EXPECT_EQ(table.second.upLoadInfo.successCount, 70u);
937         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
938     }
939     virtualCloudDb_->ForkUpload(nullptr);
940 }
941 }
942 #endif
943