• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "runtime_config.h"
28 #include "sqlite_relational_store.h"
29 #include "sqlite_relational_utils.h"
30 #include "time_helper.h"
31 #include "virtual_asset_loader.h"
32 #include "virtual_cloud_data_translate.h"
33 #include "virtual_cloud_db.h"
34 #include <gtest/gtest.h>
35 #include <iostream>
36 
37 using namespace testing::ext;
38 using namespace DistributedDB;
39 using namespace DistributedDBUnitTest;
40 using namespace std;
41 
42 namespace {
43 const string STORE_ID = "Relational_Store_SYNC";
44 const string DB_SUFFIX = ".db";
45 const string ASSETS_TABLE_NAME = "student";
46 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
47 const string NO_PRIMARY_TABLE = "teacher";
48 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
49 const string COMPOUND_PRIMARY_TABLE = "worker1";
50 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
51 const string DEVICE_CLOUD = "cloud_dev";
52 const string COL_ID = "id";
53 const string COL_NAME = "name";
54 const string COL_HEIGHT = "height";
55 const string COL_ASSET = "asset";
56 const string COL_ASSETS = "assets";
57 const string COL_AGE = "age";
58 const int64_t SYNC_WAIT_TIME = 600;
59 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
60 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
61     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
62     {COL_AGE, TYPE_INDEX<int64_t>}};
63 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
64     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
65     {COL_AGE, TYPE_INDEX<int64_t>}};
66 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
67     {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
68     {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
69 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
70     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
71     COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
72 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
73     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
74     " ASSETS," + COL_AGE + " INT);";
75 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
76     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
77     COL_AGE + " INT, PRIMARY KEY (id, age));";
78 const Asset ASSET_COPY = {.version = 1,
79     .name = "Phone",
80     .assetId = "0",
81     .subpath = "/local/sync",
82     .uri = "/local/sync",
83     .modifyTime = "123456",
84     .createTime = "",
85     .size = "256",
86     .hash = "ASE"};
87 const Asset ASSET_COPY2 = {.version = 1,
88     .name = "Phone_copy_2",
89     .assetId = "0",
90     .subpath = "/local/sync",
91     .uri = "/local/sync",
92     .modifyTime = "123456",
93     .createTime = "",
94     .size = "256",
95     .hash = "ASE"};
96 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
97 const std::string QUERY_CONSISTENT_SQL = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
98     " where flag&0x20=0;";
99 const std::string QUERY_COMPENSATED_SQL = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
100     " where flag&0x10!=0;";
101 
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116 
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128                                      .fields = CLOUD_FIELDS};
129     dataBaseSchema.tables.push_back(assetsTableSchema);
130     assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131                          .fields = NO_PRIMARY_FIELDS};
132     dataBaseSchema.tables.push_back(assetsTableSchema);
133     assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134                          .fields = COMPOUND_PRIMARY_FIELDS};
135     dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)138 void GenerateDataRecords(
139     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
140 {
141     for (int64_t i = begin; i < begin + count; i++) {
142         Assets assets;
143         Asset asset = ASSET_COPY;
144         asset.name = ASSET_COPY.name + std::to_string(i);
145         assets.emplace_back(asset);
146         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
147         assets.emplace_back(asset);
148         VBucket data;
149         data.insert_or_assign(COL_ID, i);
150         data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
151         data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
152         data.insert_or_assign(COL_ASSETS, assets);
153         data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
154         record.push_back(data);
155 
156         VBucket log;
157         Timestamp now = TimeHelper::GetSysCurrentTime();
158         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
159         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
161         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
162         extend.push_back(log);
163     }
164 }
165 
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)166 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
167 {
168     int errCode;
169     std::vector<VBucket> record;
170     std::vector<VBucket> extend;
171     GenerateDataRecords(begin, count, 0, record, extend);
172     const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
173     for (VBucket vBucket : record) {
174         sqlite3_stmt *stmt = nullptr;
175         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
176         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
177         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
178         ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
179             sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
180         if (isAssetNull) {
181             ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
182         } else {
183             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
184             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
185         }
186         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
187             std::get<Assets>(vBucket[COL_ASSETS]));
188         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
189         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
190         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
191         SQLiteUtils::ResetStatement(stmt, true, errCode);
192     }
193 }
194 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets)195 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets)
196 {
197     int errCode;
198     std::vector<uint8_t> assetBlob;
199     const string sql = "update " + tableName + " set assets=?;";
200     sqlite3_stmt *stmt = nullptr;
201     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
202     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
203     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
204     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
205     SQLiteUtils::ResetStatement(stmt, true, errCode);
206 }
207 
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)208 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
209 {
210     ASSERT_NE(db, nullptr);
211     for (int64_t i = begin; i < begin + count; i++) {
212         string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
213         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
214     }
215 }
216 
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)217 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
218 {
219     for (int64_t i = begin; i < begin + count; i++) {
220         VBucket idMap;
221         idMap.insert_or_assign("#_gid", std::to_string(i));
222         ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
223     }
224 }
225 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)226 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
227     const std::string &tableName)
228 {
229     std::this_thread::sleep_for(std::chrono::milliseconds(1));
230     std::vector<VBucket> record;
231     std::vector<VBucket> extend;
232     GenerateDataRecords(begin, count, gidStart, record, extend);
233     for (auto &entry: extend) {
234         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
235     }
236     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
237     std::this_thread::sleep_for(std::chrono::milliseconds(1));
238 }
239 
QueryStatusCallback(void * data,int count,char ** colValue,char ** colName)240 int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
241 {
242     auto status = static_cast<std::vector<int64_t> *>(data);
243     int base = 10;
244     for (int i = 0; i < count; i++) {
245         status->push_back(strtol(colValue[0], nullptr, base));
246     }
247     return 0;
248 }
249 
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)250 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
251 {
252     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
253     std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
254         " and data_key <=" +  std::to_string(endId) + ";";
255     std::vector<int64_t> status;
256     char *str = NULL;
257     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
258         SQLITE_OK);
259     ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
260 
261     for (auto stat : status) {
262         ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
263     }
264 }
265 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)266 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
267 {
268     std::vector<VBucket> record;
269     std::vector<VBucket> extend;
270     GenerateDataRecords(begin, count, gidStart, record, extend);
271     if (tableName == ASSETS_TABLE_NAME_SHARED) {
272         for (auto &vBucket: record) {
273             vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
274         }
275     }
276     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
277 }
278 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)279 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
280 {
281     std::unique_lock<std::mutex> lock(g_processMutex);
282     bool result = g_processCondition.wait_for(
283         lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
284     ASSERT_EQ(result, true);
285     LOGD("-------------------sync end--------------");
286 }
287 
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK)288 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK)
289 {
290     g_syncProcess = {};
291     Query query = Query::Select().FromTable(tableNames);
292     std::vector<SyncProcess> expectProcess;
293     CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
294         ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
295         g_syncProcess = std::move(process.begin()->second);
296         if (g_syncProcess.process == FINISHED) {
297             g_processCondition.notify_one();
298             ASSERT_EQ(g_syncProcess.errCode, errCode);
299         }
300     };
301     CloudSyncOption option;
302     option.devices = {DEVICE_CLOUD};
303     option.mode = mode;
304     option.query = query;
305     option.waitTime = SYNC_WAIT_TIME;
306     option.lockAction = static_cast<LockAction>(0xff); // lock all
307     ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
308 
309     if (dbStatus == DBStatus::OK) {
310         WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
311     }
312 }
313 
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)314 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
315 {
316     for (auto &item : assets) {
317         for (auto &asset : item.second) {
318             EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
319             if (index < 4) { // 1-4 is inserted
320                 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
321             }
322             LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
323                 index);
324         }
325     }
326 }
327 
CheckDownloadFailedForTest002(sqlite3 * & db)328 void CheckDownloadFailedForTest002(sqlite3 *&db)
329 {
330     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
331     sqlite3_stmt *stmt = nullptr;
332     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
333     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
334         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
335         Type cloudValue;
336         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
337         std::vector<uint8_t> assetsBlob;
338         Assets assets;
339         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
340         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
341         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
342         for (size_t i = 0; i < assets.size(); ++i) {
343             EXPECT_EQ(assets[i].hash, "");
344             EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
345         }
346     }
347     int errCode;
348     SQLiteUtils::ResetStatement(stmt, true, errCode);
349 }
350 
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)351 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
352 {
353     Assets assets;
354     Asset asset = ASSET_COPY;
355     asset.name = ASSET_COPY.name + std::to_string(id);
356     asset.status = status;
357     assets.emplace_back(asset);
358     asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
359     assets.emplace_back(asset);
360     int errCode;
361     std::vector<uint8_t> assetBlob;
362     const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
363     sqlite3_stmt *stmt = nullptr;
364     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
365     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
366     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
367     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
368     SQLiteUtils::ResetStatement(stmt, true, errCode);
369 }
370 
CheckConsistentCount(sqlite3 * db,int64_t expectCount)371 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
372 {
373     EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
374         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
375 }
376 
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)377 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
378 {
379     EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
380         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
381 }
382 
CloseDb()383 void CloseDb()
384 {
385     delete g_observer;
386     g_virtualCloudDb = nullptr;
387     if (g_delegate != nullptr) {
388         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
389         g_delegate = nullptr;
390     }
391 }
392 
393 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
394 public:
395     static void SetUpTestCase(void);
396     static void TearDownTestCase(void);
397     void SetUp();
398     void TearDown();
399 
400 protected:
401     void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
402         const std::set<int> &failIndex);
403     void CheckLocalAssetIsEmpty(const std::string &tableName);
404     void CheckCursorData(const std::string &tableName, int begin);
405     void WaitForSync(int &syncCount);
406     const RelationalSyncAbleStorage *GetRelationalStore();
407     void InitDataStatusTest(bool needDownload);
408     void DataStatusTest001(bool needDownload);
409     void DataStatusTest003();
410     void DataStatusTest004();
411     void DataStatusTest005();
412     void DataStatusTest006();
413     void DataStatusTest007();
414     sqlite3 *db = nullptr;
415 };
416 
SetUpTestCase(void)417 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
418 {
419     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
420     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
421     LOGI("The test db is:%s", g_storePath.c_str());
422     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
423     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
424 }
425 
TearDownTestCase(void)426 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
427 
SetUp(void)428 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
429 {
430     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
431         LOGE("rm test db files error.");
432     }
433     DistributedDBToolsUnitTest::PrintTestCaseInfo();
434     LOGD("Test dir is %s", g_testDir.c_str());
435     db = RelationalTestUtils::CreateDataBase(g_storePath);
436     ASSERT_NE(db, nullptr);
437     InitDatabase(db);
438     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
439     ASSERT_NE(g_observer, nullptr);
440     ASSERT_EQ(
441         g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
442         DBStatus::OK);
443     ASSERT_NE(g_delegate, nullptr);
444     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
445     ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
446     ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
447     g_virtualCloudDb = make_shared<VirtualCloudDb>();
448     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
449     g_syncProcess = {};
450     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
451     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
452     DataBaseSchema dataBaseSchema;
453     GetCloudDbSchema(dataBaseSchema);
454     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
455     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
456     ASSERT_NE(g_cloudStoreHook, nullptr);
457 }
458 
TearDown(void)459 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
460 {
461     RefObject::DecObjRef(g_store);
462     g_virtualCloudDb->ForkUpload(nullptr);
463     CloseDb();
464     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
465     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
466         LOGE("rm test db files error.");
467     }
468 }
469 
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)470 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
471     const std::string &expectAssetId, const std::set<int> &failIndex)
472 {
473     std::string sql = "SELECT assets FROM " + tableName + ";";
474     sqlite3_stmt *stmt = nullptr;
475     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
476     int index = 0;
477     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
478         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
479         Type cloudValue;
480         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
481         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
482         for (const auto &asset : assets) {
483             index++;
484             if (failIndex.find(index) != failIndex.end()) {
485                 EXPECT_EQ(asset.assetId, "0");
486             } else {
487                 EXPECT_EQ(asset.assetId, expectAssetId);
488             }
489         }
490     }
491     int errCode = E_OK;
492     SQLiteUtils::ResetStatement(stmt, true, errCode);
493 }
494 
CheckLocalAssetIsEmpty(const std::string & tableName)495 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
496 {
497     std::string sql = "SELECT asset FROM " + tableName + ";";
498     sqlite3_stmt *stmt = nullptr;
499     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
500     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
501         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
502     }
503     int errCode = E_OK;
504     SQLiteUtils::ResetStatement(stmt, true, errCode);
505 }
506 
CheckCursorData(const std::string & tableName,int begin)507 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
508 {
509     std::string logTableName = DBCommon::GetLogTableName(tableName);
510     std::string sql = "SELECT cursor FROM " + logTableName + ";";
511     sqlite3_stmt *stmt = nullptr;
512     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
513     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
514         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
515         Type cloudValue;
516         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
517         EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
518         begin++;
519     }
520     int errCode = E_OK;
521     SQLiteUtils::ResetStatement(stmt, true, errCode);
522 }
523 
WaitForSync(int & syncCount)524 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
525 {
526     std::unique_lock<std::mutex> lock(g_processMutex);
527     bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
528         [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
529     ASSERT_EQ(result, true);
530 }
531 
GetRelationalStore()532 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
533 {
534     RelationalDBProperties properties;
535     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
536     int errCode = E_OK;
537     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
538     if (g_store == nullptr) {
539         return nullptr;
540     }
541     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
542 }
543 
InitDataStatusTest(bool needDownload)544 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
545 {
546     int cloudCount = 20;
547     int localCount = 10;
548     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
549     if (needDownload) {
550         UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
551     }
552     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
553     std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
554     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
555     sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
556     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
557     sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
558     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
559     std::this_thread::sleep_for(std::chrono::milliseconds(1));
560     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
561     std::this_thread::sleep_for(std::chrono::milliseconds(1));
562     sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
563     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
564     sql = "update " + logName + " SET status = 1 where data_key in (4);";
565     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
566 }
567 
DataStatusTest001(bool needDownload)568 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
569 {
570     int cloudCount = 20;
571     int count = 0;
572     g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
573         count++;
574         if (count == 1) {
575             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
576                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
577             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
578         }
579         if (count == 2) { // 2 is compensated sync
580             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
581                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
582             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
583             g_processCondition.notify_one();
584         }
585     });
586     InitDataStatusTest(needDownload);
587     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
588     WaitForSync(count);
589 }
590 
DataStatusTest003()591 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
592 {
593     int count = 0;
594     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
595         count++;
596         if (count == 1) {
597             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
598                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
599             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
600         }
601         if (count == 2) { // 2 is compensated sync
602             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
603                 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
604             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
605             g_processCondition.notify_one();
606         }
607     });
608     int downLoadCount = 0;
609     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
610         downLoadCount++;
611         if (downLoadCount == 1) {
612             std::vector<std::vector<uint8_t>> hashKey;
613             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
614             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
615         }
616     });
617     InitDataStatusTest(true);
618     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
619     WaitForSync(count);
620 }
621 
DataStatusTest004()622 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
623 {
624     int count = 0;
625     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
626         count++;
627         if (count == 1) {
628             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
629                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
630             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
631         }
632         if (count == 2) { // 2 is compensated sync
633             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
634                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
635             CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
636             g_processCondition.notify_one();
637         }
638     });
639     int downLoadCount = 0;
640     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
641         downLoadCount++;
642         if (downLoadCount == 1) {
643             std::vector<std::vector<uint8_t>> hashKey;
644             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
645             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
646             std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
647             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
648         }
649     });
650     int queryIdx = 0;
651     g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
652         LOGD("query index:%d", ++queryIdx);
653         if (queryIdx == 4) { // 4 is compensated sync
654             std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
655                 " SET status = 1 where data_key=15;";
656             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
657         }
658     });
659     InitDataStatusTest(true);
660     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
661     WaitForSync(count);
662 }
663 
DataStatusTest005()664 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
665 {
666     int count = 0;
667     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
668         count++;
669         if (count == 1) {
670             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
671                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
672             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
673         }
674         if (count == 2) { // 2 is compensated sync
675             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
676                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
677             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
678             g_processCondition.notify_one();
679         }
680     });
681     int downLoadCount = 0;
682     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
683         downLoadCount++;
684         if (downLoadCount == 1) {
685             std::vector<std::vector<uint8_t>> hashKey;
686             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
687             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
688             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
689             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
690         }
691     });
692     InitDataStatusTest(true);
693     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
694     WaitForSync(count);
695 }
696 
DataStatusTest006()697 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
698 {
699     int count = 0;
700     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
701         count++;
702         if (count == 1) {
703             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
704                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
705                 "(status = 0 and data_key in (11))";
706             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
707         }
708         if (count == 2) { // 2 is compensated sync
709             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
710                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
711             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
712             g_processCondition.notify_one();
713         }
714     });
715     int downLoadCount = 0;
716     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
717         downLoadCount++;
718         if (downLoadCount == 1) {
719             std::vector<std::vector<uint8_t>> hashKey;
720             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
721             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
722             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
723             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
724             EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
725         }
726     });
727     InitDataStatusTest(true);
728     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
729     WaitForSync(count);
730 }
731 
DataStatusTest007()732 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
733 {
734     int count = 0;
735     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
736         count++;
737         if (count == 1) {
738             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
739                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
740             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
741         }
742         if (count == 2) { // 2 is compensated sync
743             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
744                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
745             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
746             g_processCondition.notify_one();
747         }
748     });
749     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
750     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
751     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
752         .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
753             std::map<std::string, Assets> &assets) {
754             return CLOUD_ERROR;
755         });
756     InitDataStatusTest(true);
757     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
758     WaitForSync(count);
759 }
760 
761 /*
762  * @tc.name: DownloadAssetForDupDataTest001
763  * @tc.desc: Test the download interface call with duplicate data for the same primary key.
764  * @tc.type: FUNC
765  * @tc.require:
766  * @tc.author: liufuchenxing
767  */
768 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level0)
769 {
770     /**
771      * @tc.steps:step1. Mock asset download interface.
772      * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
773      */
774     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
775     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
776     int index = 1;
777     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
778         .Times(2)
779         .WillRepeatedly(
__anon947dd0281102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 780             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
781                 LOGD("Download GID:%s", gid.c_str());
782                 CheckDownloadForTest001(index, assets);
783                 index++;
784                 return DBStatus::OK;
785             });
786 
787     /**
788      * @tc.steps:step2. Insert local data [0, 10), sync data
789      * @tc.expected: step2. sync success.
790      */
791     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
792     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
793 
794     /**
795      * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
796      * @tc.expected: step3. sync success.
797      */
798     DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
799     InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
800     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
801 }
802 
803 /**
804  * @tc.name: FillAssetId001
805  * @tc.desc: Test if assetId is filled in single primary key table
806  * @tc.type: FUNC
807  * @tc.require:
808  * @tc.author: chenchaohao
809  */
810 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level0)
811 {
812     /**
813      * @tc.steps:step1. local insert assets and sync, check the local assetId.
814      * @tc.expected: step1. return OK.
815      */
816     int localCount = 50;
817     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
818     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
819     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
820 
821     /**
822      * @tc.steps:step2. local update assets and sync ,check the local assetId.
823      * @tc.expected: step2. sync success.
824      */
825     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
826     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
827     CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
828     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
829 }
830 
831 /**
832  * @tc.name: FillAssetId002
833  * @tc.desc: Test if assetId is filled in no primary key table
834  * @tc.type: FUNC
835  * @tc.require:
836  * @tc.author: chenchaohao
837  */
838 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level0)
839 {
840     /**
841      * @tc.steps:step1. local insert assets and sync, check the local assetId.
842      * @tc.expected: step1. return OK.
843      */
844     int localCount = 50;
845     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
846     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
847     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
848 
849     /**
850      * @tc.steps:step2. local update assets and sync ,check the local assetId.
851      * @tc.expected: step2. sync success.
852      */
853     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
854     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
855     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
856 }
857 
858 /**
859  * @tc.name: FillAssetId003
860  * @tc.desc: Test if assetId is filled in compound primary key table
861  * @tc.type: FUNC
862  * @tc.require:
863  * @tc.author: chenchaohao
864  */
865 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level0)
866 {
867     /**
868      * @tc.steps:step1. local insert assets and sync, check the local assetId.
869      * @tc.expected: step1. return OK.
870      */
871     int localCount = 50;
872     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
873     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
874     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
875 
876     /**
877      * @tc.steps:step2. local update assets and sync ,check the local assetId.
878      * @tc.expected: step2. sync success.
879      */
880     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
881     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
882     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
883 }
884 
885 /**
886  * @tc.name: FillAssetId004
887  * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
888  * @tc.type: FUNC
889  * @tc.require:
890  * @tc.author: chenchaohao
891  */
892 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level0)
893 {
894     /**
895      * @tc.steps:step1. local insert assets and sync, check the local assetId.
896      * @tc.expected: step1. return OK.
897      */
898     int localCount = 50;
899     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
900     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
901     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
902 
903     /**
904      * @tc.steps:step2. local update assets and sync ,check the local assetId.
905      * @tc.expected: step2. sync success.
906      */
907     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
908     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
909     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
910 }
911 
912 /**
913  * @tc.name: FillAssetId001
914  * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
915  * @tc.type: FUNC
916  * @tc.require:
917  * @tc.author: chenchaohao
918  */
919 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level0)
920 {
921     /**
922      * @tc.steps:step1. local insert assets and sync, check the local assetId.
923      * @tc.expected: step1. return OK.
924      */
925     int localCount = 50;
926     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
927     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
928     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
929 
930     /**
931      * @tc.steps:step2. local update assets and sync ,check the local assetId.
932      * @tc.expected: step2. sync success.
933      */
934     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
935     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
936     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
937 }
938 
939 /**
940  * @tc.name: FillAssetId006
941  * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
942  * @tc.type: FUNC
943  * @tc.require:
944  * @tc.author: chenchaohao
945  */
946 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level0)
947 {
948     /**
949      * @tc.steps:step1. local insert assets and sync, check the local assetId.
950      * @tc.expected: step1. return OK.
951      */
952     int localCount = 50;
953     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
954     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
955     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
956 
957     /**
958      * @tc.steps:step2. local update assets and sync ,check the local assetId.
959      * @tc.expected: step2. sync success.
960      */
961     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
962     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
963     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
964 }
965 
966 /**
967  * @tc.name: FillAssetId007
968  * @tc.desc: Test if assetId is filled when extend lack of assets
969  * @tc.type: FUNC
970  * @tc.require:
971  * @tc.author: chenchaohao
972  */
973 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level0)
974 {
975     CloudSyncConfig config;
976     config.maxUploadCount = 200; // max upload 200
977     g_delegate->SetCloudSyncConfig(config);
978     /**
979      * @tc.steps:step1. local insert assets and sync, check the local assetId.
980      * @tc.expected: step1. return OK.
981      */
982     int localCount = 50;
983     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon947dd0281202(const std::string &tableName, VBucket &extend) 984     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
985         extend.erase("assets");
986     });
987     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
988     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
989 
990     /**
991      * @tc.steps:step2. local update assets and sync ,check the local assetId.
992      * @tc.expected: step2. sync success.
993      */
994     int addLocalCount = 10;
995     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon947dd0281302(const std::string &tableName, VBucket &extend) 996     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
997         if (extend.find("assets") != extend.end()) {
998             for (auto &asset : std::get<Assets>(extend["assets"])) {
999                 asset.name = "pad";
1000             }
1001         }
1002     });
1003     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1004     int beginFailFillNum = 101;
1005     int endFailFillNum = 120;
1006     std::set<int> index;
1007     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1008         index.insert(i);
1009     }
1010     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1011 
1012     /**
1013      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1014      * @tc.expected: step2. sync success.
1015      */
1016     g_virtualCloudDb->ForkUpload(nullptr);
1017     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1018     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1019 }
1020 
1021 /**
1022  * @tc.name: FillAssetId008
1023  * @tc.desc: Test if assetId is filled when extend lack of assetId
1024  * @tc.type: FUNC
1025  * @tc.require:
1026  * @tc.author: chenchaohao
1027  */
1028 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level0)
1029 {
1030     /**
1031      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1032      * @tc.expected: step1. return OK.
1033      */
1034     int localCount = 50;
1035     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon947dd0281402(const std::string &tableName, VBucket &extend) 1036     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1037         if (extend.find("assets") != extend.end()) {
1038             for (auto &asset : std::get<Assets>(extend["assets"])) {
1039                 asset.assetId = "";
1040             }
1041         }
1042     });
1043     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1044     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1045 
1046     /**
1047      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1048      * @tc.expected: step2. sync success.
1049      */
1050     g_virtualCloudDb->ForkUpload(nullptr);
1051     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1052     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1053 }
1054 
1055 /**
1056  * @tc.name: FillAssetId009
1057  * @tc.desc: Test if assetId is filled when extend exists useless assets
1058  * @tc.type: FUNC
1059  * @tc.require:
1060  * @tc.author: chenchaohao
1061  */
1062 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level0)
1063 {
1064     /**
1065      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1066      * @tc.expected: step1. return OK.
1067      */
1068     int localCount = 50;
1069     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon947dd0281502(const std::string &tableName, VBucket &extend) 1070     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1071         if (extend.find("assets") != extend.end()) {
1072             Asset asset = ASSET_COPY2;
1073             Assets &assets = std::get<Assets>(extend["assets"]);
1074             assets.push_back(asset);
1075         }
1076     });
1077     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1078     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1079 }
1080 
1081 /**
1082  * @tc.name: FillAssetId010
1083  * @tc.desc: Test if assetId is filled when some success and some fail
1084  * @tc.type: FUNC
1085  * @tc.require:
1086  * @tc.author: chenchaohao
1087  */
1088 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level0)
1089 {
1090     /**
1091      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1092      * @tc.expected: step1. return OK.
1093      */
1094     int localCount = 30;
1095     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1096     g_virtualCloudDb->SetInsertFailed(1);
1097     std::atomic<int> count = 0;
__anon947dd0281602(const std::string &tableName, VBucket &extend) 1098     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1099         if (extend.find("assets") != extend.end() && count == 0) {
1100             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1101             count++;
1102         }
1103     });
1104     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1105     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1106 }
1107 
1108 /**
1109  * @tc.name: FillAssetId011
1110  * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1111  * @tc.type: FUNC
1112  * @tc.require:
1113  * @tc.author: chenchaohao
1114  */
1115 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level0)
1116 {
1117     /**
1118      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1119      * @tc.expected: step1. return OK.
1120      */
1121     int localCount = 50;
1122     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1123     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1124     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1125 
1126     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1127     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1128     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1129     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1130 }
1131 
1132 /**
1133  * @tc.name: FillAssetId012
1134  * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1135  * @tc.type: FUNC
1136  * @tc.require:
1137  * @tc.author: chenchaohao
1138  */
1139 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level0)
1140 {
1141     /**
1142      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1143      * @tc.expected: step1. return OK.
1144      */
1145     int localCount = 50;
1146     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1147     std::atomic<int> count = 1;
1148     g_virtualCloudDb->SetClearExtend(count);
1149     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1150     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1151 
1152     /**
1153      * @tc.steps:step2. set extend size normal then sync, check the asseid.
1154      * @tc.expected: step2. return OK.
1155      */
1156     g_virtualCloudDb->SetClearExtend(0);
1157     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1158     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1159 
1160     /**
1161      * @tc.steps:step3. set extend size large then sync, check the asseid.
1162      * @tc.expected: step3. return OK.
1163      */
1164     count = -1; // -1 means extend push a empty vBucket
1165     g_virtualCloudDb->SetClearExtend(count);
1166     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1167     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1168 }
1169 
1170 /**
1171  * @tc.name: FillAssetId013
1172  * @tc.desc: Test fill assetId and removedevicedata when data is delete
1173  * @tc.type: FUNC
1174  * @tc.require:
1175  * @tc.author: chenchaohao
1176  */
1177 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level0)
1178 {
1179     /**
1180      * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1181      * @tc.expected: step1. return OK.
1182      */
1183     int localCount = 20;
1184     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1185     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1186     int deleteLocalCount = 10;
1187     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1188     int addLocalCount = 30;
1189     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1190     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1191 
1192     /**
1193      * @tc.steps:step2. RemoveDeviceData.
1194      * @tc.expected: step2. return OK.
1195      */
1196     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1197     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1198 }
1199 
1200 /**
1201  * @tc.name: FillAssetId014
1202  * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1203  * @tc.type: FUNC
1204  * @tc.require:
1205  * @tc.author: bty
1206  */
1207 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level0)
1208 {
1209     /**
1210      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1211      * @tc.expected: step1. return OK.
1212      */
1213     int localCount = 50;
1214     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1215     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1216     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1217 
1218     /**
1219      * @tc.steps:step2. RemoveDeviceData
1220      * @tc.expected: step2. return OK.
1221      */
1222     Assets assets;
1223     std::vector<AssetStatus> statusVec = {
1224         AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1225         AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1226     };
1227     for (auto &status : statusVec) {
1228         Asset temp = ASSET_COPY;
1229         temp.name += std::to_string(status);
1230         temp.status = status | AssetStatus::UPLOADING;
1231         assets.emplace_back(temp);
1232     }
1233     UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1234     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1235     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1236 
1237     /**
1238      * @tc.steps:step3. check status
1239      * @tc.expected: step3. return OK.
1240      */
1241     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1242     sqlite3_stmt *stmt = nullptr;
1243     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1244     int index = 0;
1245     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1246         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1247         Type cloudValue;
1248         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1249         Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1250         for (const auto &ast : newAssets) {
1251             EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1252         }
1253     }
1254     int errCode = E_OK;
1255     SQLiteUtils::ResetStatement(stmt, true, errCode);
1256 }
1257 
1258 /**
1259  * @tc.name: FillAssetId015
1260  * @tc.desc: Test if fill assetId when upload return cloud network error
1261  * @tc.type: FUNC
1262  * @tc.require:
1263  * @tc.author: chenchaohao
1264  */
1265 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level0)
1266 {
1267     /**
1268      * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1269      * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1270      */
1271     int localCount = 20;
1272     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1273     g_virtualCloudDb->SetCloudNetworkError(true);
1274     std::atomic<int> count = 0;
__anon947dd0281702(const std::string &tableName, VBucket &extend) 1275     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1276         if (extend.find("assets") != extend.end() && count == 0) {
1277             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1278             count++;
1279         }
1280     });
1281     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1282     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1283     g_virtualCloudDb->SetCloudNetworkError(false);
1284     g_virtualCloudDb->ForkUpload(nullptr);
1285     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1286     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1287 
1288     /**
1289      * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1290      * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1291      */
1292     int addLocalCount = 10;
1293     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1294     std::atomic<int> num = 0;
__anon947dd0281802(const std::string &tableName, VBucket &extend) 1295     g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1296         if (extend.find("assets") != extend.end() && num == 0) {
1297             for (auto &asset : std::get<Assets>(extend["assets"])) {
1298                 asset.name = "pad";
1299                 break;
1300             }
1301             num++;
1302         }
1303     });
1304     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1305     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1306 }
1307 
1308 /**
1309  * @tc.name: FillAssetId016
1310  * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1311  * @tc.type: FUNC
1312  * @tc.require:
1313  * @tc.author: chenchaohao
1314  */
1315 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level0)
1316 {
1317     /**
1318      * @tc.steps:step1. local insert data and sync, then delete last local data
1319      * @tc.expected: step1. return OK.
1320      */
1321     int localCount = 20;
1322     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1323     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1324     int deletLocalCount = 10;
1325     DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1326     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1327 
1328     /**
1329      * @tc.steps:step2. RemoveDeviceData.
1330      * @tc.expected: step2. return OK.
1331      */
1332     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1333     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1334 }
1335 
1336 /**
1337  * @tc.name: FillAssetId017
1338  * @tc.desc: Test cursor when download not change
1339  * @tc.type: FUNC
1340  * @tc.require:
1341  * @tc.author: chenchaohao
1342  */
1343 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level0)
1344 {
1345     /**
1346      * @tc.steps:step1. local insert data and sync,check cursor.
1347      * @tc.expected: step1. return OK.
1348      */
1349     int localCount = 20;
1350     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1351     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1352     CheckCursorData(ASSETS_TABLE_NAME, 1);
1353 
1354     /**
1355      * @tc.steps:step2. sync again and optype is not change, check cursor.
1356      * @tc.expected: step2. return OK.
1357      */
1358     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1359     CheckCursorData(ASSETS_TABLE_NAME, localCount + 1);
1360 }
1361 
1362 /**
1363  * @tc.name: FillAssetId018
1364  * @tc.desc: Test if assetId is filled when contains "#_error"
1365  * @tc.type: FUNC
1366  * @tc.require:
1367  * @tc.author: zhaoliang
1368  */
1369 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level0)
1370 {
1371     /**
1372      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1373      * @tc.expected: step1. return OK.
1374      */
1375     int localCount = 30;
1376     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1377     std::atomic<int> count = 0;
__anon947dd0281902(const std::string &tableName, VBucket &extend) 1378     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1379         if (extend.find("assets") != extend.end() && count == 0) {
1380             extend["#_error"] = std::string("test");
1381             count++;
1382         }
1383     });
1384     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1385     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1386 }
1387 
1388 /**
1389  * @tc.name: DownloadAssetForDupDataTest002
1390  * @tc.desc: Test download failed
1391  * @tc.type: FUNC
1392  * @tc.require:
1393  * @tc.author: bty
1394  */
1395 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level0)
1396 {
1397     /**
1398      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1399      * @tc.expected: step1. return OK
1400      */
1401     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1402     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1403     int index = 0;
1404     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1405         .WillRepeatedly(
__anon947dd0281a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1406             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1407                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1408                 return DBStatus::CLOUD_ERROR;
1409             });
1410 
1411     /**
1412      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1413      * @tc.expected: step2. sync success.
1414      */
1415     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1416     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1417 
1418     /**
1419      * @tc.steps:step3. check if the hash of assets in db is empty
1420      * @tc.expected: step3. OK
1421      */
1422     CheckDownloadFailedForTest002(db);
1423 }
1424 
1425 /**
1426  * @tc.name: DownloadAssetForDupDataTest003
1427  * @tc.desc: Test download failed and flag was modified
1428  * @tc.type: FUNC
1429  * @tc.require:
1430  * @tc.author: bty
1431  */
1432 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level0)
1433 {
1434     /**
1435      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1436      * @tc.expected: step1. return OK
1437      */
1438     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1439     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1440     int index = 0;
1441     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1442         .WillRepeatedly(
__anon947dd0281b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1443             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1444                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1445                 for (auto &item : assets) {
1446                     for (auto &asset : item.second) {
1447                         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1448                     }
1449                 }
1450                 return DBStatus::CLOUD_ERROR;
1451             });
1452 
1453     /**
1454      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1455      * @tc.expected: step2. sync success.
1456      */
1457     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1458     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1459 
1460     /**
1461      * @tc.steps:step3. check if the hash of assets in db is empty
1462      * @tc.expected: step3. OK
1463      */
1464     CheckDownloadFailedForTest002(db);
1465 }
1466 
1467 /**
1468  * @tc.name: DownloadAssetForDupDataTest004
1469  * @tc.desc: test sync with deleted assets
1470  * @tc.type: FUNC
1471  * @tc.require:
1472  * @tc.author: bty
1473  */
1474 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level0)
1475 {
1476     /**
1477      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1478      * @tc.expected: step1. return OK
1479      */
1480     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1481     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1482     int index = 0;
1483     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1484         .WillRepeatedly(
__anon947dd0281c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1485             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1486                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1487                 return DBStatus::OK;
1488             });
1489 
1490     /**
1491      * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1492      * @tc.expected: step2. return OK
1493      */
1494     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1495     UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1496     UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1497     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1498 
1499     /**
1500      * @tc.steps:step3. sync, check download num
1501      * @tc.expected: step3. return OK
1502      */
1503     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1504     EXPECT_GE(index, 2); // 2 is download num
1505 }
1506 
1507 /**
1508  * @tc.name: DownloadAssetForDupDataTest005
1509  * @tc.desc: test DOWNLOADING status of asset after uploading
1510  * @tc.type: FUNC
1511  * @tc.require:
1512  * @tc.author: bty
1513  */
1514 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level0)
1515 {
1516     /**
1517      * @tc.steps:step1. init data and sync
1518      * @tc.expected: step1. return OK
1519      */
1520     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1521     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1522     UpdateAssetsForLocal(db, 6,  AssetStatus::DOWNLOADING); // 6 is id
1523     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1524 
1525     /**
1526      * @tc.steps:step2. check asset status
1527      * @tc.expected: step2. return OK
1528      */
1529     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1530     sqlite3_stmt *stmt = nullptr;
1531     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1532     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1533         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1534         Type cloudValue;
1535         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1536         std::vector<uint8_t> assetsBlob;
1537         Assets assets;
1538         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1539         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1540         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1541         for (size_t i = 0; i < assets.size(); ++i) {
1542             EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1543             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1544         }
1545     }
1546     int errCode;
1547     SQLiteUtils::ResetStatement(stmt, true, errCode);
1548 }
1549 
1550 /**
1551  * @tc.name: FillAssetId019
1552  * @tc.desc: Test the stability of cleaning asset id
1553  * @tc.type: FUNC
1554  * @tc.require:
1555  * @tc.author: bty
1556  */
1557 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level0)
1558 {
1559     /**
1560      * @tc.steps:step1. local insert assets and sync.
1561      * @tc.expected: step1. return OK.
1562      */
1563     int localCount = 20;
1564     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1565     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1566 
1567     /**
1568      * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1569      * @tc.expected: step2. return OK.
1570      */
1571     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1572         + " set data_key='999' where data_key>'10';";
1573     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1574     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1575 }
1576 
1577 /**
1578  * @tc.name: FillAssetId020
1579  * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1580  * @tc.type: FUNC
1581  * @tc.require:
1582  * @tc.author: zhangtao
1583  */
1584 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level0)
1585 {
1586     CloudSyncConfig config;
1587     config.maxUploadCount = 200; // max upload 200
1588     g_delegate->SetCloudSyncConfig(config);
1589 
1590     /**
1591      * @tc.steps:step1. local insert assets and erase assets extends
1592      * @tc.expected: step1. return OK.
1593      */
1594     int localCount = 50;
1595     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon947dd0281d02(const std::string &tableName, VBucket &extend) 1596     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1597         extend.erase("assets");
1598     });
1599     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1600     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1601 
1602     /**
1603      * @tc.steps:step2. local insert assets and modify assetId to empty
1604      * @tc.expected: step2. return OK.
1605      */
1606     int addLocalCount = 10;
1607     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon947dd0281e02(const std::string &tableName, VBucket &extend) 1608     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1609         if (extend.find("assets") != extend.end()) {
1610             for (auto &asset : std::get<Assets>(extend["assets"])) {
1611                 asset.assetId = "";
1612             }
1613         }
1614     });
1615     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1616     int beginFailFillNum = 101;
1617     int endFailFillNum = 120;
1618     std::set<int> index;
1619     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1620         index.insert(i);
1621     }
1622     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1623 
1624     /**
1625      * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1626      * @tc.expected: step3. return OK.
1627      */
1628     InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon947dd0281f02(const std::string &tableName, VBucket &extend) 1629     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1630         if (extend.find("assets") != extend.end()) {
1631             for (auto &asset : std::get<Assets>(extend["assets"])) {
1632                 asset.name = "mod_pat";
1633             }
1634         }
1635     });
1636     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1637     beginFailFillNum = 121;
1638     endFailFillNum = 140;
1639     std::set<int> newIndex;
1640     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1641         newIndex.insert(i);
1642     }
1643     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1644 
1645     /**
1646      * @tc.steps:step4. local update assets and sync, check the local assetId.
1647      * @tc.expected: step4. sync success.
1648      */
1649     g_virtualCloudDb->ForkUpload(nullptr);
1650     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1651     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1652 }
1653 
1654 /**
1655  * @tc.name: FillAssetId021
1656  * @tc.desc: Test if local assets missing, one records's assets missing will not mark the whole sync progress failure
1657  * @tc.type: FUNC
1658  * @tc.require:
1659  * @tc.author: zhangtao
1660  */
1661 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level0)
1662 {
1663     CloudSyncConfig config;
1664     config.maxUploadCount = 200; // max upload 200
1665     g_delegate->SetCloudSyncConfig(config);
1666 
1667     /**
1668      * @tc.steps:step1. local insert assets and erase assets extends
1669      * @tc.expected: step1. return OK.
1670      */
1671     int localCount = 50;
1672     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1673 
1674     /**
1675      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1676      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1677      */
1678     int uploadFailId = 0;
1679     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon947dd0282002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1680         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1681         uploadFailId++;
1682         if (uploadFailId == 25) { // 25 is the middle record
1683             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1684             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1685         }
1686         return OK;
1687     });
1688 
1689     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1690     int beginFailFillNum = 49;
1691     int endFailFillNum = 50;
1692     std::set<int> index;
1693     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1694         index.insert(i);
1695     }
1696     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1697     g_virtualCloudDb->ForkUpload(nullptr);
1698 }
1699 
1700 /**
1701  * @tc.name: FillAssetId023
1702  * @tc.desc: Test if BatchUpdate with local assets missing
1703  * @tc.type: FUNC
1704  * @tc.require:
1705  * @tc.author: zhangtao
1706  */
1707 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level0)
1708 {
1709     /**
1710      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1711      * @tc.expected: step1. return OK.
1712      */
1713     int localCount = 50;
1714     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1715     std::atomic<int> count = 1;
1716     g_virtualCloudDb->SetClearExtend(count);
1717     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1718     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1719 
1720     /**
1721      * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1722      * @tc.expected: step2. return OK.
1723      */
1724     g_virtualCloudDb->SetClearExtend(0);
1725 
1726     int uploadFailId = 0;
1727     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon947dd0282102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1728         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1729         uploadFailId++;
1730         if (uploadFailId == 25) { // 25 is the middle record
1731             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1732             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1733         }
1734         return OK;
1735     });
1736     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1737     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1738 }
1739 
1740 /**
1741  * @tc.name: FillAssetId024
1742  * @tc.desc: Test if BatchUpdate with multiple local assets missing
1743  * @tc.type: FUNC
1744  * @tc.require:
1745  * @tc.author: zhangtao
1746  */
1747 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level0)
1748 {
1749     /**
1750      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1751      * @tc.expected: step1. return OK.
1752      */
1753     int localCount = 50;
1754     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1755     std::atomic<int> count = 1;
1756     g_virtualCloudDb->SetClearExtend(count);
1757     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1758     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1759 
1760     /**
1761      * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1762      * @tc.expected: step2. return OK.
1763      */
1764     g_virtualCloudDb->SetClearExtend(0);
1765 
1766     int uploadFailId = 0;
1767     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon947dd0282202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1768         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1769         uploadFailId++;
1770         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1771             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1772             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1773         }
1774         return OK;
1775     });
1776     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1777     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1778 }
1779 
1780 /**
1781  * @tc.name: FillAssetId022
1782  * @tc.desc: Test if local assets missing, many records's assets missing will not mark the whole sync progress failure
1783  * @tc.type: FUNC
1784  * @tc.require:
1785  * @tc.author: zhangtao
1786  */
1787 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level0)
1788 {
1789     CloudSyncConfig config;
1790     config.maxUploadCount = 200; // max upload 200
1791     g_delegate->SetCloudSyncConfig(config);
1792 
1793     /**
1794      * @tc.steps:step1. local insert assets and erase assets extends
1795      * @tc.expected: step1. return OK.
1796      */
1797     int localCount = 50;
1798     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1799 
1800     /**
1801      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1802      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1803      */
1804     int uploadFailId = 0;
1805     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon947dd0282302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1806         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1807         uploadFailId++;
1808         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1809             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1810             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1811         }
1812         return OK;
1813     });
1814 
1815     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1816     int beginFailFillNum = 49;
1817     int endFailFillNum = 54;
1818     std::set<int> index;
1819     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1820         index.insert(i);
1821     }
1822     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1823     g_virtualCloudDb->ForkUpload(nullptr);
1824 }
1825 
1826 /**
1827  * @tc.name: ConsistentFlagTest001
1828  * @tc.desc:Assets are the different, check the 0x20 bit of flag after sync
1829  * @tc.type: FUNC
1830  * @tc.require:
1831  * @tc.author: bty
1832  */
1833 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level0)
1834 {
1835     /**
1836      * @tc.steps:step1. init data for the different asset, sync and check flag
1837      * @tc.expected: step1. return OK.
1838      */
1839     int localCount = 10; // 10 is num of local
1840     int cloudCount = 20; // 20 is num of cloud
1841     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1842     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1843     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1844     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1845     CheckConsistentCount(db, cloudCount);
1846 
1847     /**
1848      * @tc.steps:step2. update local data, sync and check flag
1849      * @tc.expected: step2. return OK.
1850      */
1851     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1852     DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1853     CheckConsistentCount(db, 0L);
1854     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1855     CheckConsistentCount(db, cloudCount);
1856 }
1857 
1858 /**
1859  * @tc.name: ConsistentFlagTest002
1860  * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1861  * @tc.type: FUNC
1862  * @tc.require:
1863  * @tc.author: bty
1864  */
1865 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level0)
1866 {
1867     /**
1868      * @tc.steps:step1. init data for the same asset, sync and check flag
1869      * @tc.expected: step1. return OK.
1870      */
1871     int cloudCount = 20; // 20 is num of cloud
1872     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1873     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1874     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1875     CheckConsistentCount(db, cloudCount);
1876 
1877     /**
1878      * @tc.steps:step2. update local data, sync and check flag
1879      * @tc.expected: step2. return OK.
1880      */
1881     int deleteLocalCount = 5;
1882     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1883     CheckConsistentCount(db, cloudCount - deleteLocalCount);
1884     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1885     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1886     CheckConsistentCount(db, cloudCount);
1887 }
1888 
1889 /**
1890  * @tc.name: ConsistentFlagTest003
1891  * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
1892  * @tc.type: FUNC
1893  * @tc.require:
1894  * @tc.author: bty
1895  */
1896 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level0)
1897 {
1898     /**
1899      * @tc.steps:step1. init data
1900      * @tc.expected: step1. return OK.
1901      */
1902     int localCount = 20; // 20 is num of local
1903     int cloudCount = 10; // 10 is num of cloud
1904     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1905     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1906     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1907 
1908     /**
1909      * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
1910      * @tc.expected: step2. return OK.
1911      */
1912     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1913     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1914     int index = 0;
1915     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1916         .WillRepeatedly(
__anon947dd0282402(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1917             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1918                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
1919                 if (index == 1) { // 1 is first download
1920                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
1921                 }
1922                 return DBStatus::OK;
1923             });
1924 
1925     /**
1926      * @tc.steps:step3. fork upload, check consistent count
1927      * @tc.expected: step3. return OK.
1928      */
1929     int upIdx = 0;
__anon947dd0282502(const std::string &tableName, VBucket &extend) 1930     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
1931         LOGD("upload index:%d", ++upIdx);
1932         if (upIdx == 1) { // 1 is first upload
1933             CheckConsistentCount(db, localCount - cloudCount - 1);
1934         }
1935     });
1936 
1937     /**
1938      * @tc.steps:step4. fork query, check consistent count
1939      * @tc.expected: step4. return OK.
1940      */
1941     int queryIdx = 0;
__anon947dd0282602(const std::string &, VBucket &) 1942     g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
1943         LOGD("query index:%d", ++queryIdx);
1944         if (queryIdx == 3) { // 3 is the last query
1945             CheckConsistentCount(db, localCount - 1);
1946         }
1947     });
1948     int count = 0;
__anon947dd0282702() 1949     g_cloudStoreHook->SetSyncFinishHook([&count]() {
1950         count++;
1951         if (count == 2) { // 2 is compensated sync
1952             g_processCondition.notify_one();
1953         }
1954     });
1955     /**
1956      * @tc.steps:step5. sync, check consistent count
1957      * @tc.expected: step5. return OK.
1958      */
1959     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1960     WaitForSync(count);
1961     CheckConsistentCount(db, localCount);
1962 }
1963 
1964 /**
1965  * @tc.name: ConsistentFlagTest004
1966  * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
1967  * @tc.type: FUNC
1968  * @tc.require:
1969  * @tc.author: bty
1970  */
1971 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level0)
1972 {
1973     /**
1974      * @tc.steps:step1. init data
1975      * @tc.expected: step1. return OK.
1976      */
1977     int localCount = 20; // 20 is num of local
1978     int cloudCount = 10; // 10 is num of cloud
1979     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1980     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1981     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1982 
1983     /**
1984      * @tc.steps:step2. fork upload, not return error filed of CLOUD_NETWORK_ERROR
1985      * @tc.expected: step2. return OK.
1986      */
1987     int upIdx = 0;
__anon947dd0282802(const std::string &tableName, VBucket &extend) 1988     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1989         LOGD("upload index:%d", ++upIdx);
1990         if (upIdx == 1) {
1991             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
1992         }
1993     });
1994 
1995     /**
1996      * @tc.steps:step3. sync, check consistent count
1997      * @tc.expected: step3. return OK.
1998      */
1999     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2000     CheckConsistentCount(db, localCount - 1);
2001 
2002     /**
2003      * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2004      * @tc.expected: step4. return OK.
2005      */
2006     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2007     upIdx = 0;
__anon947dd0282902(const std::string &tableName, VBucket &extend) 2008     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2009         LOGD("upload index:%d", ++upIdx);
2010         if (upIdx == 1) {
2011             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2012             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2013         }
2014         if (upIdx == 2) {
2015             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2016             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2017         }
2018     });
2019 
2020     /**
2021      * @tc.steps:step5. sync, check consistent count
2022      * @tc.expected: step5. return OK.
2023      */
2024     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2025     CheckConsistentCount(db, localCount - 2);
2026 }
2027 
2028 /**
2029  * @tc.name: ConsistentFlagTest005
2030  * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2031  * @tc.type: FUNC
2032  * @tc.require:
2033  * @tc.author: bty
2034  */
2035 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level0)
2036 {
2037     /**
2038      * @tc.steps:step1. init data
2039      * @tc.expected: step1. return OK.
2040      */
2041     int localCount = 20; // 20 is num of local
2042     int cloudCount = 10; // 10 is num of cloud
2043     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2044     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2045     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2046 
2047     /**
2048      * @tc.steps:step2. fork download, update local assets where id=2
2049      * @tc.expected: step2. return OK.
2050      */
2051     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2052     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2053     int index = 0;
2054     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2055         .WillRepeatedly(
2056             [this, &index](const std::string &, const std::string &gid, const Type &,
__anon947dd0282a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2057                 std::map<std::string, Assets> &assets) {
2058                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2059                 if (index == 1) { // 1 is first download
2060                     std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2061                     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2062                 }
2063                 return DBStatus::OK;
2064             });
2065 
2066     /**
2067      * @tc.steps:step3. fork upload, check consistent count
2068      * @tc.expected: step3. return OK.
2069      */
2070     int upIdx = 0;
__anon947dd0282b02(const std::string &tableName, VBucket &extend) 2071     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2072         LOGD("upload index:%d", ++upIdx);
2073         if (upIdx == 1) { // 1 is first upload
2074             CheckConsistentCount(db, localCount - cloudCount - 1);
2075         }
2076     });
2077 
2078     /**
2079      * @tc.steps:step4. sync, check consistent count
2080      * @tc.expected: step4. return OK.
2081      */
2082     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2083     CheckConsistentCount(db, localCount);
2084 }
2085 
2086 /**
2087  * @tc.name: ConsistentFlagTest006
2088  * @tc.desc:
2089  * @tc.type: FUNC
2090  * @tc.require:
2091  * @tc.author: bty
2092  */
2093 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level0)
2094 {
2095     /**
2096      * @tc.steps:step1. init data
2097      * @tc.expected: step1. return OK.
2098      */
2099     int cloudCount = 10; // 10 is num of cloud
2100     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2101     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2102 
2103     /**
2104      * @tc.steps:step2. fork download, update local assets where id=2
2105      * @tc.expected: step2. return OK.
2106      */
2107     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2108     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2109     int delCount = 3; // 3 is num of cloud
2110     DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2111     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2112     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2113     int index = 0;
2114     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2115         .WillRepeatedly(
2116             [&index](const std::string &, const std::string &gid, const Type &,
__anon947dd0282c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2117                 std::map<std::string, Assets> &assets) {
2118                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2119                 if (index == 1) { // 1 is first download
2120                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2121                 }
2122                 return DBStatus::OK;
2123             });
2124 
2125     /**
2126      * @tc.steps:step3. fork upload, check consistent count
2127      * @tc.expected: step3. return OK.
2128      */
2129     int upIdx = 0;
__anon947dd0282d02(const std::string &tableName, VBucket &extend) 2130     g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2131         LOGD("upload index:%d", ++upIdx);
2132         if (upIdx == 1) { // 1 is first upload
2133             CheckConsistentCount(db, delCount);
2134             CheckCompensatedCount(db, 0L);
2135         }
2136     });
2137 
2138     /**
2139      * @tc.steps:step4. sync, check consistent count
2140      * @tc.expected: step4. return OK.
2141      */
2142     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2143     CheckConsistentCount(db, cloudCount);
2144 }
2145 
2146 /**
2147  * @tc.name: SyncDataStatusTest001
2148  * @tc.desc: No need to download asset, check status after sync
2149  * @tc.type: FUNC
2150  * @tc.require:
2151  * @tc.author: bty
2152  */
2153 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level0)
2154 {
2155     DataStatusTest001(false);
2156 }
2157 
2158 /**
2159  * @tc.name: SyncDataStatusTest002
2160  * @tc.desc: Need to download asset, check status after sync
2161  * @tc.type: FUNC
2162  * @tc.require:
2163  * @tc.author: bty
2164  */
2165 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level0)
2166 {
2167     DataStatusTest001(true);
2168 }
2169 
2170 /**
2171  * @tc.name: SyncDataStatusTest003
2172  * @tc.desc: Lock during download and check status
2173  * @tc.type: FUNC
2174  * @tc.require:
2175  * @tc.author: bty
2176  */
2177 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level0)
2178 {
2179     DataStatusTest003();
2180 }
2181 
2182 /**
2183  * @tc.name: SyncDataStatusTest004
2184  * @tc.desc: Lock and delete during download, check status
2185  * @tc.type: FUNC
2186  * @tc.require:
2187  * @tc.author: bty
2188  */
2189 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level0)
2190 {
2191     DataStatusTest004();
2192 }
2193 
2194 /**
2195  * @tc.name: SyncDataStatusTest005
2196  * @tc.desc: Lock and update during download, check status
2197  * @tc.type: FUNC
2198  * @tc.require:
2199  * @tc.author: bty
2200  */
2201 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level0)
2202 {
2203     DataStatusTest005();
2204 }
2205 
2206 /**
2207  * @tc.name: SyncDataStatusTest006
2208  * @tc.desc: Lock and update and Unlock during download, check status
2209  * @tc.type: FUNC
2210  * @tc.require:
2211  * @tc.author: bty
2212  */
2213 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level0)
2214 {
2215     DataStatusTest006();
2216 }
2217 
2218 /**
2219  * @tc.name: SyncDataStatusTest007
2220  * @tc.desc: Download return error, check status
2221  * @tc.type: FUNC
2222  * @tc.require:
2223  * @tc.author: bty
2224  */
2225 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level0)
2226 {
2227     DataStatusTest007();
2228 }
2229 
2230 /**
2231  * @tc.name: SyncDataStatusTest008
2232  * @tc.desc: Test upload process when data locked
2233  * @tc.type: FUNC
2234  * @tc.require:
2235  * @tc.author: bty
2236  */
2237 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level0)
2238 {
2239     /**
2240      * @tc.steps:step1. init local data
2241      * @tc.expected: step1. return OK.
2242      */
2243     int localCount = 40;
2244     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2245     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2246     std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2247     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2248 
2249     /**
2250      * @tc.steps:step2. sync and check process
2251      * @tc.expected: step2. return OK.
2252      */
2253     g_syncProcess = {};
2254     Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2255     std::vector<TableProcessInfo> expectProcess = {
2256         { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2257         { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2258     };
2259     int index = 0;
2260     CloudSyncConfig config;
2261     config.maxUploadCount = 100; // max upload 100
2262     g_delegate->SetCloudSyncConfig(config);
__anon947dd0282e02(const std::map<std::string, SyncProcess> &process) 2263     CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2264         g_syncProcess = std::move(process.begin()->second);
2265         ASSERT_LT(index, 2);
2266         for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2267             EXPECT_EQ(info.process, expectProcess[index].process);
2268             EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2269             EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2270             EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2271             EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2272         }
2273         index++;
2274         if (g_syncProcess.process == FINISHED) {
2275             g_processCondition.notify_one();
2276             ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2277         }
2278     };
2279     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2280     WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2281 }
2282 
2283 /**
2284  * @tc.name: DownloadAssetTest001
2285  * @tc.desc: Test the asset status after the share table sync
2286  * @tc.type: FUNC
2287  * @tc.require:
2288  * @tc.author: bty
2289  */
2290 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level0)
2291 {
2292     /**
2293      * @tc.steps:step1. init data and sync
2294      * @tc.expected: step1. return OK.
2295      */
2296     int cloudCount = 10; // 10 is num of cloud
2297     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2298     CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2299 
2300     /**
2301      * @tc.steps:step2. check asset status
2302      * @tc.expected: step2. return OK.
2303      */
2304     SqlCondition condition;
2305     condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2306     condition.readOnly = true;
2307     std::vector<VBucket> records;
2308     EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2309     for (const auto &data: records) {
2310         Assets assets;
2311         CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2312         for (const auto &asset: assets) {
2313             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2314         }
2315     }
2316 }
2317 
2318 /**
2319  * @tc.name: DownloadAssetTest002
2320  * @tc.desc: Test asset download failed and re download
2321  * @tc.type: FUNC
2322  * @tc.require:
2323  * @tc.author: liaoyonghuang
2324  */
2325 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level0)
2326 {
2327     /**
2328      * @tc.steps:step1. init data
2329      * @tc.expected: step1. return OK.
2330      */
2331     int cloudCount = 10; // 10 is num of cloud
2332     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2333 
2334     /**
2335      * @tc.steps:step2. Set asset download status error and sync
2336      * @tc.expected: step2. sync successful but download assets fail.
2337      */
2338     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2339     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2340 
2341     /**
2342      * @tc.steps:step3. Set asset download status OK and sync
2343      * @tc.expected: step3. return OK.
2344      */
2345     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2346     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2347 
2348     /**
2349      * @tc.steps:step4. Check assets status
2350      * @tc.expected: step4. status is NORMAL.
2351      */
2352     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2353     sqlite3_stmt *stmt = nullptr;
2354     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2355     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2356         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2357         Type cloudValue;
2358         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2359         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2360         for (const auto &asset : assets) {
2361             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2362         }
2363     }
2364     int errCode = E_OK;
2365     SQLiteUtils::ResetStatement(stmt, true, errCode);
2366     EXPECT_EQ(errCode, E_OK);
2367 }
2368 
2369 /**
2370  * @tc.name: RecordLockFuncTest001
2371  * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2372  * @tc.type: FUNC
2373  * @tc.author: lijun
2374  */
2375 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level0)
2376 {
2377     /**
2378      * @tc.steps:step1. init local data
2379      * @tc.expected: step1. return OK.
2380      */
2381     int localCount = 100;
2382     int cloudCount = 100;
2383     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2384     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2385     std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2386     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2387     CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2388     CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2389     DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2390 
2391     /**
2392      * @tc.steps:step2. init cloud data
2393      * @tc.expected: step2. return OK.
2394      */
2395     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2396     UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2397 
2398     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2399     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2400     int index = 0;
2401     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2402         .WillRepeatedly(
__anon947dd0282f02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2403             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2404                 LOGD("Download GID:%s  %d", gid.c_str(), index);
2405                 index++;
2406                 if (index <= 30) {
2407                     return DBStatus::CLOUD_ERROR;
2408                 } else {
2409                     return DBStatus::OK;
2410                 }
2411 
2412             });
2413 
2414     std::mutex mtx;
2415     std::condition_variable cv;
2416     int queryIdx = 0;
__anon947dd0283002(const std::string &, VBucket &) 2417     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2418         LOGD("query index:%d", ++queryIdx);
2419         if (queryIdx == 2) { // 2 is compensated sync
2420             mtx.lock();
2421             cv.notify_one();
2422             mtx.unlock();
2423             std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2424         }
2425     });
2426     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2427     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2428 
2429     {
2430         std::unique_lock<std::mutex> lock(mtx);
2431         cv.wait(lock);
2432     }
2433     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2434 
2435     /**
2436      * @tc.steps:step3. check before compensated sync
2437      * @tc.expected: 70-99 is UNLOCKING.
2438      */
2439     CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2440     CheckLockStatus(db, 70, 99, LockStatus::UNLOCKING);
2441 
2442     std::this_thread::sleep_for(std::chrono::seconds(3));
2443     /**
2444      * @tc.steps:step4. check after compensated sync
2445      * @tc.expected: all is UNLOCKING.
2446      */
2447     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2448 }
2449 } // namespace
2450 #endif // RELATIONAL_STORE
2451