• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "log_print.h"
21 #include "relational_store_delegate.h"
22 #include "relational_store_manager.h"
23 #include "runtime_config.h"
24 #include "time_helper.h"
25 #include "virtual_asset_loader.h"
26 #include "virtual_cloud_data_translate.h"
27 #include "virtual_cloud_db.h"
28 #include "virtual_communicator_aggregator.h"
29 #include "sqlite_relational_utils.h"
30 #include "cloud/cloud_storage_utils.h"
31 #include "cloud_db_sync_utils_test.h"
32 
33 namespace {
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 const char *g_createSQL =
38     "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
39     "id TEXT PRIMARY KEY," \
40     "name TEXT," \
41     "height REAL ," \
42     "photo BLOB," \
43     "asset ASSET," \
44     "assets ASSETS," \
45     "age INT);";
46 const int64_t g_syncWaitTime = 60;
47 const int g_assetsNum = 3;
48 const Asset g_localAsset = {
49     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
50     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
51 };
52 SyncProcess lastProcess_;
53 
CreateUserDBAndTable(sqlite3 * & db)54 void CreateUserDBAndTable(sqlite3 *&db)
55 {
56     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
57     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
58 }
59 
BlockSync(const Query & query,RelationalStoreDelegate * delegate,SyncMode syncMode=SYNC_MODE_CLOUD_MERGE)60 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, SyncMode syncMode = SYNC_MODE_CLOUD_MERGE)
61 {
62     std::mutex dataMutex;
63     std::condition_variable cv;
64     bool finish = false;
65     SyncProcess last;
66     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
67         for (const auto &item: process) {
68             if (item.second.process == DistributedDB::FINISHED) {
69                 {
70                     std::lock_guard<std::mutex> autoLock(dataMutex);
71                     finish = true;
72                 }
73                 last = item.second;
74                 cv.notify_one();
75             }
76         }
77     };
78     LOGW("begin call sync");
79     ASSERT_EQ(delegate->Sync({ "CLOUD" }, syncMode, query, callback, g_syncWaitTime), OK);
80     std::unique_lock<std::mutex> uniqueLock(dataMutex);
81     cv.wait(uniqueLock, [&finish]() {
82         return finish;
83     });
84     lastProcess_ = last;
85     LOGW("end call sync");
86 }
87 
88 class  DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
89 public:
90     static void SetUpTestCase();
91     static void TearDownTestCase();
92     void SetUp() override;
93     void TearDown() override;
94     void WriteDataWithoutCommitTransaction();
95 protected:
96     void InitTestDir();
97     DataBaseSchema GetSchema();
98     void CloseDb();
99     void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
100         const Assets &templateAsset = {});
101     void UpdateLocalTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
102         bool updateAssets = true);
103     void UpdateLocalAssetRecord(const std::string &tableName, int64_t begin, int64_t count);
104     void CheckAssetsCount(const std::vector<size_t> &expectCount, bool checkAsset = false);
105     void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
106     void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
107     void InsertLocalAssetData(const std::string &assetHash);
108     void InsertCloudAssetData(const std::string &assetHash);
109     void PrepareForAssetOperation010();
110     void UpdateAssetWhenSyncUpload();
111     DBStatus InsertRecordToCloud(const std::vector<VBucket> &record);
112     void PrepareDataInCloud();
113     void LocalAssetRemoveTest();
114 
115     static std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
116     static std::vector<VBucket> GenerateAssetsRecords(const std::map<std::string, int32_t> &colType,
117         const Asset &templateAsset, int assetsCount, int rowCount);
118     static Asset GenerateAsset(const Asset &templateAsset, int id);
119     static Assets GenerateAssets(const Asset &templateAsset, int id, int assetsCount);
120     std::string testDir_;
121     std::string storePath_;
122     sqlite3 *db_ = nullptr;
123     RelationalStoreDelegate *delegate_ = nullptr;
124     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
125     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
126     std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
127     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
128     std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
129     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
130     TrackerSchema trackerSchema = {
131         .tableName = tableName_, .extendColNames = {"name"}, .trackerColNames = {"age"}
132     };
133 };
134 
SetUpTestCase()135 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
136 {
137     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
138 }
139 
TearDownTestCase()140 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
141 {}
142 
SetUp()143 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
144 {
145     DistributedDBToolsUnitTest::PrintTestCaseInfo();
146     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
147     InitTestDir();
148     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
149         LOGE("rm test db files error.");
150     }
151     DistributedDBToolsUnitTest::PrintTestCaseInfo();
152     LOGD("Test dir is %s", testDir_.c_str());
153     db_ = RelationalTestUtils::CreateDataBase(storePath_);
154     ASSERT_NE(db_, nullptr);
155     CreateUserDBAndTable(db_);
156     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
157     RelationalStoreDelegate::Option option;
158     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
159     ASSERT_NE(delegate_, nullptr);
160     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
161     ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
162     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
163     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
164     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
165     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
166     virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
167     DataBaseSchema dataBaseSchema = GetSchema();
168     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
169     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
170     ASSERT_TRUE(communicatorAggregator_ != nullptr);
171     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
172 }
173 
TearDown()174 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
175 {
176     CloseDb();
177     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
178     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
179         LOGE("rm test db files error.");
180     }
181     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
182     communicatorAggregator_ = nullptr;
183     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
184 }
185 
InitTestDir()186 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
187 {
188     if (!testDir_.empty()) {
189         return;
190     }
191     DistributedDBToolsUnitTest::TestDirInit(testDir_);
192     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
193     LOGI("The test db is:%s", testDir_.c_str());
194 }
195 
GetSchema()196 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
197 {
198     DataBaseSchema schema;
199     TableSchema tableSchema;
200     tableSchema.name = tableName_;
201     tableSchema.sharedTableName = tableName_ + "_shared";
202     tableSchema.fields = {
203         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
204         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
205         {"age", TYPE_INDEX<int64_t>}
206     };
207     schema.tables.push_back(tableSchema);
208     return schema;
209 }
210 
CloseDb()211 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
212 {
213     virtualCloudDb_->ForkUpload(nullptr);
214     virtualCloudDb_ = nullptr;
215     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
216     delegate_ = nullptr;
217     mgr_ = nullptr;
218 }
219 
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)220 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
221     int64_t count, size_t assetCount, const Assets &templateAsset)
222 {
223     std::string photo = "phone";
224     int errCode;
225     std::vector<uint8_t> assetBlob;
226     std::vector<uint8_t> assetsBlob;
227     const int64_t index2 = 2;
228     for (int64_t i = begin; i < begin + count; ++i) {
229         std::string name = g_localAsset.name + std::to_string(i);
230         Asset asset = g_localAsset;
231         asset.name = name;
232         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
233         std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
234         string sql = "INSERT OR REPLACE INTO " + tableName +
235             " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
236             "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
237         sqlite3_stmt *stmt = nullptr;
238         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
239         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
240         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
241         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
242         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
243         SQLiteUtils::ResetStatement(stmt, true, errCode);
244     }
245 }
246 
UpdateLocalTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,bool updateAssets)247 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalTableRecord(const std::string &tableName, int64_t begin,
248     int64_t count, size_t assetCount, bool updateAssets)
249 {
250     int errCode;
251     std::vector<uint8_t> assetBlob;
252     std::vector<uint8_t> assetsBlob;
253     std::string hash = updateAssets ? "new_hash" : g_localAsset.hash;
254     for (int64_t i = begin; i < begin + count; ++i) {
255         std::string name = g_localAsset.name + std::to_string(i);
256         Asset asset = g_localAsset;
257         asset.name = name;
258         asset.hash = hash;
259         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
260         std::vector<Asset> assets = GetAssets(name, {}, assetCount);
261         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
262         std::string dataName = "new_name_" + std::to_string(i);
263         std::string sql = "UPDATE " + tableName + " SET name = ?, asset = ?, assets = ? where id = " +
264             std::to_string(i);
265         sqlite3_stmt *stmt = nullptr;
266         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
267         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 1, dataName), E_OK); // 1st bind
268         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetBlob, false), E_OK); // 2nd bind
269         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetsBlob, false), E_OK); // 3rd bind
270         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
271         SQLiteUtils::ResetStatement(stmt, true, errCode);
272     }
273 }
274 
InsertRecordToCloud(const std::vector<VBucket> & record)275 DBStatus DistributedDBCloudAssetsOperationSyncTest::InsertRecordToCloud(const std::vector<VBucket> &record)
276 {
277     std::vector<VBucket> extend;
278     for (size_t i = 0; i < record.size(); ++i) {
279         VBucket log;
280         Timestamp now = TimeHelper::GetSysCurrentTime();
281         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
282                 now / CloudDbConstant::TEN_THOUSAND));
283         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
284                 now / CloudDbConstant::TEN_THOUSAND));
285         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
286         extend.push_back(log);
287     }
288     std::vector<VBucket> copyRecord = record;
289     return virtualCloudDb_->BatchInsert(tableName_, std::move(copyRecord), extend);
290 }
291 
PrepareDataInCloud()292 void DistributedDBCloudAssetsOperationSyncTest::PrepareDataInCloud()
293 {
294     std::map<std::string, int32_t> colType;
295     colType["asset"] = TYPE_INDEX<Asset>;
296     colType["assets"] = TYPE_INDEX<Assets>;
297     Asset templateAsset = g_localAsset;
298     const int assetsCount = 10;
299     const int rowCount = 200;
300     auto recordAssets = GenerateAssetsRecords(colType, templateAsset, assetsCount, rowCount);
301     EXPECT_EQ(InsertRecordToCloud(recordAssets), OK);
302 }
303 
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)304 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
305     const Assets &templateAsset, size_t assetCount)
306 {
307     std::vector<Asset> assets;
308     for (size_t i = 1; i <= assetCount; ++i) {
309         Asset asset;
310         if (i - 1 < templateAsset.size()) {
311             asset = templateAsset[i - 1];
312         } else {
313             asset = g_localAsset;
314             asset.name = baseName + "_" + std::to_string(i);
315             asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
316         }
317         assets.push_back(asset);
318     }
319     return assets;
320 }
321 
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)322 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
323 {
324     std::vector<VBucket> record;
325     std::vector<VBucket> extend;
326     Timestamp now = TimeHelper::GetSysCurrentTime();
327     const int assetCount = 2;
328     for (int64_t i = begin; i < (begin + count); ++i) {
329         VBucket data;
330         data.insert_or_assign("id", std::to_string(i));
331         data.insert_or_assign("name", "Cloud" + std::to_string(i));
332         Assets assets;
333         for (int j = 1; j <= assetCount; ++j) {
334             Asset asset;
335             asset.name = "Phone_" + std::to_string(j);
336             asset.assetId = std::to_string(j);
337             asset.status = AssetStatus::UPDATE;
338             assets.push_back(asset);
339         }
340         assetIsNull ? data.insert_or_assign("assets", Nil()) : data.insert_or_assign("assets", assets);
341         record.push_back(data);
342         VBucket log;
343         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
344             now / CloudDbConstant::TEN_THOUSAND));
345         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
346             now / CloudDbConstant::TEN_THOUSAND));
347         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
348         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
349         extend.push_back(log);
350     }
351 
352     ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
353 }
354 
CheckAssetsCount(const std::vector<size_t> & expectCount,bool checkAsset)355 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount,
356     bool checkAsset)
357 {
358     std::vector<VBucket> allData;
359     auto dbSchema = GetSchema();
360     ASSERT_GT(dbSchema.tables.size(), 0u);
361     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
362     int index = 0;
363     ASSERT_EQ(allData.size(), expectCount.size());
364     for (const auto &data : allData) {
365         auto colIter = data.find("assets");
366         EXPECT_NE(colIter, data.end());
367         if (colIter == data.end()) {
368             index++;
369             continue;
370         }
371         Type colValue = data.at("assets");
372         auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
373         auto assets = RelationalTestUtils::GetAssets(colValue, translate);
374         size_t size = assets.size();
375         if (checkAsset) {
376             Type colValue1 = data.at("asset");
377             auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
378             size += assets1.size();
379         }
380         LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
381         EXPECT_EQ(static_cast<size_t>(size), expectCount[index]);
382         for (const auto &item : assets) {
383             LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
384                 item.status);
385         }
386         index++;
387     }
388 }
389 
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)390 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
391     int &removeCount)
392 {
393     virtualAssetLoader_->ForkDownload([this, &downLoadCount](const std::string &tableName,
394         std::map<std::string, Assets> &assets) {
395         downLoadCount++;
396         if (downLoadCount == 1) {
397             std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
398             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
399         }
400     });
401     virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
402         EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
403         removeCount++;
404         return removeStatus;
405     });
406 }
407 
GenerateAssetsRecords(const std::map<std::string,int32_t> & colType,const Asset & templateAsset,int assetsCount,int rowCount)408 std::vector<VBucket> DistributedDBCloudAssetsOperationSyncTest::GenerateAssetsRecords(
409     const std::map<std::string, int32_t> &colType, const Asset &templateAsset, int assetsCount, int rowCount)
410 {
411     std::vector<VBucket> res;
412     for (int i = 0; i < rowCount; ++i) {
413         VBucket record;
414         record["id"] = std::to_string(i);
415         for (const auto &[col, type] : colType) {
416             if (type == TYPE_INDEX<Asset>) {
417                 record[col] = GenerateAsset(templateAsset, i);
418             } else if (type == TYPE_INDEX<Assets>) {
419                 record[col] = GenerateAssets(templateAsset, i, assetsCount);
420             }
421         }
422         res.push_back(record);
423     }
424     return res;
425 }
426 
GenerateAsset(const Asset & templateAsset,int id)427 Asset DistributedDBCloudAssetsOperationSyncTest::GenerateAsset(const Asset &templateAsset, int id)
428 {
429     Asset res = templateAsset;
430     res.name.append("_").append(std::to_string(id));
431     res.hash.append("_").append(std::to_string(id));
432     return res;
433 }
434 
GenerateAssets(const Asset & templateAsset,int id,int assetsCount)435 Assets DistributedDBCloudAssetsOperationSyncTest::GenerateAssets(const Asset &templateAsset, int id, int assetsCount)
436 {
437     Assets assets;
438     auto baseAsset = GenerateAsset(templateAsset, id);
439     for (int i = 0; i < assetsCount; ++i) {
440         assets.push_back(GenerateAsset(baseAsset, i));
441     }
442     return assets;
443 }
444 
445 /**
446  * @tc.name: SyncWithAssetOperation001
447  * @tc.desc: Delete Assets When Download
448  * @tc.type: FUNC
449  * @tc.require:
450  * @tc.author: zhangqiquan
451  */
452 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level0)
453 {
454     const int actualCount = 10;
455     const int deleteDataCount = 5;
456     const int deleteAssetsCount = 4;
457     InsertUserTableRecord(tableName_, 0, actualCount);
458     std::string tableName = tableName_;
__anonde3e756b0602(const std::string &, VBucket &) 459     virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
460         for (int64_t i = 0; i < deleteDataCount; i++) {
461             std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
462             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
463         }
464         for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
465             std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
466                 std::to_string(i) + ";";
467             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
468         }
469     });
470     Query query = Query::Select().FromTable({ tableName_ });
471     BlockSync(query, delegate_);
472     virtualCloudDb_->ForkUpload(nullptr);
473     std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
474     expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
475     CheckAssetsCount(expectCount);
476 }
477 
478 /**
479  * @tc.name: SyncWithAssetOperation002
480  * @tc.desc: Download Assets When local assets was removed
481  * @tc.type: FUNC
482  * @tc.require:
483  * @tc.author: zhangqiquan
484  */
485 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level0)
486 {
487     const int actualCount = 1;
488     InsertUserTableRecord(tableName_, 0, actualCount);
489     Query query = Query::Select().FromTable({ tableName_ });
490     BlockSync(query, delegate_);
491     int downLoadCount = 0;
492     int removeCount = 0;
493     ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
494     UpdateCloudTableRecord(0, actualCount, false);
495     RelationalTestUtils::CloudBlockSync(query, delegate_);
496     EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
497     EXPECT_EQ(removeCount, 1);
498     virtualAssetLoader_->ForkDownload(nullptr);
499     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
500 
501     std::vector<size_t> expectCount = { 0 };
502     CheckAssetsCount(expectCount);
503 }
504 
505 /**
506  * @tc.name: SyncWithAssetOperation003
507  * @tc.desc: Delete Assets When Download
508  * @tc.type: FUNC
509  * @tc.require:
510  * @tc.author: bty
511  */
512 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level0)
513 {
514     InsertUserTableRecord(tableName_, 0, 1); // 1 is count
515     int uploadCount = 0;
__anonde3e756b0702(const std::string &, VBucket &) 516     virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
517         if (uploadCount > 0) {
518             return;
519         }
520         SqlCondition condition;
521         condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
522         std::vector<VBucket> records;
523         EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
524         uploadCount++;
525     });
526     Query query = Query::Select().FromTable({ tableName_ });
527     BlockSync(query, delegate_);
528     virtualCloudDb_->ForkUpload(nullptr);
529 
530     std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
531     sqlite3_stmt *stmt = nullptr;
532     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
533     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
534         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
535         Type cloudValue;
536         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
537         std::vector<uint8_t> assetsBlob;
538         Assets assets;
539         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
540         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
541         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
542         for (size_t i = 0; i < assets.size(); ++i) {
543             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
544         }
545     }
546     int errCode;
547     SQLiteUtils::ResetStatement(stmt, true, errCode);
548 }
549 
LocalAssetRemoveTest()550 void DistributedDBCloudAssetsOperationSyncTest::LocalAssetRemoveTest()
551 {
552     const int actualCount = 5; // 5 record
553     InsertUserTableRecord(tableName_, 0, actualCount);
554     Query query = Query::Select().FromTable({ tableName_ });
555     BlockSync(query, delegate_);
556     int downLoadCount = 0;
557     int removeCount = 0;
558     ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
559     UpdateCloudTableRecord(0, actualCount, false);
560     RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOVE_ASSETS_FAIL);
561     EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
562     EXPECT_EQ(removeCount, 1);
563     virtualAssetLoader_->ForkDownload(nullptr);
564     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
565 
566     std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
567     CheckAssetsCount(expectCount);
568 }
569 
570 /**
571  * @tc.name: SyncWithAssetOperation004
572  * @tc.desc: Download Assets When local assets was removed
573  * @tc.type: FUNC
574  * @tc.require:
575  * @tc.author: zhangqiquan
576  */
577 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level0)
578 {
579     LocalAssetRemoveTest();
580 }
581 
UpdateAssetWhenSyncUpload()582 void DistributedDBCloudAssetsOperationSyncTest::UpdateAssetWhenSyncUpload()
583 {
584     string sql = "UPDATE " + tableName_ + " SET asset = ? WHERE id = '54';";
585     Asset asset = g_localAsset;
586     asset.hash = "123";
587     const int assetId = 54;
588     asset.name = g_localAsset.name + std::to_string(assetId);
589     std::vector<uint8_t> assetBlob;
590     RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
591     sqlite3_stmt *stmt = nullptr;
592     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
593     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
594     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
595     int errCode;
596     SQLiteUtils::ResetStatement(stmt, true, errCode);
597 }
598 
599 /**
600  * @tc.name: SyncWithAssetOperation005
601  * @tc.desc: check asset when update in fill before upload sync process
602  * @tc.type: FUNC
603  * @tc.require:
604  * @tc.author: luoguo
605  */
606 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation005, TestSize.Level0)
607 {
608     /**
609      * @tc.steps:step1. Insert 60 records.
610      * @tc.expected: step1. ok.
611      */
612     InsertUserTableRecord(tableName_, 0, 60);
613 
614     /**
615      * @tc.steps:step2. Sync to cloud and wait in upload.
616      * @tc.expected: step2. ok.
617      */
618     bool isUpload = false;
__anonde3e756b0802(const std::string &, VBucket &) 619     virtualCloudDb_->ForkUpload([&isUpload](const std::string &, VBucket &) {
620         if (isUpload == true) {
621             return;
622         }
623         isUpload = true;
624         std::this_thread::sleep_for(std::chrono::milliseconds(2000));
625     });
626     Query query = Query::Select().FromTable({tableName_});
627 
628     bool finish = false;
__anonde3e756b0902(const std::map<std::string, SyncProcess> &process) 629     auto callback = [&finish](const std::map<std::string, SyncProcess> &process) {
630         for (const auto &item : process) {
631             if (item.second.process == DistributedDB::FINISHED) {
632                 {
633                     finish = true;
634                 }
635             }
636         }
637     };
638     ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
639 
640     while (isUpload == false) {
641         std::this_thread::sleep_for(std::chrono::milliseconds(50));
642     }
643 
644     /**
645      * @tc.steps:step3. update asset when sync upload.
646      * @tc.expected: step3. ok.
647      */
648     UpdateAssetWhenSyncUpload();
649 
650     /**
651      * @tc.steps:step4. check asset data.
652      * @tc.expected: step4. ok.
653      */
654     while (finish == false) {
655         std::this_thread::sleep_for(std::chrono::milliseconds(50));
656     }
657     virtualCloudDb_->ForkUpload(nullptr);
658     std::vector<VBucket> allData;
659     auto dbSchema = GetSchema();
660     ASSERT_GT(dbSchema.tables.size(), 0u);
661     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
662     ASSERT_EQ(allData.size(), 60ul);
663     auto data = allData[54]; // update data
664     auto data1 = allData[55]; // no update data
665 
666     Type colValue = data.at("asset");
667     auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
668     auto assets = RelationalTestUtils::GetAssets(colValue, translate, true);
669     ASSERT_EQ(assets[0].hash, std::string("123"));
670 
671     Type colValue1 = data1.at("asset");
672     auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
673     ASSERT_EQ(assets1[0].hash, std::string("DEC"));
674 }
675 
676 /**
677  * @tc.name: SyncWithAssetOperation006
678  * @tc.desc: Remove Local Datas When local assets was empty
679  * @tc.type: FUNC
680  * @tc.require:
681  * @tc.author: lijun
682  */
683 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation006, TestSize.Level0)
684 {
685     const int actualCount = 5;
686     InsertUserTableRecord(tableName_, 0, actualCount);
687     Query query = Query::Select().FromTable({ tableName_ });
688     BlockSync(query, delegate_);
689 
690     UpdateCloudTableRecord(0, 2, true);
691     BlockSync(query, delegate_);
692 
693     int removeCount = 0;
__anonde3e756b0a02(const std::vector<Asset> &assets) 694     virtualAssetLoader_->ForkRemoveLocalAssets([&removeCount](const std::vector<Asset> &assets) {
695         removeCount = assets.size();
696         return DBStatus::OK;
697     });
698     std::string device = "";
699     ASSERT_EQ(delegate_->RemoveDeviceData(device, FLAG_AND_DATA), DBStatus::OK);
700     ASSERT_EQ(9, removeCount);
701     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
702 }
703 
704 /**
705  * @tc.name: SyncWithAssetOperation007
706  * @tc.desc: Test assetId fill when assetId changed
707  * @tc.type: FUNC
708  * @tc.require:
709  * @tc.author: wangxiangdong
710  */
711 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation007, TestSize.Level0)
712 {
713     /**
714      * @tc.steps:step1. Insert 5 records and sync.
715      * @tc.expected: step1. ok.
716      */
717     const int actualCount = 5;
718     std::string name = g_localAsset.name + std::to_string(0);
719     Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
720     expectAssets[0].hash.append("change"); // modify first asset
721     InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
722     Query query = Query::Select().FromTable({ tableName_ });
723     BlockSync(query, delegate_);
724     /**
725      * @tc.steps:step2. modify data and sync.
726      * @tc.expected: step2. ok.
727      */
728     UpdateCloudTableRecord(0, 1, true);
729     BlockSync(query, delegate_);
730     /**
731      * @tc.steps:step3. check modified data cursor.
732      * @tc.expected: step3. ok.
733      */
734     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
735     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
736         reinterpret_cast<void *>(7), nullptr), SQLITE_OK);
737     sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=5";
738     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
739         reinterpret_cast<void *>(5), nullptr), SQLITE_OK);
740 }
741 
742 /**
743  * @tc.name: SyncWithAssetOperation008
744  * @tc.desc: Test assetId fill when assetId changed
745  * @tc.type: FUNC
746  * @tc.require:
747  * @tc.author: wangxiangdong
748  */
749 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation008, TestSize.Level0)
750 {
751     /**
752      * @tc.steps:step1. Insert 5 records and sync.
753      * @tc.expected: step1. ok.
754      */
755     const int actualCount = 5;
756     InsertUserTableRecord(tableName_, 0, actualCount);
757     Query query = Query::Select().FromTable({ tableName_ });
758     BlockSync(query, delegate_);
759     /**
760      * @tc.steps:step2. modify data and sync.
761      * @tc.expected: step2. ok.
762      */
763     UpdateCloudTableRecord(0, 1, true);
764     int removeCount = 0;
__anonde3e756b0b02(std::map<std::string, Assets> &assets) 765     virtualAssetLoader_->SetRemoveLocalAssetsCallback([&removeCount](std::map<std::string, Assets> &assets) {
766         removeCount = assets["asset"].size() + assets["assets"].size();
767         return LOCAL_ASSET_NOT_FOUND;
768     });
769     BlockSync(query, delegate_);
770     EXPECT_EQ(removeCount, 3); // one record has 3 asset
771     virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
772     /**
773      * @tc.steps:step3. check asset number.
774      * @tc.expected: step3. ok.
775      */
776     std::vector<size_t> expectCount = { 3, 3, 3, 3, 3 };
777     CheckAssetsCount(expectCount, true);
778     /**
779      * @tc.steps:step4. sync and check.
780      * @tc.expected: step4. ok.
781      */
782     BlockSync(query, delegate_);
783     expectCount = { 0, 3, 3, 3, 3 };
784     CheckAssetsCount(expectCount, true);
785 }
786 
787 /**
788  * @tc.name: SyncWithAssetOperation009
789  * @tc.desc: Test asset remove local and check db asset empty finally.
790  * @tc.type: FUNC
791  * @tc.require:
792  * @tc.author: wangxiangdong
793  */
794 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation009, TestSize.Level0)
795 {
796     /**
797      * @tc.steps:step1. Insert 5 records and sync.
798      * @tc.expected: step1. ok.
799      */
800     const int actualCount = 5;
801     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
802     InsertUserTableRecord(tableName_, 0, actualCount);
803     /**
804      * @tc.steps:step2. modify data and sync.
805      * @tc.expected: step2. ok.
806      */
807     UpdateCloudTableRecord(0, 1, true);
808     Query query = Query::Select().FromTable({ tableName_ });
809     BlockSync(query, delegate_);
810     /**
811      * @tc.steps:step3. check asset number.
812      * @tc.expected: step3. ok.
813      */
814     std::vector<size_t> expectCount = { 0, 3, 3, 3, 3 };
815     CheckAssetsCount(expectCount, true);
816 }
817 
InsertLocalAssetData(const std::string & assetHash)818 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
819 {
820     Assets assets;
821     std::string assetNameBegin = "Phone";
822     for (int j = 1; j <= g_assetsNum; ++j) {
823         Asset asset;
824         asset.name = assetNameBegin + "_" + std::to_string(j);
825         asset.status = AssetStatus::NORMAL;
826         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
827         asset.hash = assetHash + "_" + std::to_string(j);
828         asset.assetId = std::to_string(j);
829         assets.push_back(asset);
830     }
831     string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
832     sqlite3_stmt *stmt = nullptr;
833     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
834     std::vector<uint8_t> assetBlob;
835     std::vector<uint8_t> assetsBlob;
836     RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
837     RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
838     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
839     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
840     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
841     int errCode;
842     SQLiteUtils::ResetStatement(stmt, true, errCode);
843 }
844 
InsertCloudAssetData(const std::string & assetHash)845 void DistributedDBCloudAssetsOperationSyncTest::InsertCloudAssetData(const std::string &assetHash)
846 {
847     std::vector<VBucket> record;
848     std::vector<VBucket> extend;
849     Timestamp now = DistributedDB::TimeHelper::GetSysCurrentTime();
850     VBucket data;
851     data.insert_or_assign("id", "0");
852     data.insert_or_assign("name", "CloudTest0");
853     Asset asset = g_localAsset;
854     data.insert_or_assign("asset", asset);
855     Assets assets;
856     std::string assetNameBegin = "Phone";
857     for (int j = 1; j <= g_assetsNum; ++j) {
858         Asset assetTmp;
859         assetTmp.name = assetNameBegin + "_" + std::to_string(j);
860         assetTmp.status = AssetStatus::NORMAL;
861         assetTmp.hash = assetHash + "_" + std::to_string(j);
862         assetTmp.assetId = std::to_string(j);
863         assets.push_back(assetTmp);
864     }
865     data.insert_or_assign("assets", assets);
866     record.push_back(data);
867     VBucket log;
868     log.insert_or_assign(DistributedDB::CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
869         now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
870     log.insert_or_assign(DistributedDB::CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
871         now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
872     log.insert_or_assign(DistributedDB::CloudDbConstant::DELETE_FIELD, false);
873     extend.push_back(log);
874     virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend);
875 }
876 
PrepareForAssetOperation010()877 void DistributedDBCloudAssetsOperationSyncTest::PrepareForAssetOperation010()
878 {
879     InsertCloudAssetData("cloudAsset");
880     InsertLocalAssetData("localAsset");
881 }
882 
883 /**
884  * @tc.name: SyncWithAssetOperation010
885  * @tc.desc: Test check status of asset, when the hash of asset is different.
886  * @tc.type: FUNC
887  * @tc.require:
888  * @tc.author: liufuchenxing
889  */
890 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation010, TestSize.Level0)
891 {
892     /**
893      * @tc.steps:step1. prepare local and cloud asset data.
894      * @tc.expected: step1. ok.
895      */
896     PrepareForAssetOperation010();
897 
898     /**
899      * @tc.steps:step2. sync and check the status of assets.
900      * @tc.expected: step2. ok.
901      */
902     virtualCloudDb_->ForkBeforeBatchUpdate([](const std::string &, std::vector<VBucket> &record,
__anonde3e756b0c02(const std::string &, std::vector<VBucket> &record, std::vector<VBucket> &extend, bool) 903         std::vector<VBucket> &extend, bool) {
904         ASSERT_EQ(static_cast<int>(record.size()), 1);
905         VBucket &bucket = record[0];
906         ASSERT_TRUE(bucket.find("assets") != bucket.end());
907         Assets assets = std::get<Assets>(bucket["assets"]);
908         ASSERT_EQ(static_cast<int>(assets.size()), 3);
909         for (size_t i = 0; i < assets.size(); i++) {
910             ASSERT_EQ(assets[i].status, AssetStatus::UPDATE);
911         }
912     });
913 
914     Query query = Query::Select().FromTable({ tableName_ });
915     BlockSync(query, delegate_, SYNC_MODE_CLOUD_FORCE_PUSH);
916 }
917 
WriteDataWithoutCommitTransaction()918 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
919 {
920     ASSERT_NE(db_, nullptr);
921     SQLiteUtils::BeginTransaction(db_);
922     InsertLocalAssetData("localAsset");
923     constexpr int kSleepDurationSeconds = 3;
924     std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
925 }
926 
927 /**
928  * @tc.name: TestOpenDatabaseBusy001
929  * @tc.desc: Test open database when the database is busy.
930  * @tc.type: FUNC
931  * @tc.require:
932  * @tc.author: liufuchenxing
933  */
934 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
935 {
936     /**
937      * @tc.steps:step1. close store.
938      * @tc.expected:step1. check ok.
939      */
940     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
941     delegate_ = nullptr;
942     /**
943      * @tc.steps:step2. Another thread write data into database into database without commit.
944      * @tc.expected:step2. check ok.
945      */
946     std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
947     std::this_thread::sleep_for(std::chrono::seconds(1));
948     /**
949      * @tc.steps:step3. open relational delegate.
950      * @tc.expected:step3. open success.
951      */
952     RelationalStoreDelegate::Option option;
953     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
954     thread.join();
955 }
956 
957 /**
958  * @tc.name: SyncWithAssetOperation011
959  * @tc.desc: Test change assets between download and remove
960  * @tc.type: FUNC
961  * @tc.require:
962  * @tc.author: liaoyonghuang
963  */
964 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation011, TestSize.Level0)
965 {
966     /**
967      * @tc.steps:step1. Insert 5 records and sync.
968      * @tc.expected: step1. ok.
969      */
970     const int actualCount = 5;
971     InsertUserTableRecord(tableName_, 0, actualCount);
972     Query query = Query::Select().FromTable({ tableName_ });
973     BlockSync(query, delegate_);
974     /**
975      * @tc.steps:step2. modify assets of cloud data. update 1st asset and delete 2nd asset.
976      * @tc.expected: step2. ok.
977      */
978     std::vector<VBucket> record;
979     std::vector<VBucket> extend;
980     int dataNum = 0;
981     Timestamp now = TimeHelper::GetSysCurrentTime();
982     VBucket data;
983     data.insert_or_assign("id", std::to_string(dataNum));
984     data.insert_or_assign("name", "Cloud" + std::to_string(dataNum));
985     Asset cloudAsset = g_localAsset;
986     cloudAsset.name += std::to_string(dataNum);
987     cloudAsset.hash = "new_hash";
988     Assets cloudAssets = {cloudAsset};
989     data.insert_or_assign("assets", cloudAssets);
990     record.push_back(data);
991     VBucket log;
992     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
993             now / CloudDbConstant::TEN_THOUSAND));
994     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
995             now / CloudDbConstant::TEN_THOUSAND));
996     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
997     log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(dataNum));
998     extend.push_back(log);
999     ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
1000     /**
1001      * @tc.steps:step3. Update local assets between remove and download, sync and check whether download is invoked.
1002      * @tc.expected: step3. ok.
1003      */
__anonde3e756b0d02(std::map<std::string, Assets> &assets) 1004     virtualAssetLoader_->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
1005         UpdateLocalTableRecord(tableName_, 0, 1);
1006         return OK;
1007     });
__anonde3e756b0e02(const std::string &tableName, std::map<std::string, Assets> &assets) 1008     virtualAssetLoader_->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
1009         EXPECT_TRUE(false);
1010     });
1011     BlockSync(query, delegate_);
1012 
1013     virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
1014     virtualAssetLoader_->ForkDownload(nullptr);
1015 }
1016 
1017 /**
1018  * @tc.name: SyncWithAssetOperation012
1019  * @tc.desc: Batch download Assets When local assets was removed
1020  * @tc.type: FUNC
1021  * @tc.require:
1022  * @tc.author: liaoyonghuang
1023  */
1024 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation012, TestSize.Level0)
1025 {
1026     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1027     LocalAssetRemoveTest();
1028     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1029 }
1030 
UpdateLocalAssetRecord(const std::string & tableName,int64_t begin,int64_t count)1031 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalAssetRecord(const std::string &tableName, int64_t begin,
1032     int64_t count)
1033 {
1034     int errCode;
1035     std::vector<uint8_t> assetBlob;
1036     std::vector<uint8_t> assetsBlob;
1037     for (int64_t i = begin; i < begin + count; ++i) {
1038         Asset asset = g_localAsset;
1039         asset.hash = "new_hash";
1040         asset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
1041         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
1042         std::vector<Asset> assets;
1043         assets.push_back(asset);
1044         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
1045         std::string sql = "UPDATE " + tableName + " SET height = '175.0', asset = ?, assets = ? where id = " +
1046             std::to_string(i);
1047         sqlite3_stmt *stmt = nullptr;
1048         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
1049         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK); // 1st bind
1050         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2nd bind
1051         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
1052         SQLiteUtils::ResetStatement(stmt, true, errCode);
1053     }
1054 }
1055 
1056 /**
1057  * @tc.name: SyncWithAssetOperation013
1058  * @tc.desc: Test device modify data and then sync cursor will not changes
1059  * @tc.type: FUNC
1060  * @tc.require:
1061  * @tc.author: caihaoting
1062  */
1063 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation013, TestSize.Level0)
1064 {
1065     /**
1066      * @tc.steps:step1. Insert local and cloud asset data and sync.
1067      * @tc.expected: step1. ok.
1068      */
1069     InsertCloudAssetData("assetHash");
1070     InsertLocalAssetData("assetHash");
1071     Query query = Query::Select().FromTable({ tableName_ });
1072     BlockSync(query, delegate_);
1073     /**
1074      * @tc.steps:step2. modify data of asset and sync.
1075      * @tc.expected: step2. ok.
1076      */
1077     UpdateLocalAssetRecord(tableName_, 0, 1);
1078     const int cursor = 2;
1079     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1080     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1081         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1082     BlockSync(query, delegate_);
1083     /**
1084      * @tc.steps:step3. check modified data cursor and cursor is not changed.
1085      * @tc.expected: step3. ok.
1086      */
1087     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1088         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1089 }
1090 
1091 /**
1092  * @tc.name: SyncWithAssetOperation014
1093  * @tc.desc: Test device data does not change while sync and cursor will not changes
1094  * @tc.type: FUNC
1095  * @tc.require:
1096  * @tc.author: caihaoting
1097  */
1098 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation014, TestSize.Level0)
1099 {
1100     /**
1101      * @tc.steps:step1. Insert 5 records and sync.
1102      * @tc.expected: step1. ok.
1103      */
1104     const int actualCount = 5;
1105     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1106     InsertUserTableRecord(tableName_, 0, actualCount);
1107     Query query = Query::Select().FromTable({ tableName_ });
1108     BlockSync(query, delegate_);
1109     /**
1110      * @tc.steps:step2. modify data and sync.
1111      * @tc.expected: step2. ok.
1112      */
1113     UpdateLocalAssetRecord(tableName_, 0, 1);
1114     const int cursor = 6;
1115     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1116     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1117         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1118     BlockSync(query, delegate_);
1119     /**
1120      * @tc.steps:step3. check modified data cursor and cursor is not changed.
1121      * @tc.expected: step3. ok.
1122      */
1123     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1124         reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1125 }
1126 
1127 /**
1128  * @tc.name: IgnoreRecord001
1129  * @tc.desc: Download Assets When local assets was removed
1130  * @tc.type: FUNC
1131  * @tc.require:
1132  * @tc.author: zhangqiquan
1133  */
1134 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level0)
1135 {
1136     const int actualCount = 1;
1137     InsertUserTableRecord(tableName_, 0, actualCount);
1138     Query query = Query::Select().FromTable({ tableName_ });
1139     BlockSync(query, delegate_);
1140     std::vector<size_t> expectCount = { 2 };
1141     CheckAssetsCount(expectCount);
1142 
1143     VBucket record;
1144     record["id"] = std::to_string(0);
1145     record["assets"] = Assets();
1146     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1147     record["id"] = std::to_string(1);
1148     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1149     expectCount = { 0, 0 };
1150     CheckAssetsCount(expectCount);
1151 
1152     std::vector<VBucket> logs;
1153     EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
1154     for (const auto &log : logs) {
1155         int64_t cursor = std::get<int64_t>(log.at("cursor"));
1156         EXPECT_GE(cursor, 0);
1157     }
1158 }
1159 
1160 /**
1161  * @tc.name: IgnoreRecord002
1162  * @tc.desc: Ignore Assets When Download
1163  * @tc.type: FUNC
1164  * @tc.require:
1165  * @tc.author: zhangqiquan
1166  */
1167 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level0)
1168 {
1169     const int actualCount = 1;
1170     InsertUserTableRecord(tableName_, 0, actualCount);
1171     Query query = Query::Select().FromTable({ tableName_ });
1172     RelationalTestUtils::CloudBlockSync(query, delegate_);
1173     UpdateCloudTableRecord(0, actualCount, false);
1174 
1175     virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1176     RelationalTestUtils::CloudBlockSync(query, delegate_);
1177     virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1178     std::vector<size_t> expectCount = { 4 };
1179     CheckAssetsCount(expectCount);
1180     RelationalTestUtils::CloudBlockSync(query, delegate_);
1181 }
1182 
1183 /**
1184  * @tc.name: IgnoreRecord003
1185  * @tc.desc: Ignore Assets When Upload
1186  * @tc.type: FUNC
1187  * @tc.require:
1188  * @tc.author: zhangqiquan
1189  */
1190 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level0)
1191 {
1192     const int actualCount = 1;
1193     InsertUserTableRecord(tableName_, 0, actualCount);
1194     Query query = Query::Select().FromTable({ tableName_ });
1195     virtualCloudDb_->SetConflictInUpload(true);
1196     RelationalTestUtils::CloudBlockSync(query, delegate_);
1197     virtualCloudDb_->SetConflictInUpload(false);
1198     std::vector<size_t> expectCount = { 2 };
1199     CheckAssetsCount(expectCount);
1200     RelationalTestUtils::CloudBlockSync(query, delegate_);
1201 }
1202 
1203 /**
1204  * @tc.name: IgnoreRecord004
1205  * @tc.desc: Ignore Assets When Btch Download
1206  * @tc.type: FUNC
1207  * @tc.require:
1208  * @tc.author: luoguo
1209  */
1210 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord004, TestSize.Level0)
1211 {
1212     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1213     const int actualCount = 10;
1214     InsertUserTableRecord(tableName_, 0, actualCount);
1215     Query query = Query::Select().FromTable({ tableName_ });
1216     RelationalTestUtils::CloudBlockSync(query, delegate_);
1217     UpdateCloudTableRecord(0, actualCount, false);
1218 
1219     virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1220     RelationalTestUtils::CloudBlockSync(query, delegate_);
1221     virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1222     std::vector<size_t> expectCount(10, 4);
1223     CheckAssetsCount(expectCount);
1224     RelationalTestUtils::CloudBlockSync(query, delegate_);
1225     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1226 }
1227 
1228 /**
1229  * @tc.name: UpsertData001
1230  * @tc.desc: Upsert data after delete it
1231  * @tc.type: FUNC
1232  * @tc.require:
1233  * @tc.author: zhangqiquan
1234  */
1235 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level0)
1236 {
1237     // insert id 0 to local
1238     const int actualCount = 1;
1239     InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
1240     std::vector<std::map<std::string, std::string>> conditions;
1241     std::map<std::string, std::string> entries;
1242     entries["id"] = "0";
1243     conditions.push_back(entries);
1244     // delete id 0 in local
1245     RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
1246     // upsert id 0 to local
1247     VBucket record;
1248     record["id"] = std::to_string(0);
1249     record["assets"] = Assets();
1250     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1251     // check id 0 exist
1252     CheckAssetsCount({ 0 });
1253 }
1254 
1255 /**
1256  * @tc.name: UpsertData002
1257  * @tc.desc: Test sync after Upsert.
1258  * @tc.type: FUNC
1259  * @tc.require:
1260  * @tc.author: liaoyonghuang
1261  */
1262 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level0)
1263 {
1264     /**
1265      * @tc.steps:step1. Insert 5 records and sync.
1266      * @tc.expected: step1. ok.
1267      */
1268     const int actualCount = 5;
1269     InsertUserTableRecord(tableName_, 0, actualCount);
1270     Query query = Query::Select().FromTable({ tableName_ });
1271     BlockSync(query, delegate_);
1272 
1273     /**
1274      * @tc.steps:step2. UpsertData and sync.
1275      * @tc.expected: step2. ok.
1276      */
1277     int dataCnt = -1;
1278     std::string checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 5";
__anonde3e756b0f02(sqlite3_stmt *stmt) 1279     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1280         dataCnt = sqlite3_column_int(stmt, 0);
1281         return E_OK;
1282     });
1283     EXPECT_EQ(dataCnt, 1);
1284     vector<VBucket> records;
1285     for (int i = 0; i < actualCount; i++) {
1286         VBucket record;
1287         record["id"] = std::to_string(i);
1288         record["name"] = std::string("UpsertName");
1289         records.push_back(record);
1290     }
1291     EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
1292     // check cursor has been increased
1293     checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 10";
__anonde3e756b1002(sqlite3_stmt *stmt) 1294     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1295         dataCnt = sqlite3_column_int(stmt, 0);
1296         return E_OK;
1297     });
1298     EXPECT_EQ(dataCnt, 1);
1299     BlockSync(query, delegate_);
1300 
1301     /**
1302      * @tc.steps:step3. Check local data.
1303      * @tc.expected: step3. All local data has been merged by the cloud.
1304      */
1305     std::vector<VBucket> allData;
1306     auto dbSchema = GetSchema();
1307     ASSERT_GT(dbSchema.tables.size(), 0u);
1308     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
1309     for (const auto &data : allData) {
1310         ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
1311     }
1312 }
1313 
1314 /**
1315  * @tc.name: SyncWithAssetConflict001
1316  * @tc.desc: Upload with asset no change
1317  * @tc.type: FUNC
1318  * @tc.require:
1319  * @tc.author: zhangqiquan
1320  */
1321 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level0)
1322 {
1323     // cloud and local insert same data
1324     const int actualCount = 1;
1325     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1326     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1327     InsertUserTableRecord(tableName_, 0, actualCount);
1328     // sync and local asset's status are normal
1329     Query query = Query::Select().FromTable({ tableName_ });
1330     RelationalTestUtils::CloudBlockSync(query, delegate_);
1331     auto dbSchema = GetSchema();
1332     ASSERT_GT(dbSchema.tables.size(), 0u);
1333     auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1334     for (const auto &oneRow : assets) {
1335         for (const auto &asset : oneRow) {
1336             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1337         }
1338     }
1339 }
1340 
1341 /**
1342  * @tc.name: UpsertDataInvalid001
1343  * @tc.desc: Upsert invalid data
1344  * @tc.type: FUNC
1345  * @tc.require:
1346  * @tc.author: wangxiangdong
1347  */
1348 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
1349 {
1350     VBucket record;
1351     record["id"] = std::to_string(0);
1352     record["assets"] = Assets();
1353     /**
1354      * @tc.steps:step1. UpsertData to empty table.
1355      * @tc.expected: step1. INVALID_ARGS.
1356      */
1357     EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
1358     /**
1359      * @tc.steps:step2. UpsertData to shared table.
1360      * @tc.expected: step2. INVALID_ARGS.
1361      */
1362     EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
1363     /**
1364      * @tc.steps:step3. UpsertData to not device table and shared table.
1365      * @tc.expected: step3. NOT_FOUND.
1366      */
1367     const char *createSQL =
1368         "CREATE TABLE IF NOT EXISTS testing(" \
1369         "id TEXT PRIMARY KEY," \
1370         "name TEXT," \
1371         "height REAL ," \
1372         "photo BLOB," \
1373         "asset ASSET," \
1374         "assets ASSETS," \
1375         "age INT);";
1376     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1377     EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
1378     /**
1379      * @tc.steps:step4. UpsertData to not exist table.
1380      * @tc.expected: step4. NOT_FOUND.
1381      */
1382     EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
1383 }
1384 
1385 /**
1386  * @tc.name: UpsertDataInvalid002
1387  * @tc.desc: Upsert device data
1388  * @tc.type: FUNC
1389  * @tc.require:
1390  * @tc.author: wangxiangdong
1391  */
1392 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level0)
1393 {
1394     VBucket record;
1395     record["id"] = std::to_string(0);
1396     record["assets"] = Assets();
1397     /**
1398      * @tc.steps:step1. create user table.
1399      * @tc.expected: step1. INVALID_ARGS.
1400      */
1401     const char *createSQL =
1402         "CREATE TABLE IF NOT EXISTS deviceTable(" \
1403         "id TEXT PRIMARY KEY," \
1404         "name TEXT," \
1405         "height REAL ," \
1406         "photo BLOB," \
1407         "asset ASSET," \
1408         "assets ASSETS," \
1409         "age INT);";
1410     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1411     /**
1412      * @tc.steps:step2. create device table.
1413      * @tc.expected: step2. OK.
1414      */
1415     RelationalStoreDelegate *delegate1 = nullptr;
1416     std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
1417     RelationalStoreDelegate::Option option;
1418     ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1419     ASSERT_NE(delegate1, nullptr);
1420     std::string deviceTableName = "deviceTable";
1421     ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
1422     DataBaseSchema dataBaseSchema;
1423     TableSchema tableSchema;
1424     tableSchema.name = deviceTableName;
1425     tableSchema.sharedTableName = deviceTableName + "_shared";
1426     tableSchema.fields = {
1427         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
1428         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
1429         {"age", TYPE_INDEX<int64_t>}
1430     };
1431     dataBaseSchema.tables.push_back(tableSchema);
1432     ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1433     /**
1434      * @tc.steps:step3. UpsertData to device table.
1435      * @tc.expected: step3. NOT_FOUND.
1436      */
1437     EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
1438     EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
1439     delegate1 = nullptr;
1440     mgr1 = nullptr;
1441 }
1442 
1443 /**
1444  * @tc.name: DownloadAssetStatusTest004
1445  * @tc.desc: Test upload asset status
1446  * @tc.type: FUNC
1447  * @tc.require:
1448  * @tc.author: zhangqiquan
1449  */
1450 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, DownloadAssetStatusTest004, TestSize.Level0)
1451 {
1452     /**
1453      * @tc.steps:step1. cloud assets {0, 1}
1454      * @tc.expected: step1. OK.
1455      */
1456     // cloud and local insert same data
1457     // cloud assets {0, 1} local assets {0, 1, 2}
1458     const int actualCount = 1;
1459     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_, 2);
1460     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms for data conflict
1461     /**
1462      * @tc.steps:step2. local assets {0, 1, 2}, and change assert {0}
1463      * @tc.expected: step2. OK.
1464      */
1465     std::string name = g_localAsset.name + std::to_string(0);
1466     Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
1467     expectAssets[0].hash.append("change"); // modify first asset
1468     InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
1469     /**
1470      * @tc.steps:step3. sync
1471      * @tc.expected: step3. upload status is {UPDATE, NORMAL, INSERT}
1472      */
1473     std::vector<AssetStatus> expectStatus = {
1474         AssetStatus::UPDATE, AssetStatus::NORMAL, AssetStatus::INSERT
1475     };
1476     // sync and local asset's status are normal
1477     Query query = Query::Select().FromTable({ tableName_ });
1478     RelationalTestUtils::CloudBlockSync(query, delegate_);
1479     auto dbSchema = GetSchema();
1480     ASSERT_GT(dbSchema.tables.size(), 0u);
1481     // cloud asset status is update normal insert
1482     VBucket extend;
1483     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
1484     std::vector<VBucket> data;
1485     ASSERT_EQ(virtualCloudDb_->Query(tableName_, extend, data), QUERY_END);
1486     ASSERT_EQ(data.size(), static_cast<size_t>(actualCount));
1487     Assets actualAssets;
1488     ASSERT_EQ(CloudStorageUtils::GetValueFromType(data[0]["assets"], actualAssets), E_OK);
1489     ASSERT_EQ(actualAssets.size(), expectStatus.size());
1490     for (size_t i = 0; i < actualAssets.size(); ++i) {
1491         EXPECT_EQ(actualAssets[i].status, expectStatus[i]);
1492     }
1493     /**
1494      * @tc.steps:step4. check local assets status.
1495      * @tc.expected: step4. all assets status is NORMAL.
1496      */
1497     auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1498     for (const auto &oneRow : assets) {
1499         for (const auto &asset : oneRow) {
1500             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1501         }
1502     }
1503 }
1504 
1505 /**
1506  * @tc.name: UploadAssetsTest001
1507  * @tc.desc: Test upload asset with error.
1508  * @tc.type: FUNC
1509  * @tc.require:
1510  * @tc.author: liaoyonghuang
1511  */
1512 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
1513 {
1514     /**
1515      * @tc.steps:step1. Insert 10 records.
1516      * @tc.expected: step1. ok.
1517      */
1518     const int actualCount = 10;
1519     InsertUserTableRecord(tableName_, 0, actualCount);
1520     /**
1521      * @tc.steps:step2. Set callback function to cause some upstream data to fail.
1522      * @tc.expected: step2. ok.
1523      */
1524     int recordIndex = 0;
1525     Asset tempAsset = {
1526             .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
1527             .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
1528     };
__anonde3e756b1102(const std::string &tableName, VBucket &extend) 1529     virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
1530         Asset asset;
1531         Assets assets;
1532         switch (recordIndex) {
1533             case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1534                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1535                 break;
1536             case 1: // record[1] is considered successful because it is a conflict.
1537                 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1538                     static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1539                 break;
1540             case 2: // record[2] fail because of empty gid.
1541                 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
1542                 break;
1543             case 3: // record[3] fail because of empty assetId.
1544                 asset = tempAsset;
1545                 asset.assetId = "";
1546                 extend[std::string(CloudDbConstant::ASSET)] = asset;
1547                 break;
1548             case 4: // record[4] fail because of empty assetId.
1549                 assets.push_back(tempAsset);
1550                 assets[0].assetId = "";
1551                 extend[std::string(CloudDbConstant::ASSETS)] = assets;
1552                 break;
1553             case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1554                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
1555                 break;
1556             default:
1557                 break;
1558         }
1559         recordIndex++;
1560     });
1561     /**
1562      * @tc.steps:step3. Sync and check upLoadInfo.
1563      * @tc.expected: step3. failCount is 5 and successCount is 5.
1564      */
1565     Query query = Query::Select().FromTable({ tableName_ });
1566     BlockSync(query, delegate_);
1567     for (const auto &table : lastProcess_.tableProcess) {
1568         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1569         EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
1570         EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
1571     }
1572     virtualCloudDb_->ForkUpload(nullptr);
1573 }
1574 
1575 /**
1576  * @tc.name: UploadAssetsTest002
1577  * @tc.desc: Test upload asset with error.
1578  * @tc.type: FUNC
1579  * @tc.require:
1580  * @tc.author: liaoyonghuang
1581  */
1582 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
1583 {
1584     /**
1585      * @tc.steps:step1. Insert 10 records.
1586      * @tc.expected: step1. ok.
1587      */
1588     const int actualCount = 10;
1589     InsertUserTableRecord(tableName_, 0, actualCount);
1590     Query query = Query::Select().FromTable({ tableName_ });
1591     BlockSync(query, delegate_);
1592     /**
1593      * @tc.steps:step2. Delete local data.
1594      * @tc.expected: step2. OK.
1595      */
1596     std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
1597     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1598     /**
1599      * @tc.steps:step3. Set callback function to cause some upstream data to fail.
1600      * @tc.expected: step3. ok.
1601      */
__anonde3e756b1202(const std::string &tableName, VBucket &extend) 1602     virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
1603         extend[std::string(CloudDbConstant::GID_FIELD)] = "";
1604     });
1605     BlockSync(query, delegate_);
1606     for (const auto &table : lastProcess_.tableProcess) {
1607         EXPECT_EQ(table.second.upLoadInfo.total, 5u);
1608         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1609         EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
1610     }
1611     virtualCloudDb_->ForkUpload(nullptr);
1612 }
1613 
1614 /**
1615  * @tc.name: UploadAssetsTest003
1616  * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
1617  * @tc.type: FUNC
1618  * @tc.require:
1619  * @tc.author: liaoyonghuang
1620  */
1621 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level0)
1622 {
1623     /**
1624      * @tc.steps:step1. Insert 100 records.
1625      * @tc.expected: step1. ok.
1626      */
1627     const int actualCount = 100;
1628     InsertUserTableRecord(tableName_, 0, actualCount);
1629     /**
1630      * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
1631      * @tc.expected: step2. ok.
1632      */
1633     int uploadCount = 0;
__anonde3e756b1302(const std::string &tableName, VBucket &extend) 1634     virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
1635         if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
1636             extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1637                 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1638         }
1639         uploadCount++;
1640     });
1641     Query query = Query::Select().FromTable({ tableName_ });
1642     BlockSync(query, delegate_);
1643     for (const auto &table : lastProcess_.tableProcess) {
1644         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
1645         EXPECT_EQ(table.second.upLoadInfo.total, 100u);
1646         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1647         EXPECT_EQ(table.second.upLoadInfo.successCount, 100u);
1648         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
1649     }
1650     virtualCloudDb_->ForkUpload(nullptr);
1651 }
1652 
1653 /**
1654  * @tc.name: UploadAssetsTest004
1655  * @tc.desc: Test batch delete return error CLOUD_RECORD_NOT_FOUND.
1656  * @tc.type: FUNC
1657  * @tc.require:
1658  * @tc.author: liaoyonghuang
1659  */
1660 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest004, TestSize.Level0)
1661 {
1662     /**
1663      * @tc.steps:step1. Insert 100 records and sync to cloud.
1664      * @tc.expected: step1. ok.
1665      */
1666     const int actualCount = 100;
1667     InsertUserTableRecord(tableName_, 0, actualCount);
1668     Query query = Query::Select().FromTable({ tableName_ });
1669     BlockSync(query, delegate_);
1670     /**
1671      * @tc.steps:step2. delete 50 records in local.
1672      * @tc.expected: step2. ok.
1673      */
1674     std::string sql = "delete from " + tableName_ + " where CAST(id AS INTEGER) >= " + std::to_string(actualCount / 2);
1675     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1676     /**
1677      * @tc.steps:step3. set return error CLOUD_RECORD_NOT_FOUND in batch delete.
1678      * @tc.expected: step3. ok.
1679      */
1680     int index = 0;
__anonde3e756b1402(const std::string &tableName, VBucket &extend) 1681     virtualCloudDb_->ForkUpload([&index](const std::string &tableName, VBucket &extend) {
1682         if (extend.count(CloudDbConstant::DELETE_FIELD) != 0 && index % 2 == 0 &&
1683             std::get<bool>(extend.at(CloudDbConstant::DELETE_FIELD))) {
1684             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1685         }
1686         index++;
1687     });
1688     /**
1689      * @tc.steps:step4. sync and check result.
1690      * @tc.expected: step4. ok.
1691      */
1692     BlockSync(query, delegate_);
1693     for (const auto &table : lastProcess_.tableProcess) {
1694         EXPECT_EQ(table.second.upLoadInfo.total, 50u);
1695         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1696         EXPECT_EQ(table.second.upLoadInfo.successCount, 50u);
1697     }
1698     virtualCloudDb_->ForkUpload(nullptr);
1699 }
1700 
1701 /**
1702  * @tc.name: BatchNormalDownloadAsset001
1703  * @tc.desc: Test batch download asset in two batch.
1704  * @tc.type: FUNC
1705  * @tc.require:
1706  * @tc.author: zqq
1707  */
1708 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchNormalDownloadAsset001, TestSize.Level0)
1709 {
1710     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1711     PrepareDataInCloud();
1712 
1713     Query query = Query::Select().FromTable({ tableName_ });
1714     BlockSync(query, delegate_);
1715     EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1716     EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u);   // remove 0 times
1717     virtualAssetLoader_->Reset();
1718     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1719 }
1720 
1721 /**
1722  * @tc.name: BatchAbnormalDownloadAsset001
1723  * @tc.desc: Test batch download asset failed.
1724  * @tc.type: FUNC
1725  * @tc.require:
1726  * @tc.author: zqq
1727  */
1728 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchAbnormalDownloadAsset001, TestSize.Level0)
1729 {
1730     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1731     PrepareDataInCloud();
__anonde3e756b1502(int rowIndex, std::map<std::string, Assets> &assets) 1732     virtualAssetLoader_->ForkBatchDownload([](int rowIndex, std::map<std::string, Assets> &assets) {
1733         if (rowIndex > 50) { // 50 record failed
1734             for (auto &asset : assets) {
1735                 for (auto &item : asset.second) {
1736                     item.status = AssetStatus::ABNORMAL;
1737                 }
1738             }
1739             return DB_ERROR;
1740         }
1741         return OK;
1742     });
1743 
1744     Query query = Query::Select().FromTable({ tableName_ });
1745     BlockSync(query, delegate_);
1746     EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 1u); // download 1 times
1747     EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u);   // remove 0 times
1748     virtualAssetLoader_->Reset();
1749 
1750     virtualAssetLoader_->ForkBatchDownload(nullptr);
1751     BlockSync(query, delegate_);
1752     EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1753     EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u);   // remove 0 times
1754     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1755 }
1756 }
1757 #endif
1758