• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "log_print.h"
21 #include "relational_store_delegate.h"
22 #include "relational_store_manager.h"
23 #include "runtime_config.h"
24 #include "time_helper.h"
25 #include "virtual_asset_loader.h"
26 #include "virtual_cloud_data_translate.h"
27 #include "virtual_cloud_db.h"
28 #include "virtual_communicator_aggregator.h"
29 #include "sqlite_relational_utils.h"
30 #include "cloud/cloud_storage_utils.h"
31 #include "cloud_db_sync_utils_test.h"
32 
33 namespace {
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 const char *g_createSQL =
38     "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
39     "id TEXT PRIMARY KEY," \
40     "name TEXT," \
41     "height REAL ," \
42     "photo BLOB," \
43     "asset ASSET," \
44     "assets ASSETS," \
45     "age INT);";
46 const int64_t g_syncWaitTime = 60;
47 const int g_assetsNum = 3;
48 const Asset g_localAsset = {
49     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
50     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
51 };
52 SyncProcess lastProcess_;
53 
CreateUserDBAndTable(sqlite3 * & db)54 void CreateUserDBAndTable(sqlite3 *&db)
55 {
56     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
57     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
58 }
59 
BlockSync(const Query & query,RelationalStoreDelegate * delegate,SyncMode syncMode=SYNC_MODE_CLOUD_MERGE)60 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, SyncMode syncMode = SYNC_MODE_CLOUD_MERGE)
61 {
62     std::mutex dataMutex;
63     std::condition_variable cv;
64     bool finish = false;
65     SyncProcess last;
66     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
67         for (const auto &item: process) {
68             if (item.second.process == DistributedDB::FINISHED) {
69                 {
70                     std::lock_guard<std::mutex> autoLock(dataMutex);
71                     finish = true;
72                 }
73                 last = item.second;
74                 cv.notify_one();
75             }
76         }
77     };
78     LOGW("begin call sync");
79     ASSERT_EQ(delegate->Sync({ "CLOUD" }, syncMode, query, callback, g_syncWaitTime), OK);
80     std::unique_lock<std::mutex> uniqueLock(dataMutex);
81     cv.wait(uniqueLock, [&finish]() {
82         return finish;
83     });
84     lastProcess_ = last;
85     LOGW("end call sync");
86 }
87 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)88 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
89 {
90     if (count != 1) {
91         return 0;
92     }
93     auto expectCount = reinterpret_cast<int64_t>(data);
94     EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
95     return 0;
96 }
97 
98 class  DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
99 public:
100     static void SetUpTestCase();
101     static void TearDownTestCase();
102     void SetUp() override;
103     void TearDown() override;
104     void WriteDataWithoutCommitTransaction();
105 protected:
106     void InitTestDir();
107     DataBaseSchema GetSchema();
108     void CloseDb();
109     void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
110         const Assets &templateAsset = {});
111     void UpdateLocalTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
112         bool updateAssets = true);
113     void UpdateLocalAssetRecord(const std::string &tableName, int64_t begin, int64_t count);
114     void CheckAssetsCount(const std::vector<size_t> &expectCount, bool checkAsset = false);
115     void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
116     void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
117     void InsertLocalAssetData(const std::string &assetHash);
118     void InsertCloudAssetData(const std::string &assetHash);
119     void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
120     void PrepareForAssetOperation010();
121     void UpdateAssetWhenSyncUpload();
122     DBStatus InsertRecordToCloud(const std::vector<VBucket> &record);
123     void PrepareDataInCloud();
124     void LocalAssetRemoveTest();
125 
126     static std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
127     static std::vector<VBucket> GenerateAssetsRecords(const std::map<std::string, int32_t> &colType,
128         const Asset &templateAsset, int assetsCount, int rowCount);
129     static Asset GenerateAsset(const Asset &templateAsset, int id);
130     static Assets GenerateAssets(const Asset &templateAsset, int id, int assetsCount);
131     std::string testDir_;
132     std::string storePath_;
133     sqlite3 *db_ = nullptr;
134     RelationalStoreDelegate *delegate_ = nullptr;
135     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
136     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
137     std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
138     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
139     std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
140     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
141     TrackerSchema trackerSchema = {
142         .tableName = tableName_, .extendColNames = {"name"}, .trackerColNames = {"age"}
143     };
144 };
145 
SetUpTestCase()146 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
147 {
148     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
149 }
150 
TearDownTestCase()151 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
152 {}
153 
SetUp()154 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
155 {
156     DistributedDBToolsUnitTest::PrintTestCaseInfo();
157     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
158     InitTestDir();
159     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
160         LOGE("rm test db files error.");
161     }
162     DistributedDBToolsUnitTest::PrintTestCaseInfo();
163     LOGD("Test dir is %s", testDir_.c_str());
164     db_ = RelationalTestUtils::CreateDataBase(storePath_);
165     ASSERT_NE(db_, nullptr);
166     CreateUserDBAndTable(db_);
167     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
168     RelationalStoreDelegate::Option option;
169     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
170     ASSERT_NE(delegate_, nullptr);
171     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
172     ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
173     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
174     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
175     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
176     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
177     virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
178     DataBaseSchema dataBaseSchema = GetSchema();
179     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
180     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
181     ASSERT_TRUE(communicatorAggregator_ != nullptr);
182     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
183 }
184 
TearDown()185 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
186 {
187     CloseDb();
188     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
189     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
190         LOGE("rm test db files error.");
191     }
192     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
193     communicatorAggregator_ = nullptr;
194     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
195 }
196 
InitTestDir()197 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
198 {
199     if (!testDir_.empty()) {
200         return;
201     }
202     DistributedDBToolsUnitTest::TestDirInit(testDir_);
203     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
204     LOGI("The test db is:%s", testDir_.c_str());
205 }
206 
GetSchema()207 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
208 {
209     DataBaseSchema schema;
210     TableSchema tableSchema;
211     tableSchema.name = tableName_;
212     tableSchema.sharedTableName = tableName_ + "_shared";
213     tableSchema.fields = {
214         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
215         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
216         {"age", TYPE_INDEX<int64_t>}
217     };
218     schema.tables.push_back(tableSchema);
219     return schema;
220 }
221 
CloseDb()222 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
223 {
224     virtualCloudDb_->ForkUpload(nullptr);
225     virtualCloudDb_ = nullptr;
226     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
227     delegate_ = nullptr;
228     mgr_ = nullptr;
229 }
230 
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)231 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
232     int64_t count, size_t assetCount, const Assets &templateAsset)
233 {
234     std::string photo = "phone";
235     int errCode;
236     std::vector<uint8_t> assetBlob;
237     std::vector<uint8_t> assetsBlob;
238     const int64_t index2 = 2;
239     for (int64_t i = begin; i < begin + count; ++i) {
240         std::string name = g_localAsset.name + std::to_string(i);
241         Asset asset = g_localAsset;
242         asset.name = name;
243         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
244         std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
245         string sql = "INSERT OR REPLACE INTO " + tableName +
246             " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
247             "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
248         sqlite3_stmt *stmt = nullptr;
249         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
250         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
251         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
252         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
253         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
254         SQLiteUtils::ResetStatement(stmt, true, errCode);
255     }
256 }
257 
UpdateLocalTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,bool updateAssets)258 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalTableRecord(const std::string &tableName, int64_t begin,
259     int64_t count, size_t assetCount, bool updateAssets)
260 {
261     int errCode;
262     std::vector<uint8_t> assetBlob;
263     std::vector<uint8_t> assetsBlob;
264     std::string hash = updateAssets ? "new_hash" : g_localAsset.hash;
265     for (int64_t i = begin; i < begin + count; ++i) {
266         std::string name = g_localAsset.name + std::to_string(i);
267         Asset asset = g_localAsset;
268         asset.name = name;
269         asset.hash = hash;
270         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
271         std::vector<Asset> assets = GetAssets(name, {}, assetCount);
272         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
273         std::string dataName = "new_name_" + std::to_string(i);
274         std::string sql = "UPDATE " + tableName + " SET name = ?, asset = ?, assets = ? where id = " +
275             std::to_string(i);
276         sqlite3_stmt *stmt = nullptr;
277         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
278         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 1, dataName), E_OK); // 1st bind
279         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetBlob, false), E_OK); // 2nd bind
280         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetsBlob, false), E_OK); // 3rd bind
281         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
282         SQLiteUtils::ResetStatement(stmt, true, errCode);
283     }
284 }
285 
InsertRecordToCloud(const std::vector<VBucket> & record)286 DBStatus DistributedDBCloudAssetsOperationSyncTest::InsertRecordToCloud(const std::vector<VBucket> &record)
287 {
288     std::vector<VBucket> extend;
289     for (size_t i = 0; i < record.size(); ++i) {
290         VBucket log;
291         Timestamp now = TimeHelper::GetSysCurrentTime();
292         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
293                 now / CloudDbConstant::TEN_THOUSAND));
294         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
295                 now / CloudDbConstant::TEN_THOUSAND));
296         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
297         extend.push_back(log);
298     }
299     std::vector<VBucket> copyRecord = record;
300     return virtualCloudDb_->BatchInsert(tableName_, std::move(copyRecord), extend);
301 }
302 
PrepareDataInCloud()303 void DistributedDBCloudAssetsOperationSyncTest::PrepareDataInCloud()
304 {
305     std::map<std::string, int32_t> colType;
306     colType["asset"] = TYPE_INDEX<Asset>;
307     colType["assets"] = TYPE_INDEX<Assets>;
308     Asset templateAsset = g_localAsset;
309     const int assetsCount = 10;
310     const int rowCount = 200;
311     auto recordAssets = GenerateAssetsRecords(colType, templateAsset, assetsCount, rowCount);
312     EXPECT_EQ(InsertRecordToCloud(recordAssets), OK);
313 }
314 
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)315 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
316     const Assets &templateAsset, size_t assetCount)
317 {
318     std::vector<Asset> assets;
319     for (size_t i = 1; i <= assetCount; ++i) {
320         Asset asset;
321         if (i - 1 < templateAsset.size()) {
322             asset = templateAsset[i - 1];
323         } else {
324             asset = g_localAsset;
325             asset.name = baseName + "_" + std::to_string(i);
326             asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
327         }
328         assets.push_back(asset);
329     }
330     return assets;
331 }
332 
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)333 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
334 {
335     std::vector<VBucket> record;
336     std::vector<VBucket> extend;
337     Timestamp now = TimeHelper::GetSysCurrentTime();
338     const int assetCount = 2;
339     for (int64_t i = begin; i < (begin + count); ++i) {
340         VBucket data;
341         data.insert_or_assign("id", std::to_string(i));
342         data.insert_or_assign("name", "Cloud" + std::to_string(i));
343         Assets assets;
344         for (int j = 1; j <= assetCount; ++j) {
345             Asset asset;
346             asset.name = "Phone_" + std::to_string(j);
347             asset.assetId = std::to_string(j);
348             asset.status = AssetStatus::UPDATE;
349             assets.push_back(asset);
350         }
351         assetIsNull ? data.insert_or_assign("assets", Nil()) : data.insert_or_assign("assets", assets);
352         record.push_back(data);
353         VBucket log;
354         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
355             now / CloudDbConstant::TEN_THOUSAND));
356         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
357             now / CloudDbConstant::TEN_THOUSAND));
358         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
359         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
360         extend.push_back(log);
361     }
362 
363     ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
364 }
365 
CheckAssetsCount(const std::vector<size_t> & expectCount,bool checkAsset)366 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount,
367     bool checkAsset)
368 {
369     std::vector<VBucket> allData;
370     auto dbSchema = GetSchema();
371     ASSERT_GT(dbSchema.tables.size(), 0u);
372     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
373     int index = 0;
374     ASSERT_EQ(allData.size(), expectCount.size());
375     for (const auto &data : allData) {
376         auto colIter = data.find("assets");
377         EXPECT_NE(colIter, data.end());
378         if (colIter == data.end()) {
379             index++;
380             continue;
381         }
382         Type colValue = data.at("assets");
383         auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
384         auto assets = RelationalTestUtils::GetAssets(colValue, translate);
385         size_t size = assets.size();
386         if (checkAsset) {
387             Type colValue1 = data.at("asset");
388             auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
389             size += assets1.size();
390         }
391         LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
392         EXPECT_EQ(static_cast<size_t>(size), expectCount[index]);
393         for (const auto &item : assets) {
394             LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
395                 item.status);
396         }
397         index++;
398     }
399 }
400 
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)401 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
402     int &removeCount)
403 {
404     virtualAssetLoader_->ForkDownload([this, &downLoadCount](const std::string &tableName,
405         std::map<std::string, Assets> &assets) {
406         downLoadCount++;
407         if (downLoadCount == 1) {
408             std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
409             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
410         }
411     });
412     virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
413         EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
414         removeCount++;
415         return removeStatus;
416     });
417 }
418 
GenerateAssetsRecords(const std::map<std::string,int32_t> & colType,const Asset & templateAsset,int assetsCount,int rowCount)419 std::vector<VBucket> DistributedDBCloudAssetsOperationSyncTest::GenerateAssetsRecords(
420     const std::map<std::string, int32_t> &colType, const Asset &templateAsset, int assetsCount, int rowCount)
421 {
422     std::vector<VBucket> res;
423     for (int i = 0; i < rowCount; ++i) {
424         VBucket record;
425         record["id"] = std::to_string(i);
426         for (const auto &[col, type] : colType) {
427             if (type == TYPE_INDEX<Asset>) {
428                 record[col] = GenerateAsset(templateAsset, i);
429             } else if (type == TYPE_INDEX<Assets>) {
430                 record[col] = GenerateAssets(templateAsset, i, assetsCount);
431             }
432         }
433         res.push_back(record);
434     }
435     return res;
436 }
437 
GenerateAsset(const Asset & templateAsset,int id)438 Asset DistributedDBCloudAssetsOperationSyncTest::GenerateAsset(const Asset &templateAsset, int id)
439 {
440     Asset res = templateAsset;
441     res.name.append("_").append(std::to_string(id));
442     res.hash.append("_").append(std::to_string(id));
443     return res;
444 }
445 
GenerateAssets(const Asset & templateAsset,int id,int assetsCount)446 Assets DistributedDBCloudAssetsOperationSyncTest::GenerateAssets(const Asset &templateAsset, int id, int assetsCount)
447 {
448     Assets assets;
449     auto baseAsset = GenerateAsset(templateAsset, id);
450     for (int i = 0; i < assetsCount; ++i) {
451         assets.push_back(GenerateAsset(baseAsset, i));
452     }
453     return assets;
454 }
455 
456 /**
457  * @tc.name: SyncWithAssetOperation001
458  * @tc.desc: Delete Assets When Download
459  * @tc.type: FUNC
460  * @tc.require:
461  * @tc.author: zhangqiquan
462  */
463 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level1)
464 {
465     const int actualCount = 10;
466     const int deleteDataCount = 5;
467     const int deleteAssetsCount = 4;
468     InsertUserTableRecord(tableName_, 0, actualCount);
469     std::string tableName = tableName_;
__anonaa4bfccd0602(const std::string &, VBucket &) 470     virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
471         for (int64_t i = 0; i < deleteDataCount; i++) {
472             std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
473             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
474         }
475         for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
476             std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
477                 std::to_string(i) + ";";
478             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
479         }
480     });
481     Query query = Query::Select().FromTable({ tableName_ });
482     BlockSync(query, delegate_);
483     virtualCloudDb_->ForkUpload(nullptr);
484     std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
485     expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
486     CheckAssetsCount(expectCount);
487 }
488 
489 /**
490  * @tc.name: SyncWithAssetOperation002
491  * @tc.desc: Download Assets When local assets was removed
492  * @tc.type: FUNC
493  * @tc.require:
494  * @tc.author: zhangqiquan
495  */
496 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level1)
497 {
498     const int actualCount = 1;
499     InsertUserTableRecord(tableName_, 0, actualCount);
500     Query query = Query::Select().FromTable({ tableName_ });
501     BlockSync(query, delegate_);
502     int downLoadCount = 0;
503     int removeCount = 0;
504     ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
505     UpdateCloudTableRecord(0, actualCount, false);
506     RelationalTestUtils::CloudBlockSync(query, delegate_);
507     EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
508     EXPECT_EQ(removeCount, 1);
509     virtualAssetLoader_->ForkDownload(nullptr);
510     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
511 
512     std::vector<size_t> expectCount = { 0 };
513     CheckAssetsCount(expectCount);
514 }
515 
516 /**
517  * @tc.name: SyncWithAssetOperation003
518  * @tc.desc: Delete Assets When Download
519  * @tc.type: FUNC
520  * @tc.require:
521  * @tc.author: bty
522  */
523 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level1)
524 {
525     InsertUserTableRecord(tableName_, 0, 1); // 1 is count
526     int uploadCount = 0;
__anonaa4bfccd0702(const std::string &, VBucket &) 527     virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
528         if (uploadCount > 0) {
529             return;
530         }
531         SqlCondition condition;
532         condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
533         std::vector<VBucket> records;
534         EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
535         uploadCount++;
536     });
537     Query query = Query::Select().FromTable({ tableName_ });
538     BlockSync(query, delegate_);
539     virtualCloudDb_->ForkUpload(nullptr);
540 
541     std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
542     sqlite3_stmt *stmt = nullptr;
543     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
544     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
545         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
546         Type cloudValue;
547         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
548         std::vector<uint8_t> assetsBlob;
549         Assets assets;
550         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
551         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
552         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
553         for (size_t i = 0; i < assets.size(); ++i) {
554             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
555         }
556     }
557     int errCode;
558     SQLiteUtils::ResetStatement(stmt, true, errCode);
559 }
560 
LocalAssetRemoveTest()561 void DistributedDBCloudAssetsOperationSyncTest::LocalAssetRemoveTest()
562 {
563     const int actualCount = 5; // 5 record
564     InsertUserTableRecord(tableName_, 0, actualCount);
565     Query query = Query::Select().FromTable({ tableName_ });
566     BlockSync(query, delegate_);
567     int downLoadCount = 0;
568     int removeCount = 0;
569     ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
570     UpdateCloudTableRecord(0, actualCount, false);
571     RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOVE_ASSETS_FAIL);
572     EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
573     EXPECT_EQ(removeCount, 1);
574     virtualAssetLoader_->ForkDownload(nullptr);
575     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
576 
577     std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
578     CheckAssetsCount(expectCount);
579 }
580 
581 /**
582  * @tc.name: SyncWithAssetOperation004
583  * @tc.desc: Download Assets When local assets was removed
584  * @tc.type: FUNC
585  * @tc.require:
586  * @tc.author: zhangqiquan
587  */
588 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level1)
589 {
590     LocalAssetRemoveTest();
591 }
592 
UpdateAssetWhenSyncUpload()593 void DistributedDBCloudAssetsOperationSyncTest::UpdateAssetWhenSyncUpload()
594 {
595     string sql = "UPDATE " + tableName_ + " SET asset = ? WHERE id = '54';";
596     Asset asset = g_localAsset;
597     asset.hash = "123";
598     const int assetId = 54;
599     asset.name = g_localAsset.name + std::to_string(assetId);
600     std::vector<uint8_t> assetBlob;
601     RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
602     sqlite3_stmt *stmt = nullptr;
603     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
604     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
605     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
606     int errCode;
607     SQLiteUtils::ResetStatement(stmt, true, errCode);
608 }
609 
610 /**
611  * @tc.name: SyncWithAssetOperation005
612  * @tc.desc: check asset when update in fill before upload sync process
613  * @tc.type: FUNC
614  * @tc.require:
615  * @tc.author: luoguo
616  */
617 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation005, TestSize.Level1)
618 {
619     /**
620      * @tc.steps:step1. Insert 60 records.
621      * @tc.expected: step1. ok.
622      */
623     InsertUserTableRecord(tableName_, 0, 60);
624 
625     /**
626      * @tc.steps:step2. Sync to cloud and wait in upload.
627      * @tc.expected: step2. ok.
628      */
629     bool isUpload = false;
__anonaa4bfccd0802(const std::string &, VBucket &) 630     virtualCloudDb_->ForkUpload([&isUpload](const std::string &, VBucket &) {
631         if (isUpload == true) {
632             return;
633         }
634         isUpload = true;
635         std::this_thread::sleep_for(std::chrono::milliseconds(2000));
636     });
637     Query query = Query::Select().FromTable({tableName_});
638 
639     bool finish = false;
__anonaa4bfccd0902(const std::map<std::string, SyncProcess> &process) 640     auto callback = [&finish](const std::map<std::string, SyncProcess> &process) {
641         for (const auto &item : process) {
642             if (item.second.process == DistributedDB::FINISHED) {
643                 {
644                     finish = true;
645                 }
646             }
647         }
648     };
649     ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
650 
651     while (isUpload == false) {
652         std::this_thread::sleep_for(std::chrono::milliseconds(50));
653     }
654 
655     /**
656      * @tc.steps:step3. update asset when sync upload.
657      * @tc.expected: step3. ok.
658      */
659     UpdateAssetWhenSyncUpload();
660 
661     /**
662      * @tc.steps:step4. check asset data.
663      * @tc.expected: step4. ok.
664      */
665     while (finish == false) {
666         std::this_thread::sleep_for(std::chrono::milliseconds(50));
667     }
668     virtualCloudDb_->ForkUpload(nullptr);
669     std::vector<VBucket> allData;
670     auto dbSchema = GetSchema();
671     ASSERT_GT(dbSchema.tables.size(), 0u);
672     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
673     ASSERT_EQ(allData.size(), 60ul);
674     auto data = allData[54]; // update data
675     auto data1 = allData[55]; // no update data
676 
677     Type colValue = data.at("asset");
678     auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
679     auto assets = RelationalTestUtils::GetAssets(colValue, translate, true);
680     ASSERT_EQ(assets[0].hash, std::string("123"));
681 
682     Type colValue1 = data1.at("asset");
683     auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
684     ASSERT_EQ(assets1[0].hash, std::string("DEC"));
685 }
686 
687 /**
688  * @tc.name: SyncWithAssetOperation006
689  * @tc.desc: Remove Local Datas When local assets was empty
690  * @tc.type: FUNC
691  * @tc.require:
692  * @tc.author: lijun
693  */
694 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation006, TestSize.Level1)
695 {
696     const int actualCount = 5;
697     InsertUserTableRecord(tableName_, 0, actualCount);
698     Query query = Query::Select().FromTable({ tableName_ });
699     BlockSync(query, delegate_);
700 
701     UpdateCloudTableRecord(0, 2, true);
702     BlockSync(query, delegate_);
703 
704     int removeCount = 0;
__anonaa4bfccd0a02(const std::vector<Asset> &assets) 705     virtualAssetLoader_->ForkRemoveLocalAssets([&removeCount](const std::vector<Asset> &assets) {
706         removeCount = assets.size();
707         return DBStatus::OK;
708     });
709     std::string device = "";
710     ASSERT_EQ(delegate_->RemoveDeviceData(device, FLAG_AND_DATA), DBStatus::OK);
711     ASSERT_EQ(9, removeCount);
712     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
713 }
714 
715 /**
716  * @tc.name: SyncWithAssetOperation007
717  * @tc.desc: Test assetId fill when assetId changed
718  * @tc.type: FUNC
719  * @tc.require:
720  * @tc.author: wangxiangdong
721  */
722 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation007, TestSize.Level1)
723 {
724     /**
725      * @tc.steps:step1. Insert 5 records and sync.
726      * @tc.expected: step1. ok.
727      */
728     const int actualCount = 5;
729     std::string name = g_localAsset.name + std::to_string(0);
730     Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
731     expectAssets[0].hash.append("change"); // modify first asset
732     InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
733     Query query = Query::Select().FromTable({ tableName_ });
734     BlockSync(query, delegate_);
735     /**
736      * @tc.steps:step2. modify data and sync.
737      * @tc.expected: step2. ok.
738      */
739     UpdateCloudTableRecord(0, 1, true);
740     BlockSync(query, delegate_);
741     /**
742      * @tc.steps:step3. check modified data cursor.
743      * @tc.expected: step3. ok.
744      */
745     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
746     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
747         reinterpret_cast<void *>(7), nullptr), SQLITE_OK);
748     sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=5";
749     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
750         reinterpret_cast<void *>(5), nullptr), SQLITE_OK);
751 }
752 
753 /**
754  * @tc.name: SyncWithAssetOperation008
755  * @tc.desc: Test assetId fill when assetId changed
756  * @tc.type: FUNC
757  * @tc.require:
758  * @tc.author: wangxiangdong
759  */
760 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation008, TestSize.Level1)
761 {
762     /**
763      * @tc.steps:step1. Insert 5 records and sync.
764      * @tc.expected: step1. ok.
765      */
766     const int actualCount = 5;
767     InsertUserTableRecord(tableName_, 0, actualCount);
768     Query query = Query::Select().FromTable({ tableName_ });
769     BlockSync(query, delegate_);
770     /**
771      * @tc.steps:step2. modify data and sync.
772      * @tc.expected: step2. ok.
773      */
774     UpdateCloudTableRecord(0, 1, true);
775     int removeCount = 0;
__anonaa4bfccd0b02(std::map<std::string, Assets> &assets) 776     virtualAssetLoader_->SetRemoveLocalAssetsCallback([&removeCount](std::map<std::string, Assets> &assets) {
777         removeCount = assets["asset"].size() + assets["assets"].size();
778         return LOCAL_ASSET_NOT_FOUND;
779     });
780     BlockSync(query, delegate_);
781     EXPECT_EQ(removeCount, 3); // one record has 3 asset
782     virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
783     /**
784      * @tc.steps:step3. check asset number.
785      * @tc.expected: step3. ok.
786      */
787     std::vector<size_t> expectCount = { 3, 3, 3, 3, 3 };
788     CheckAssetsCount(expectCount, true);
789     /**
790      * @tc.steps:step4. sync and check.
791      * @tc.expected: step4. ok.
792      */
793     BlockSync(query, delegate_);
794     expectCount = { 0, 3, 3, 3, 3 };
795     CheckAssetsCount(expectCount, true);
796 }
797 
798 /**
799  * @tc.name: SyncWithAssetOperation009
800  * @tc.desc: Test asset remove local and check db asset empty finally.
801  * @tc.type: FUNC
802  * @tc.require:
803  * @tc.author: wangxiangdong
804  */
805 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation009, TestSize.Level1)
806 {
807     /**
808      * @tc.steps:step1. Insert 5 records and sync.
809      * @tc.expected: step1. ok.
810      */
811     const int actualCount = 5;
812     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
813     InsertUserTableRecord(tableName_, 0, actualCount);
814     /**
815      * @tc.steps:step2. modify data and sync.
816      * @tc.expected: step2. ok.
817      */
818     UpdateCloudTableRecord(0, 1, true);
819     Query query = Query::Select().FromTable({ tableName_ });
820     BlockSync(query, delegate_);
821     /**
822      * @tc.steps:step3. check asset number.
823      * @tc.expected: step3. ok.
824      */
825     std::vector<size_t> expectCount = { 0, 3, 3, 3, 3 };
826     CheckAssetsCount(expectCount, true);
827 }
828 
InsertLocalAssetData(const std::string & assetHash)829 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
830 {
831     Assets assets;
832     std::string assetNameBegin = "Phone";
833     for (int j = 1; j <= g_assetsNum; ++j) {
834         Asset asset;
835         asset.name = assetNameBegin + "_" + std::to_string(j);
836         asset.status = AssetStatus::NORMAL;
837         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
838         asset.hash = assetHash + "_" + std::to_string(j);
839         asset.assetId = std::to_string(j);
840         assets.push_back(asset);
841     }
842     string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
843     sqlite3_stmt *stmt = nullptr;
844     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
845     std::vector<uint8_t> assetBlob;
846     std::vector<uint8_t> assetsBlob;
847     RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
848     RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
849     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
850     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
851     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
852     int errCode;
853     SQLiteUtils::ResetStatement(stmt, true, errCode);
854 }
855 
InsertCloudAssetData(const std::string & assetHash)856 void DistributedDBCloudAssetsOperationSyncTest::InsertCloudAssetData(const std::string &assetHash)
857 {
858     std::vector<VBucket> record;
859     std::vector<VBucket> extend;
860     Timestamp now = DistributedDB::TimeHelper::GetSysCurrentTime();
861     VBucket data;
862     data.insert_or_assign("id", "0");
863     data.insert_or_assign("name", "CloudTest0");
864     Asset asset = g_localAsset;
865     data.insert_or_assign("asset", asset);
866     Assets assets;
867     std::string assetNameBegin = "Phone";
868     for (int j = 1; j <= g_assetsNum; ++j) {
869         Asset assetTmp;
870         assetTmp.name = assetNameBegin + "_" + std::to_string(j);
871         assetTmp.status = AssetStatus::NORMAL;
872         assetTmp.hash = assetHash + "_" + std::to_string(j);
873         assetTmp.assetId = std::to_string(j);
874         assets.push_back(assetTmp);
875     }
876     data.insert_or_assign("assets", assets);
877     record.push_back(data);
878     VBucket log;
879     log.insert_or_assign(DistributedDB::CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
880         now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
881     log.insert_or_assign(DistributedDB::CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
882         now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
883     log.insert_or_assign(DistributedDB::CloudDbConstant::DELETE_FIELD, false);
884     extend.push_back(log);
885     virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend);
886 }
887 
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)888 void DistributedDBCloudAssetsOperationSyncTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
889     const std::string &tableName)
890 {
891     Timestamp now = TimeHelper::GetSysCurrentTime();
892     std::vector<VBucket> extend;
893     for (int64_t i = 0; i < count; ++i) {
894         VBucket log;
895         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
896         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
897         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
898         extend.push_back(log);
899     }
900     ASSERT_EQ(virtualCloudDb_->BatchDelete(tableName, extend), DBStatus::OK);
901     std::this_thread::sleep_for(std::chrono::milliseconds(count));
902 }
903 
PrepareForAssetOperation010()904 void DistributedDBCloudAssetsOperationSyncTest::PrepareForAssetOperation010()
905 {
906     InsertCloudAssetData("cloudAsset");
907     InsertLocalAssetData("localAsset");
908 }
909 
910 /**
911  * @tc.name: SyncWithAssetOperation010
912  * @tc.desc: Test check status of asset, when the hash of asset is different.
913  * @tc.type: FUNC
914  * @tc.require:
915  * @tc.author: liufuchenxing
916  */
917 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation010, TestSize.Level1)
918 {
919     /**
920      * @tc.steps:step1. prepare local and cloud asset data.
921      * @tc.expected: step1. ok.
922      */
923     PrepareForAssetOperation010();
924 
925     /**
926      * @tc.steps:step2. sync and check the status of assets.
927      * @tc.expected: step2. ok.
928      */
929     virtualCloudDb_->ForkBeforeBatchUpdate([](const std::string &, std::vector<VBucket> &record,
__anonaa4bfccd0c02(const std::string &, std::vector<VBucket> &record, std::vector<VBucket> &extend, bool) 930         std::vector<VBucket> &extend, bool) {
931         ASSERT_EQ(static_cast<int>(record.size()), 1);
932         VBucket &bucket = record[0];
933         ASSERT_TRUE(bucket.find("assets") != bucket.end());
934         Assets assets = std::get<Assets>(bucket["assets"]);
935         ASSERT_EQ(static_cast<int>(assets.size()), 3);
936         for (size_t i = 0; i < assets.size(); i++) {
937             ASSERT_EQ(assets[i].status, AssetStatus::UPDATE);
938         }
939     });
940 
941     Query query = Query::Select().FromTable({ tableName_ });
942     BlockSync(query, delegate_, SYNC_MODE_CLOUD_FORCE_PUSH);
943 }
944 
WriteDataWithoutCommitTransaction()945 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
946 {
947     ASSERT_NE(db_, nullptr);
948     SQLiteUtils::BeginTransaction(db_);
949     InsertLocalAssetData("localAsset");
950     constexpr int kSleepDurationSeconds = 3;
951     std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
952 }
953 
954 /**
955  * @tc.name: TestOpenDatabaseBusy001
956  * @tc.desc: Test open database when the database is busy.
957  * @tc.type: FUNC
958  * @tc.require:
959  * @tc.author: liufuchenxing
960  */
961 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
962 {
963     /**
964      * @tc.steps:step1. close store.
965      * @tc.expected:step1. check ok.
966      */
967     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
968     delegate_ = nullptr;
969     /**
970      * @tc.steps:step2. Another thread write data into database into database without commit.
971      * @tc.expected:step2. check ok.
972      */
973     std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
974     std::this_thread::sleep_for(std::chrono::seconds(1));
975     /**
976      * @tc.steps:step3. open relational delegate.
977      * @tc.expected:step3. open success.
978      */
979     RelationalStoreDelegate::Option option;
980     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
981     thread.join();
982 }
983 
984 /**
985  * @tc.name: SyncWithAssetOperation011
986  * @tc.desc: Test change assets between download and remove
987  * @tc.type: FUNC
988  * @tc.require:
989  * @tc.author: liaoyonghuang
990  */
991 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation011, TestSize.Level1)
992 {
993     /**
994      * @tc.steps:step1. Insert 5 records and sync.
995      * @tc.expected: step1. ok.
996      */
997     const int actualCount = 5;
998     InsertUserTableRecord(tableName_, 0, actualCount);
999     Query query = Query::Select().FromTable({ tableName_ });
1000     BlockSync(query, delegate_);
1001     /**
1002      * @tc.steps:step2. modify assets of cloud data. update 1st asset and delete 2nd asset.
1003      * @tc.expected: step2. ok.
1004      */
1005     std::vector<VBucket> record;
1006     std::vector<VBucket> extend;
1007     int dataNum = 0;
1008     Timestamp now = TimeHelper::GetSysCurrentTime();
1009     VBucket data;
1010     data.insert_or_assign("id", std::to_string(dataNum));
1011     data.insert_or_assign("name", "Cloud" + std::to_string(dataNum));
1012     Asset cloudAsset = g_localAsset;
1013     cloudAsset.name += std::to_string(dataNum);
1014     cloudAsset.hash = "new_hash";
1015     Assets cloudAssets = {cloudAsset};
1016     data.insert_or_assign("assets", cloudAssets);
1017     record.push_back(data);
1018     VBucket log;
1019     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
1020             now / CloudDbConstant::TEN_THOUSAND));
1021     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
1022             now / CloudDbConstant::TEN_THOUSAND));
1023     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1024     log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(dataNum));
1025     extend.push_back(log);
1026     ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
1027     /**
1028      * @tc.steps:step3. Update local assets between remove and download, sync and check whether download is invoked.
1029      * @tc.expected: step3. ok.
1030      */
__anonaa4bfccd0d02(std::map<std::string, Assets> &assets) 1031     virtualAssetLoader_->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
1032         UpdateLocalTableRecord(tableName_, 0, 1);
1033         return OK;
1034     });
__anonaa4bfccd0e02(const std::string &tableName, std::map<std::string, Assets> &assets) 1035     virtualAssetLoader_->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
1036         EXPECT_TRUE(false);
1037     });
1038     BlockSync(query, delegate_);
1039 
1040     virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
1041     virtualAssetLoader_->ForkDownload(nullptr);
1042 }
1043 
1044 /**
1045  * @tc.name: SyncWithAssetOperation012
1046  * @tc.desc: Batch download Assets When local assets was removed
1047  * @tc.type: FUNC
1048  * @tc.require:
1049  * @tc.author: liaoyonghuang
1050  */
1051 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation012, TestSize.Level1)
1052 {
1053     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1054     LocalAssetRemoveTest();
1055     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1056 }
1057 
UpdateLocalAssetRecord(const std::string & tableName,int64_t begin,int64_t count)1058 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalAssetRecord(const std::string &tableName, int64_t begin,
1059     int64_t count)
1060 {
1061     int errCode;
1062     std::vector<uint8_t> assetBlob;
1063     std::vector<uint8_t> assetsBlob;
1064     for (int64_t i = begin; i < begin + count; ++i) {
1065         Asset asset = g_localAsset;
1066         asset.hash = "new_hash";
1067         asset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
1068         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
1069         std::vector<Asset> assets;
1070         assets.push_back(asset);
1071         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
1072         std::string sql = "UPDATE " + tableName + " SET height = '175.0', asset = ?, assets = ? where id = " +
1073             std::to_string(i);
1074         sqlite3_stmt *stmt = nullptr;
1075         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
1076         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK); // 1st bind
1077         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2nd bind
1078         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
1079         SQLiteUtils::ResetStatement(stmt, true, errCode);
1080     }
1081 }
1082 
1083 /**
1084  * @tc.name: SyncWithAssetOperation013
1085  * @tc.desc: Test device modify data and then sync cursor will not changes
1086  * @tc.type: FUNC
1087  * @tc.require:
1088  * @tc.author: caihaoting
1089  */
1090 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation013, TestSize.Level1)
1091 {
1092     /**
1093      * @tc.steps:step1. Insert local and cloud asset data and sync.
1094      * @tc.expected: step1. ok.
1095      */
1096     InsertCloudAssetData("assetHash");
1097     InsertLocalAssetData("assetHash");
1098     Query query = Query::Select().FromTable({ tableName_ });
1099     BlockSync(query, delegate_);
1100     /**
1101      * @tc.steps:step2. modify data of asset and sync.
1102      * @tc.expected: step2. ok.
1103      */
1104     UpdateLocalAssetRecord(tableName_, 0, 1);
1105     const int cursor = 2;
1106     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1107     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1108         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1109     BlockSync(query, delegate_);
1110     /**
1111      * @tc.steps:step3. check modified data cursor and cursor is not changed.
1112      * @tc.expected: step3. ok.
1113      */
1114     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1115         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1116 }
1117 
1118 /**
1119  * @tc.name: SyncWithAssetOperation014
1120  * @tc.desc: Test device data does not change while sync and cursor will not changes
1121  * @tc.type: FUNC
1122  * @tc.require:
1123  * @tc.author: caihaoting
1124  */
1125 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation014, TestSize.Level1)
1126 {
1127     /**
1128      * @tc.steps:step1. Insert 5 records and sync.
1129      * @tc.expected: step1. ok.
1130      */
1131     const int actualCount = 5;
1132     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1133     InsertUserTableRecord(tableName_, 0, actualCount);
1134     Query query = Query::Select().FromTable({ tableName_ });
1135     BlockSync(query, delegate_);
1136     /**
1137      * @tc.steps:step2. modify data and sync.
1138      * @tc.expected: step2. ok.
1139      */
1140     UpdateLocalAssetRecord(tableName_, 0, 1);
1141     const int cursor = 6;
1142     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1143     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1144         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1145     BlockSync(query, delegate_);
1146     /**
1147      * @tc.steps:step3. check modified data cursor and cursor is not changed.
1148      * @tc.expected: step3. ok.
1149      */
1150     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1151         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1152 }
1153 
1154 /**
1155  * @tc.name: IgnoreRecord001
1156  * @tc.desc: Download Assets When local assets was removed
1157  * @tc.type: FUNC
1158  * @tc.require:
1159  * @tc.author: zhangqiquan
1160  */
1161 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level1)
1162 {
1163     const int actualCount = 1;
1164     InsertUserTableRecord(tableName_, 0, actualCount);
1165     Query query = Query::Select().FromTable({ tableName_ });
1166     BlockSync(query, delegate_);
1167     std::vector<size_t> expectCount = { 2 };
1168     CheckAssetsCount(expectCount);
1169 
1170     VBucket record;
1171     record["id"] = std::to_string(0);
1172     record["assets"] = Assets();
1173     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1174     record["id"] = std::to_string(1);
1175     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1176     expectCount = { 0, 0 };
1177     CheckAssetsCount(expectCount);
1178 
1179     std::vector<VBucket> logs;
1180     EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
1181     for (const auto &log : logs) {
1182         int64_t cursor = std::get<int64_t>(log.at("cursor"));
1183         EXPECT_GE(cursor, 0);
1184     }
1185 }
1186 
1187 /**
1188  * @tc.name: IgnoreRecord002
1189  * @tc.desc: Ignore Assets When Download
1190  * @tc.type: FUNC
1191  * @tc.require:
1192  * @tc.author: zhangqiquan
1193  */
1194 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level1)
1195 {
1196     const int actualCount = 1;
1197     InsertUserTableRecord(tableName_, 0, actualCount);
1198     Query query = Query::Select().FromTable({ tableName_ });
1199     RelationalTestUtils::CloudBlockSync(query, delegate_);
1200     UpdateCloudTableRecord(0, actualCount, false);
1201 
1202     virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1203     RelationalTestUtils::CloudBlockSync(query, delegate_);
1204     virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1205     std::vector<size_t> expectCount = { 4 };
1206     CheckAssetsCount(expectCount);
1207     RelationalTestUtils::CloudBlockSync(query, delegate_);
1208 }
1209 
1210 /**
1211  * @tc.name: IgnoreRecord003
1212  * @tc.desc: Ignore Assets When Upload
1213  * @tc.type: FUNC
1214  * @tc.require:
1215  * @tc.author: zhangqiquan
1216  */
1217 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level1)
1218 {
1219     const int actualCount = 1;
1220     InsertUserTableRecord(tableName_, 0, actualCount);
1221     Query query = Query::Select().FromTable({ tableName_ });
1222     virtualCloudDb_->SetConflictInUpload(true);
1223     RelationalTestUtils::CloudBlockSync(query, delegate_);
1224     virtualCloudDb_->SetConflictInUpload(false);
1225     std::vector<size_t> expectCount = { 2 };
1226     CheckAssetsCount(expectCount);
1227     RelationalTestUtils::CloudBlockSync(query, delegate_);
1228 }
1229 
1230 /**
1231  * @tc.name: IgnoreRecord004
1232  * @tc.desc: Ignore Assets When Btch Download
1233  * @tc.type: FUNC
1234  * @tc.require:
1235  * @tc.author: luoguo
1236  */
1237 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord004, TestSize.Level1)
1238 {
1239     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1240     const int actualCount = 10;
1241     InsertUserTableRecord(tableName_, 0, actualCount);
1242     Query query = Query::Select().FromTable({ tableName_ });
1243     RelationalTestUtils::CloudBlockSync(query, delegate_);
1244     UpdateCloudTableRecord(0, actualCount, false);
1245 
1246     virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1247     RelationalTestUtils::CloudBlockSync(query, delegate_);
1248     virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1249     std::vector<size_t> expectCount(10, 4);
1250     CheckAssetsCount(expectCount);
1251     RelationalTestUtils::CloudBlockSync(query, delegate_);
1252     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1253 }
1254 
1255 /**
1256  * @tc.name: UpsertData001
1257  * @tc.desc: Upsert data after delete it
1258  * @tc.type: FUNC
1259  * @tc.require:
1260  * @tc.author: zhangqiquan
1261  */
1262 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level1)
1263 {
1264     // insert id 0 to local
1265     const int actualCount = 1;
1266     InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
1267     std::vector<std::map<std::string, std::string>> conditions;
1268     std::map<std::string, std::string> entries;
1269     entries["id"] = "0";
1270     conditions.push_back(entries);
1271     // delete id 0 in local
1272     RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
1273     // upsert id 0 to local
1274     VBucket record;
1275     record["id"] = std::to_string(0);
1276     record["assets"] = Assets();
1277     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1278     // check id 0 exist
1279     CheckAssetsCount({ 0 });
1280 }
1281 
1282 /**
1283  * @tc.name: UpsertData002
1284  * @tc.desc: Test sync after Upsert.
1285  * @tc.type: FUNC
1286  * @tc.require:
1287  * @tc.author: liaoyonghuang
1288  */
1289 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level1)
1290 {
1291     /**
1292      * @tc.steps:step1. Insert 5 records and sync.
1293      * @tc.expected: step1. ok.
1294      */
1295     const int actualCount = 5;
1296     InsertUserTableRecord(tableName_, 0, actualCount);
1297     Query query = Query::Select().FromTable({ tableName_ });
1298     BlockSync(query, delegate_);
1299 
1300     /**
1301      * @tc.steps:step2. UpsertData and sync.
1302      * @tc.expected: step2. ok.
1303      */
1304     int dataCnt = -1;
1305     std::string checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 5";
__anonaa4bfccd0f02(sqlite3_stmt *stmt) 1306     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1307         dataCnt = sqlite3_column_int(stmt, 0);
1308         return E_OK;
1309     });
1310     EXPECT_EQ(dataCnt, 1);
1311     vector<VBucket> records;
1312     for (int i = 0; i < actualCount; i++) {
1313         VBucket record;
1314         record["id"] = std::to_string(i);
1315         record["name"] = std::string("UpsertName");
1316         records.push_back(record);
1317     }
1318     EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
1319     // check cursor has been increased
1320     checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 10";
__anonaa4bfccd1002(sqlite3_stmt *stmt) 1321     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1322         dataCnt = sqlite3_column_int(stmt, 0);
1323         return E_OK;
1324     });
1325     EXPECT_EQ(dataCnt, 1);
1326     BlockSync(query, delegate_);
1327 
1328     /**
1329      * @tc.steps:step3. Check local data.
1330      * @tc.expected: step3. All local data has been merged by the cloud.
1331      */
1332     std::vector<VBucket> allData;
1333     auto dbSchema = GetSchema();
1334     ASSERT_GT(dbSchema.tables.size(), 0u);
1335     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
1336     for (const auto &data : allData) {
1337         ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
1338     }
1339 }
1340 
1341 /**
1342  * @tc.name: SyncWithAssetConflict001
1343  * @tc.desc: Upload with asset no change
1344  * @tc.type: FUNC
1345  * @tc.require:
1346  * @tc.author: zhangqiquan
1347  */
1348 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level1)
1349 {
1350     // cloud and local insert same data
1351     const int actualCount = 1;
1352     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1353     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1354     InsertUserTableRecord(tableName_, 0, actualCount);
1355     // sync and local asset's status are normal
1356     Query query = Query::Select().FromTable({ tableName_ });
1357     RelationalTestUtils::CloudBlockSync(query, delegate_);
1358     auto dbSchema = GetSchema();
1359     ASSERT_GT(dbSchema.tables.size(), 0u);
1360     auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1361     for (const auto &oneRow : assets) {
1362         for (const auto &asset : oneRow) {
1363             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1364         }
1365     }
1366 }
1367 
1368 /**
1369  * @tc.name: UpsertDataInvalid001
1370  * @tc.desc: Upsert invalid data
1371  * @tc.type: FUNC
1372  * @tc.require:
1373  * @tc.author: wangxiangdong
1374  */
1375 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
1376 {
1377     VBucket record;
1378     record["id"] = std::to_string(0);
1379     record["assets"] = Assets();
1380     /**
1381      * @tc.steps:step1. UpsertData to empty table.
1382      * @tc.expected: step1. INVALID_ARGS.
1383      */
1384     EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
1385     /**
1386      * @tc.steps:step2. UpsertData to shared table.
1387      * @tc.expected: step2. INVALID_ARGS.
1388      */
1389     EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
1390     /**
1391      * @tc.steps:step3. UpsertData to not device table and shared table.
1392      * @tc.expected: step3. NOT_FOUND.
1393      */
1394     const char *createSQL =
1395         "CREATE TABLE IF NOT EXISTS testing(" \
1396         "id TEXT PRIMARY KEY," \
1397         "name TEXT," \
1398         "height REAL ," \
1399         "photo BLOB," \
1400         "asset ASSET," \
1401         "assets ASSETS," \
1402         "age INT);";
1403     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1404     EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
1405     /**
1406      * @tc.steps:step4. UpsertData to not exist table.
1407      * @tc.expected: step4. NOT_FOUND.
1408      */
1409     EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
1410 }
1411 
1412 /**
1413  * @tc.name: UpsertDataInvalid002
1414  * @tc.desc: Upsert device data
1415  * @tc.type: FUNC
1416  * @tc.require:
1417  * @tc.author: wangxiangdong
1418  */
1419 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level1)
1420 {
1421     VBucket record;
1422     record["id"] = std::to_string(0);
1423     record["assets"] = Assets();
1424     /**
1425      * @tc.steps:step1. create user table.
1426      * @tc.expected: step1. INVALID_ARGS.
1427      */
1428     const char *createSQL =
1429         "CREATE TABLE IF NOT EXISTS deviceTable(" \
1430         "id TEXT PRIMARY KEY," \
1431         "name TEXT," \
1432         "height REAL ," \
1433         "photo BLOB," \
1434         "asset ASSET," \
1435         "assets ASSETS," \
1436         "age INT);";
1437     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1438     /**
1439      * @tc.steps:step2. create device table.
1440      * @tc.expected: step2. OK.
1441      */
1442     RelationalStoreDelegate *delegate1 = nullptr;
1443     std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
1444     RelationalStoreDelegate::Option option;
1445     ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1446     ASSERT_NE(delegate1, nullptr);
1447     std::string deviceTableName = "deviceTable";
1448     ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
1449     DataBaseSchema dataBaseSchema;
1450     TableSchema tableSchema;
1451     tableSchema.name = deviceTableName;
1452     tableSchema.sharedTableName = deviceTableName + "_shared";
1453     tableSchema.fields = {
1454         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
1455         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
1456         {"age", TYPE_INDEX<int64_t>}
1457     };
1458     dataBaseSchema.tables.push_back(tableSchema);
1459     ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1460     /**
1461      * @tc.steps:step3. UpsertData to device table.
1462      * @tc.expected: step3. NOT_FOUND.
1463      */
1464     EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
1465     EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
1466     delegate1 = nullptr;
1467     mgr1 = nullptr;
1468 }
1469 
1470 /**
1471  * @tc.name: DownloadAssetStatusTest004
1472  * @tc.desc: Test upload asset status
1473  * @tc.type: FUNC
1474  * @tc.require:
1475  * @tc.author: zhangqiquan
1476  */
1477 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, DownloadAssetStatusTest004, TestSize.Level1)
1478 {
1479     /**
1480      * @tc.steps:step1. cloud assets {0, 1}
1481      * @tc.expected: step1. OK.
1482      */
1483     // cloud and local insert same data
1484     // cloud assets {0, 1} local assets {0, 1, 2}
1485     const int actualCount = 1;
1486     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_, 2);
1487     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms for data conflict
1488     /**
1489      * @tc.steps:step2. local assets {0, 1, 2}, and change assert {0}
1490      * @tc.expected: step2. OK.
1491      */
1492     std::string name = g_localAsset.name + std::to_string(0);
1493     Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
1494     expectAssets[0].hash.append("change"); // modify first asset
1495     InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
1496     /**
1497      * @tc.steps:step3. sync
1498      * @tc.expected: step3. upload status is {UPDATE, NORMAL, INSERT}
1499      */
1500     std::vector<AssetStatus> expectStatus = {
1501         AssetStatus::UPDATE, AssetStatus::NORMAL, AssetStatus::INSERT
1502     };
1503     // sync and local asset's status are normal
1504     Query query = Query::Select().FromTable({ tableName_ });
1505     RelationalTestUtils::CloudBlockSync(query, delegate_);
1506     auto dbSchema = GetSchema();
1507     ASSERT_GT(dbSchema.tables.size(), 0u);
1508     // cloud asset status is update normal insert
1509     VBucket extend;
1510     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
1511     std::vector<VBucket> data;
1512     ASSERT_EQ(virtualCloudDb_->Query(tableName_, extend, data), QUERY_END);
1513     ASSERT_EQ(data.size(), static_cast<size_t>(actualCount));
1514     Assets actualAssets;
1515     ASSERT_EQ(CloudStorageUtils::GetValueFromType(data[0]["assets"], actualAssets), E_OK);
1516     ASSERT_EQ(actualAssets.size(), expectStatus.size());
1517     for (size_t i = 0; i < actualAssets.size(); ++i) {
1518         EXPECT_EQ(actualAssets[i].status, expectStatus[i]);
1519     }
1520     /**
1521      * @tc.steps:step4. check local assets status.
1522      * @tc.expected: step4. all assets status is NORMAL.
1523      */
1524     auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1525     for (const auto &oneRow : assets) {
1526         for (const auto &asset : oneRow) {
1527             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1528         }
1529     }
1530 }
1531 
1532 /**
1533  * @tc.name: UploadAssetsTest001
1534  * @tc.desc: Test upload asset with error.
1535  * @tc.type: FUNC
1536  * @tc.require:
1537  * @tc.author: liaoyonghuang
1538  */
1539 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
1540 {
1541     /**
1542      * @tc.steps:step1. Insert 10 records.
1543      * @tc.expected: step1. ok.
1544      */
1545     const int actualCount = 10;
1546     InsertUserTableRecord(tableName_, 0, actualCount);
1547     /**
1548      * @tc.steps:step2. Set callback function to cause some upstream data to fail.
1549      * @tc.expected: step2. ok.
1550      */
1551     int recordIndex = 0;
1552     Asset tempAsset = {
1553             .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
1554             .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
1555     };
__anonaa4bfccd1102(const std::string &tableName, VBucket &extend) 1556     virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
1557         Asset asset;
1558         Assets assets;
1559         switch (recordIndex) {
1560             case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1561                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1562                 break;
1563             case 1: // record[1] is considered successful because it is a conflict.
1564                 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1565                     static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1566                 break;
1567             case 2: // record[2] fail because of empty gid.
1568                 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
1569                 break;
1570             case 3: // record[3] fail because of empty assetId.
1571                 asset = tempAsset;
1572                 asset.assetId = "";
1573                 extend[std::string(CloudDbConstant::ASSET)] = asset;
1574                 break;
1575             case 4: // record[4] fail because of empty assetId.
1576                 assets.push_back(tempAsset);
1577                 assets[0].assetId = "";
1578                 extend[std::string(CloudDbConstant::ASSETS)] = assets;
1579                 break;
1580             case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1581                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
1582                 break;
1583             default:
1584                 break;
1585         }
1586         recordIndex++;
1587     });
1588     /**
1589      * @tc.steps:step3. Sync and check upLoadInfo.
1590      * @tc.expected: step3. failCount is 5 and successCount is 5.
1591      */
1592     Query query = Query::Select().FromTable({ tableName_ });
1593     BlockSync(query, delegate_);
1594     for (const auto &table : lastProcess_.tableProcess) {
1595         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1596         EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
1597         EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
1598     }
1599     virtualCloudDb_->ForkUpload(nullptr);
1600 }
1601 
1602 /**
1603  * @tc.name: UploadAssetsTest002
1604  * @tc.desc: Test upload asset with error.
1605  * @tc.type: FUNC
1606  * @tc.require:
1607  * @tc.author: liaoyonghuang
1608  */
1609 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
1610 {
1611     /**
1612      * @tc.steps:step1. Insert 10 records.
1613      * @tc.expected: step1. ok.
1614      */
1615     const int actualCount = 10;
1616     InsertUserTableRecord(tableName_, 0, actualCount);
1617     Query query = Query::Select().FromTable({ tableName_ });
1618     BlockSync(query, delegate_);
1619     /**
1620      * @tc.steps:step2. Delete local data.
1621      * @tc.expected: step2. OK.
1622      */
1623     std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
1624     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1625     /**
1626      * @tc.steps:step3. Set callback function to cause some upstream data to fail.
1627      * @tc.expected: step3. ok.
1628      */
__anonaa4bfccd1202(const std::string &tableName, VBucket &extend) 1629     virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
1630         extend[std::string(CloudDbConstant::GID_FIELD)] = "";
1631     });
1632     BlockSync(query, delegate_);
1633     for (const auto &table : lastProcess_.tableProcess) {
1634         EXPECT_EQ(table.second.upLoadInfo.total, 5u);
1635         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1636         EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
1637     }
1638     virtualCloudDb_->ForkUpload(nullptr);
1639 }
1640 
1641 /**
1642  * @tc.name: UploadAssetsTest003
1643  * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
1644  * @tc.type: FUNC
1645  * @tc.require:
1646  * @tc.author: liaoyonghuang
1647  */
1648 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level1)
1649 {
1650     /**
1651      * @tc.steps:step1. Insert 100 records.
1652      * @tc.expected: step1. ok.
1653      */
1654     const int actualCount = 100;
1655     InsertUserTableRecord(tableName_, 0, actualCount);
1656     /**
1657      * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
1658      * @tc.expected: step2. ok.
1659      */
1660     int uploadCount = 0;
__anonaa4bfccd1302(const std::string &tableName, VBucket &extend) 1661     virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
1662         if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
1663             extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1664                 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1665         }
1666         uploadCount++;
1667     });
1668     Query query = Query::Select().FromTable({ tableName_ });
1669     BlockSync(query, delegate_);
1670     for (const auto &table : lastProcess_.tableProcess) {
1671         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
1672         EXPECT_EQ(table.second.upLoadInfo.total, 100u);
1673         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1674         EXPECT_EQ(table.second.upLoadInfo.successCount, 100u);
1675         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
1676     }
1677     virtualCloudDb_->ForkUpload(nullptr);
1678 }
1679 
1680 /**
1681  * @tc.name: UploadAssetsTest004
1682  * @tc.desc: Test batch delete return error CLOUD_RECORD_NOT_FOUND.
1683  * @tc.type: FUNC
1684  * @tc.require:
1685  * @tc.author: liaoyonghuang
1686  */
1687 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest004, TestSize.Level1)
1688 {
1689     /**
1690      * @tc.steps:step1. Insert 100 records and sync to cloud.
1691      * @tc.expected: step1. ok.
1692      */
1693     const int actualCount = 100;
1694     InsertUserTableRecord(tableName_, 0, actualCount);
1695     Query query = Query::Select().FromTable({ tableName_ });
1696     BlockSync(query, delegate_);
1697     /**
1698      * @tc.steps:step2. delete 50 records in local.
1699      * @tc.expected: step2. ok.
1700      */
1701     std::string sql = "delete from " + tableName_ + " where CAST(id AS INTEGER) >= " + std::to_string(actualCount / 2);
1702     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1703     /**
1704      * @tc.steps:step3. set return error CLOUD_RECORD_NOT_FOUND in batch delete.
1705      * @tc.expected: step3. ok.
1706      */
1707     int index = 0;
__anonaa4bfccd1402(const std::string &tableName, VBucket &extend) 1708     virtualCloudDb_->ForkUpload([&index](const std::string &tableName, VBucket &extend) {
1709         if (extend.count(CloudDbConstant::DELETE_FIELD) != 0 && index % 2 == 0 &&
1710             std::get<bool>(extend.at(CloudDbConstant::DELETE_FIELD))) {
1711             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1712         }
1713         index++;
1714     });
1715     /**
1716      * @tc.steps:step4. sync and check result.
1717      * @tc.expected: step4. ok.
1718      */
1719     BlockSync(query, delegate_);
1720     for (const auto &table : lastProcess_.tableProcess) {
1721         EXPECT_EQ(table.second.upLoadInfo.total, 50u);
1722         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1723         EXPECT_EQ(table.second.upLoadInfo.successCount, 50u);
1724     }
1725     virtualCloudDb_->ForkUpload(nullptr);
1726 }
1727 
1728 /**
1729  * @tc.name: BatchNormalDownloadAsset001
1730  * @tc.desc: Test batch download asset in two batch.
1731  * @tc.type: FUNC
1732  * @tc.require:
1733  * @tc.author: zqq
1734  */
1735 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchNormalDownloadAsset001, TestSize.Level1)
1736 {
1737     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1738     PrepareDataInCloud();
1739 
1740     Query query = Query::Select().FromTable({ tableName_ });
1741     BlockSync(query, delegate_);
1742     EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1743     EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u);   // remove 0 times
1744     virtualAssetLoader_->Reset();
1745     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1746 }
1747 
1748 /**
1749  * @tc.name: BatchAbnormalDownloadAsset001
1750  * @tc.desc: Test batch download asset failed.
1751  * @tc.type: FUNC
1752  * @tc.require:
1753  * @tc.author: zqq
1754  */
1755 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchAbnormalDownloadAsset001, TestSize.Level1)
1756 {
1757     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1758     PrepareDataInCloud();
__anonaa4bfccd1502(int rowIndex, std::map<std::string, Assets> &assets) 1759     virtualAssetLoader_->ForkBatchDownload([](int rowIndex, std::map<std::string, Assets> &assets) {
1760         if (rowIndex > 50) { // 50 record failed
1761             for (auto &asset : assets) {
1762                 for (auto &item : asset.second) {
1763                     item.status = AssetStatus::ABNORMAL;
1764                 }
1765             }
1766             return DB_ERROR;
1767         }
1768         return OK;
1769     });
1770 
1771     Query query = Query::Select().FromTable({ tableName_ });
1772     BlockSync(query, delegate_);
1773     EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 1u); // download 1 times
1774     EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u);   // remove 0 times
1775     virtualAssetLoader_->Reset();
1776 
1777     virtualAssetLoader_->ForkBatchDownload(nullptr);
1778     BlockSync(query, delegate_);
1779     EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1780     EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u);   // remove 0 times
1781     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1782 }
1783 
1784 /**
1785  * @tc.name: SyncWithLogFlag001
1786  * @tc.desc: Test cloud update to local, flag will change.
1787  * @tc.type: FUNC
1788  * @tc.require:
1789  * @tc.author: lg
1790  */
1791 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithLogFlag001, TestSize.Level1)
1792 {
1793     /**
1794      * @tc.steps:step1. Insert 5 records and sync.
1795      * @tc.expected: step1. ok.
1796      */
1797     const int actualCount = 5;
1798     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1799     InsertUserTableRecord(tableName_, 0, actualCount);
1800     /**
1801      * @tc.steps:step2. modify data and sync, check flag count.
1802      * @tc.expected: step2. ok.
1803      */
1804     UpdateCloudTableRecord(0, 3, true);
1805     Query query = Query::Select().FromTable({ tableName_ });
1806     BlockSync(query, delegate_);
1807     string checkSql = "select count(*) from naturalbase_rdb_aux_" + tableName_ + "_log where flag&0x4000=0x4000;";
1808     EXPECT_EQ(sqlite3_exec(db_, checkSql.c_str(), QueryCountCallback,
1809                 reinterpret_cast<void *>(3u), nullptr), SQLITE_OK);
1810     /**
1811      * @tc.steps:step3. delete local data and sync, check flag count.
1812      * @tc.expected: step3. ok.
1813      */
1814     std::string sql = "delete from " + tableName_ + " where id = 0;" ;
1815     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1816     BlockSync(query, delegate_);
1817     EXPECT_EQ(sqlite3_exec(db_, checkSql.c_str(), QueryCountCallback,
1818                 reinterpret_cast<void *>(2u), nullptr), SQLITE_OK);
1819 
1820     /**
1821      * @tc.steps:step4. delete cloud data and sync, check flag count.
1822      * @tc.expected: step4. ok.
1823      */
1824     DeleteCloudDBData(1, 1, tableName_);
1825     BlockSync(query, delegate_);
1826     EXPECT_EQ(sqlite3_exec(db_, checkSql.c_str(), QueryCountCallback,
1827                 reinterpret_cast<void *>(1u), nullptr), SQLITE_OK);
1828 }
1829 }
1830 #endif
1831