• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_delegate_impl.h"
26 #include "relational_store_instance.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "sqlite_relational_store.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 #include <gtest/gtest.h>
37 #include <iostream>
38 
39 using namespace testing::ext;
40 using namespace DistributedDB;
41 using namespace DistributedDBUnitTest;
42 using namespace std;
43 
44 namespace {
45 const string STORE_ID = "Relational_Store_SYNC";
46 const string DB_SUFFIX = ".db";
47 const string ASSETS_TABLE_NAME = "student";
48 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
49 const string NO_PRIMARY_TABLE = "teacher";
50 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
51 const string COMPOUND_PRIMARY_TABLE = "worker1";
52 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
53 const string DEVICE_CLOUD = "cloud_dev";
54 const string COL_ID = "id";
55 const string COL_NAME = "name";
56 const string COL_HEIGHT = "height";
57 const string COL_ASSET = "asset";
58 const string COL_ASSETS = "assets";
59 const string COL_AGE = "age";
60 const int64_t SYNC_WAIT_TIME = 600;
61 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
62 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
63     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
64     {COL_AGE, TYPE_INDEX<int64_t>}};
65 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
66     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
67     {COL_AGE, TYPE_INDEX<int64_t>}};
68 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
69     {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
70     {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
71 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
72     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
73     COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
74 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
75     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
76     " ASSETS," + COL_AGE + " INT);";
77 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
78     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
79     COL_AGE + " INT, PRIMARY KEY (id, age));";
80 const Asset ASSET_COPY = {.version = 1,
81     .name = "Phone",
82     .assetId = "0",
83     .subpath = "/local/sync",
84     .uri = "/local/sync",
85     .modifyTime = "123456",
86     .createTime = "",
87     .size = "256",
88     .hash = "ASE"};
89 const Asset ASSET_COPY2 = {.version = 1,
90     .name = "Phone_copy_2",
91     .assetId = "0",
92     .subpath = "/local/sync",
93     .uri = "/local/sync",
94     .modifyTime = "123456",
95     .createTime = "",
96     .size = "256",
97     .hash = "ASE"};
98 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
99 const std::string QUERY_CONSISTENT_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x10!=0;";
101 
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116 
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128                                      .fields = CLOUD_FIELDS};
129     dataBaseSchema.tables.push_back(assetsTableSchema);
130     assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131                          .fields = NO_PRIMARY_FIELDS};
132     dataBaseSchema.tables.push_back(assetsTableSchema);
133     assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134                          .fields = COMPOUND_PRIMARY_FIELDS};
135     dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,bool hasAsset)138 std::pair<std::vector<VBucket>, std::vector<VBucket>> GenerateDataRecords(int64_t begin, int64_t count,
139     int64_t gidStart, bool hasAsset)
140 {
141     std::pair<std::vector<VBucket>, std::vector<VBucket>> res;
142     auto &[record, extend] = res;
143     for (int64_t i = begin; i < begin + count; i++) {
144         Assets assets;
145         if (hasAsset) {
146             Asset asset = ASSET_COPY;
147             asset.name = ASSET_COPY.name + std::to_string(i);
148             assets.emplace_back(asset);
149             asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
150             assets.emplace_back(asset);
151         }
152         VBucket data;
153         data.insert_or_assign(COL_ID, i);
154         data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
155         data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
156         data.insert_or_assign(COL_ASSETS, assets);
157         data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
158         record.push_back(data);
159 
160         VBucket log;
161         Timestamp now = TimeHelper::GetSysCurrentTime();
162         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
163         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
164         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
165         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
166         extend.push_back(log);
167     }
168     return res;
169 }
170 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)171 void GenerateDataRecords(
172     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
173 {
174     std::tie(record, extend) = GenerateDataRecords(begin, count, gidStart, true);
175 }
176 
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)177 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
178 {
179     int errCode;
180     std::vector<VBucket> record;
181     std::vector<VBucket> extend;
182     GenerateDataRecords(begin, count, 0, record, extend);
183     const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
184     for (VBucket vBucket : record) {
185         sqlite3_stmt *stmt = nullptr;
186         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
187         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
188         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
189         ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
190             sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
191         if (isAssetNull) {
192             ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
193         } else {
194             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
195             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
196         }
197         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
198             std::get<Assets>(vBucket[COL_ASSETS]));
199         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
200         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
201         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
202         SQLiteUtils::ResetStatement(stmt, true, errCode);
203     }
204 }
205 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,bool isEmptyAssets=false)206 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, bool isEmptyAssets = false)
207 {
208     int errCode;
209     std::vector<uint8_t> assetBlob;
210     const string sql = "update " + tableName + " set assets=?;";
211     sqlite3_stmt *stmt = nullptr;
212     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
213     if (isEmptyAssets) {
214         ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
215     } else {
216         assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
217         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
218     }
219     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
220     SQLiteUtils::ResetStatement(stmt, true, errCode);
221 }
222 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)223 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
224 {
225     int errCode;
226     std::vector<uint8_t> assetBlob;
227     const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
228         " and id<=" + std::to_string(end) + ";";
229     sqlite3_stmt *stmt = nullptr;
230     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
231     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
232     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
233     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
234     SQLiteUtils::ResetStatement(stmt, true, errCode);
235 }
236 
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)237 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
238 {
239     ASSERT_NE(db, nullptr);
240     for (int64_t i = begin; i < begin + count; i++) {
241         string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
242         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
243     }
244 }
245 
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)246 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
247 {
248     for (int64_t i = begin; i < begin + count; i++) {
249         VBucket idMap;
250         idMap.insert_or_assign("#_gid", std::to_string(i));
251         ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
252     }
253 }
254 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)255 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
256     const std::string &tableName)
257 {
258     std::this_thread::sleep_for(std::chrono::milliseconds(1));
259     std::vector<VBucket> record;
260     std::vector<VBucket> extend;
261     GenerateDataRecords(begin, count, gidStart, record, extend);
262     for (auto &entry: extend) {
263         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
264     }
265     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
266     std::this_thread::sleep_for(std::chrono::milliseconds(1));
267 }
268 
QueryStatusCallback(void * data,int count,char ** colValue,char ** colName)269 int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
270 {
271     auto status = static_cast<std::vector<int64_t> *>(data);
272     const int decimal = 10;
273     for (int i = 0; i < count; i++) {
274         status->push_back(strtol(colValue[0], nullptr, decimal));
275     }
276     return 0;
277 }
278 
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)279 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
280 {
281     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
282     std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
283         " and data_key <=" +  std::to_string(endId) + ";";
284     std::vector<int64_t> status;
285     char *str = NULL;
286     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
287         SQLITE_OK);
288     ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
289 
290     for (auto stat : status) {
291         ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
292     }
293 }
294 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName,bool hasAsset=true)295 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName,
296     bool hasAsset = true)
297 {
298     auto [record, extend] = GenerateDataRecords(begin, count, gidStart, hasAsset);
299     if (tableName == ASSETS_TABLE_NAME_SHARED) {
300         for (auto &vBucket: record) {
301             vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
302         }
303     }
304     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
305 }
306 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)307 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
308 {
309     std::unique_lock<std::mutex> lock(g_processMutex);
310     bool result = g_processCondition.wait_for(
311         lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
312     ASSERT_EQ(result, true);
313     LOGD("-------------------sync end--------------");
314 }
315 
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK,bool isMerge=false)316 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK,
317     bool isMerge = false)
318 {
319     g_syncProcess = {};
320     Query query = Query::Select().FromTable(tableNames);
321     std::vector<SyncProcess> expectProcess;
322     CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
323         ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
324         std::unique_lock<std::mutex> lock(g_processMutex);
325         g_syncProcess = process.begin()->second;
326         if (g_syncProcess.process == FINISHED) {
327             g_processCondition.notify_one();
328             ASSERT_EQ(g_syncProcess.errCode, errCode);
329         }
330     };
331     CloudSyncOption option;
332     option.devices = {DEVICE_CLOUD};
333     option.mode = mode;
334     option.query = query;
335     option.waitTime = SYNC_WAIT_TIME;
336     option.lockAction = static_cast<LockAction>(0xff); // lock all
337     option.merge = isMerge;
338     ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
339 
340     if (dbStatus == DBStatus::OK) {
341         WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
342     }
343 }
344 
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)345 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
346 {
347     for (auto &item : assets) {
348         for (auto &asset : item.second) {
349             EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
350             if (index < 4) { // 1-4 is inserted
351                 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
352             }
353             LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
354                 index);
355         }
356     }
357 }
358 
CheckDownloadFailedForTest002(sqlite3 * & db)359 void CheckDownloadFailedForTest002(sqlite3 *&db)
360 {
361     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
362     sqlite3_stmt *stmt = nullptr;
363     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
364     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
365         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
366         Type cloudValue;
367         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
368         std::vector<uint8_t> assetsBlob;
369         Assets assets;
370         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
371         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
372         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
373         for (size_t i = 0; i < assets.size(); ++i) {
374             EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
375         }
376     }
377     int errCode;
378     SQLiteUtils::ResetStatement(stmt, true, errCode);
379 }
380 
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)381 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
382 {
383     Assets assets;
384     Asset asset = ASSET_COPY;
385     asset.name = ASSET_COPY.name + std::to_string(id);
386     asset.status = status;
387     assets.emplace_back(asset);
388     asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
389     assets.emplace_back(asset);
390     int errCode;
391     std::vector<uint8_t> assetBlob;
392     const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
393     sqlite3_stmt *stmt = nullptr;
394     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
395     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
396     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
397     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
398     SQLiteUtils::ResetStatement(stmt, true, errCode);
399 }
400 
CheckConsistentCount(sqlite3 * db,int64_t expectCount)401 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
402 {
403     EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
404         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
405 }
406 
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)407 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
408 {
409     EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
410         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
411 }
412 
CloseDb()413 void CloseDb()
414 {
415     if (g_delegate != nullptr) {
416         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
417         g_delegate = nullptr;
418     }
419     delete g_observer;
420     g_virtualCloudDb = nullptr;
421 }
422 
423 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
424 public:
425     static void SetUpTestCase(void);
426     static void TearDownTestCase(void);
427     void SetUp();
428     void TearDown();
429 
430 protected:
431     void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
432         const std::set<int> &failIndex);
433     void CheckLocalAssetIsEmpty(const std::string &tableName);
434     void CheckCursorData(const std::string &tableName, int begin);
435     void WaitForSync(int &syncCount);
436     const RelationalSyncAbleStorage *GetRelationalStore();
437     void InitDataStatusTest(bool needDownload);
438     void DataStatusTest001(bool needDownload);
439     void DataStatusTest003();
440     void DataStatusTest004();
441     void DataStatusTest005();
442     void DataStatusTest006();
443     void DataStatusTest007();
444     sqlite3 *db = nullptr;
445     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
446 };
447 
SetUpTestCase(void)448 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
449 {
450     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
451     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
452     LOGI("The test db is:%s", g_storePath.c_str());
453     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
454     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
455 }
456 
TearDownTestCase(void)457 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
458 
SetUp(void)459 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
460 {
461     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
462     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
463         LOGE("rm test db files error.");
464     }
465     DistributedDBToolsUnitTest::PrintTestCaseInfo();
466     LOGD("Test dir is %s", g_testDir.c_str());
467     db = RelationalTestUtils::CreateDataBase(g_storePath);
468     ASSERT_NE(db, nullptr);
469     InitDatabase(db);
470     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
471     ASSERT_NE(g_observer, nullptr);
472     ASSERT_EQ(
473         g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
474         DBStatus::OK);
475     ASSERT_NE(g_delegate, nullptr);
476     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
477     ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
478     ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
479     g_virtualCloudDb = make_shared<VirtualCloudDb>();
480     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
481     g_syncProcess = {};
482     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
483     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
484     DataBaseSchema dataBaseSchema;
485     GetCloudDbSchema(dataBaseSchema);
486     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
487     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
488     ASSERT_NE(g_cloudStoreHook, nullptr);
489     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
490     ASSERT_TRUE(communicatorAggregator_ != nullptr);
491     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
492 }
493 
TearDown(void)494 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
495 {
496     RefObject::DecObjRef(g_store);
497     g_virtualCloudDb->ForkUpload(nullptr);
498     CloseDb();
499     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
500     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
501         LOGE("rm test db files error.");
502     }
503     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
504     communicatorAggregator_ = nullptr;
505     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
506 }
507 
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)508 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
509     const std::string &expectAssetId, const std::set<int> &failIndex)
510 {
511     std::string sql = "SELECT assets FROM " + tableName + ";";
512     sqlite3_stmt *stmt = nullptr;
513     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
514     int index = 0;
515     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
516         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
517         Type cloudValue;
518         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
519         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
520         for (const auto &asset : assets) {
521             index++;
522             if (failIndex.find(index) != failIndex.end()) {
523                 EXPECT_EQ(asset.assetId, "0");
524             } else {
525                 EXPECT_EQ(asset.assetId, expectAssetId);
526             }
527         }
528     }
529     int errCode = E_OK;
530     SQLiteUtils::ResetStatement(stmt, true, errCode);
531 }
532 
CheckLocalAssetIsEmpty(const std::string & tableName)533 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
534 {
535     std::string sql = "SELECT asset FROM " + tableName + ";";
536     sqlite3_stmt *stmt = nullptr;
537     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
538     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
539         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
540     }
541     int errCode = E_OK;
542     SQLiteUtils::ResetStatement(stmt, true, errCode);
543 }
544 
CheckCursorData(const std::string & tableName,int begin)545 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
546 {
547     std::string logTableName = DBCommon::GetLogTableName(tableName);
548     std::string sql = "SELECT cursor FROM " + logTableName + ";";
549     sqlite3_stmt *stmt = nullptr;
550     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
551     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
552         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
553         Type cloudValue;
554         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
555         EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
556         begin++;
557     }
558     int errCode = E_OK;
559     SQLiteUtils::ResetStatement(stmt, true, errCode);
560 }
561 
WaitForSync(int & syncCount)562 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
563 {
564     std::unique_lock<std::mutex> lock(g_processMutex);
565     bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
566         [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
567     ASSERT_EQ(result, true);
568 }
569 
GetRelationalStore()570 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
571 {
572     RelationalDBProperties properties;
573     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
574     int errCode = E_OK;
575     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
576     if (g_store == nullptr) {
577         return nullptr;
578     }
579     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
580 }
581 
InitDataStatusTest(bool needDownload)582 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
583 {
584     int cloudCount = 20;
585     int localCount = 10;
586     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
587     if (needDownload) {
588         UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
589     }
590     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
591     std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
592     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
593     sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
594     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
595     sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
596     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
597     std::this_thread::sleep_for(std::chrono::milliseconds(1));
598     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
599     std::this_thread::sleep_for(std::chrono::milliseconds(1));
600     sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
601     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
602     sql = "update " + logName + " SET status = 1 where data_key in (4);";
603     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
604 }
605 
DataStatusTest001(bool needDownload)606 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
607 {
608     int cloudCount = 20;
609     int count = 0;
610     g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
611         count++;
612         if (count == 1) {
613             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
614                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
615             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
616         }
617         if (count == 2) { // 2 is compensated sync
618             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
619                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
620             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
621             g_processCondition.notify_one();
622         }
623     });
624     InitDataStatusTest(needDownload);
625     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
626     WaitForSync(count);
627 }
628 
DataStatusTest003()629 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
630 {
631     int count = 0;
632     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
633         count++;
634         if (count == 1) {
635             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
636                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
637             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
638         }
639         if (count == 2) { // 2 is compensated sync
640             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
641                 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
642             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
643             g_processCondition.notify_one();
644         }
645     });
646     int downLoadCount = 0;
647     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
648         std::map<std::string, Assets> &assets) {
649         downLoadCount++;
650         if (downLoadCount == 1) {
651             std::vector<std::vector<uint8_t>> hashKey;
652             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
653             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
654         }
655     });
656     InitDataStatusTest(true);
657     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
658     WaitForSync(count);
659     g_virtualAssetLoader->ForkDownload(nullptr);
660 }
661 
DataStatusTest004()662 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
663 {
664     int count = 0;
665     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
666         count++;
667         if (count == 1) {
668             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
669                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
670             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
671         }
672         if (count == 2) { // 2 is compensated sync
673             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
674                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
675             CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
676             g_processCondition.notify_one();
677         }
678     });
679     int downLoadCount = 0;
680     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
681         std::map<std::string, Assets> &assets) {
682         downLoadCount++;
683         if (downLoadCount == 1) {
684             std::vector<std::vector<uint8_t>> hashKey;
685             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
686             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
687             std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
688             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
689         }
690     });
691     int queryIdx = 0;
692     g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
693         LOGD("query index:%d", ++queryIdx);
694         if (queryIdx == 4) { // 4 is compensated sync
695             std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
696                 " SET status = 1 where data_key=15;";
697             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
698         }
699     });
700     InitDataStatusTest(true);
701     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
702     WaitForSync(count);
703     g_virtualAssetLoader->ForkDownload(nullptr);
704     g_virtualCloudDb->ForkQuery(nullptr);
705 }
706 
DataStatusTest005()707 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
708 {
709     int count = 0;
710     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
711         count++;
712         if (count == 1) {
713             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
714                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
715             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
716         }
717         if (count == 2) { // 2 is compensated sync
718             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
719                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
720             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
721             g_processCondition.notify_one();
722         }
723     });
724     int downLoadCount = 0;
725     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
726         std::map<std::string, Assets> &assets) {
727         downLoadCount++;
728         if (downLoadCount == 1) {
729             std::vector<std::vector<uint8_t>> hashKey;
730             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
731             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
732             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
733             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
734         }
735     });
736     InitDataStatusTest(true);
737     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
738     WaitForSync(count);
739     g_virtualAssetLoader->ForkDownload(nullptr);
740 }
741 
DataStatusTest006()742 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
743 {
744     int count = 0;
745     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
746         count++;
747         if (count == 1) {
748             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
749                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
750                 "(status = 0 and data_key in (11))";
751             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
752         }
753         if (count == 2) { // 2 is compensated sync
754             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
755                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
756             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
757             g_processCondition.notify_one();
758         }
759     });
760     int downLoadCount = 0;
761     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
762         std::map<std::string, Assets> &assets) {
763         downLoadCount++;
764         if (downLoadCount == 1) {
765             std::vector<std::vector<uint8_t>> hashKey;
766             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
767             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
768             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
769             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
770             EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
771         }
772     });
773     InitDataStatusTest(true);
774     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
775     WaitForSync(count);
776     g_virtualAssetLoader->ForkDownload(nullptr);
777 }
778 
DataStatusTest007()779 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
780 {
781     int count = 0;
782     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
783         count++;
784         if (count == 1) {
785             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
786                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
787             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
788         }
789         if (count == 2) { // 2 is compensated sync
790             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
791                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
792             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
793             g_processCondition.notify_one();
794         }
795     });
796     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
797     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
798     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
799         .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
800             std::map<std::string, Assets> &assets) {
801             return CLOUD_ERROR;
802         });
803     InitDataStatusTest(true);
804     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
805     WaitForSync(count);
806 }
807 
808 /*
809  * @tc.name: DownloadAssetForDupDataTest001
810  * @tc.desc: Test the download interface call with duplicate data for the same primary key.
811  * @tc.type: FUNC
812  * @tc.require:
813  * @tc.author: liufuchenxing
814  */
815 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level1)
816 {
817     /**
818      * @tc.steps:step1. Mock asset download interface.
819      * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
820      */
821     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
822     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
823     int index = 1;
824     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
825         .Times(2)
826         .WillRepeatedly(
__anon09c4f3ca1102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 827             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
828                 LOGD("Download GID:%s", gid.c_str());
829                 CheckDownloadForTest001(index, assets);
830                 index++;
831                 return DBStatus::OK;
832             });
833 
834     /**
835      * @tc.steps:step2. Insert local data [0, 10), sync data
836      * @tc.expected: step2. sync success.
837      */
838     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
839     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
840 
841     /**
842      * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
843      * @tc.expected: step3. sync success.
844      */
845     DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
846     InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
847     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
848 }
849 
850 /**
851  * @tc.name: FillAssetId001
852  * @tc.desc: Test if assetId is filled in single primary key table
853  * @tc.type: FUNC
854  * @tc.require:
855  * @tc.author: chenchaohao
856  */
857 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level1)
858 {
859     /**
860      * @tc.steps:step1. local insert assets and sync, check the local assetId.
861      * @tc.expected: step1. return OK.
862      */
863     int localCount = 50;
864     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
865     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
866     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
867 
868     /**
869      * @tc.steps:step2. local update assets and sync ,check the local assetId.
870      * @tc.expected: step2. sync success.
871      */
872     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
873     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
874     CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
875     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
876 }
877 
878 /**
879  * @tc.name: FillAssetId002
880  * @tc.desc: Test if assetId is filled in no primary key table
881  * @tc.type: FUNC
882  * @tc.require:
883  * @tc.author: chenchaohao
884  */
885 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level1)
886 {
887     /**
888      * @tc.steps:step1. local insert assets and sync, check the local assetId.
889      * @tc.expected: step1. return OK.
890      */
891     int localCount = 50;
892     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
893     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
894     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
895 
896     /**
897      * @tc.steps:step2. local update assets and sync ,check the local assetId.
898      * @tc.expected: step2. sync success.
899      */
900     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
901     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
902     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
903 }
904 
905 /**
906  * @tc.name: FillAssetId003
907  * @tc.desc: Test if assetId is filled in compound primary key table
908  * @tc.type: FUNC
909  * @tc.require:
910  * @tc.author: chenchaohao
911  */
912 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level1)
913 {
914     /**
915      * @tc.steps:step1. local insert assets and sync, check the local assetId.
916      * @tc.expected: step1. return OK.
917      */
918     int localCount = 50;
919     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
920     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
921     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
922 
923     /**
924      * @tc.steps:step2. local update assets and sync ,check the local assetId.
925      * @tc.expected: step2. sync success.
926      */
927     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
928     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
929     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
930 }
931 
932 /**
933  * @tc.name: FillAssetId004
934  * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
935  * @tc.type: FUNC
936  * @tc.require:
937  * @tc.author: chenchaohao
938  */
939 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level1)
940 {
941     /**
942      * @tc.steps:step1. local insert assets and sync, check the local assetId.
943      * @tc.expected: step1. return OK.
944      */
945     int localCount = 50;
946     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
947     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
948     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
949 
950     /**
951      * @tc.steps:step2. local update assets and sync ,check the local assetId.
952      * @tc.expected: step2. sync success.
953      */
954     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
955     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
956     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
957 }
958 
959 /**
960  * @tc.name: FillAssetId001
961  * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
962  * @tc.type: FUNC
963  * @tc.require:
964  * @tc.author: chenchaohao
965  */
966 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level1)
967 {
968     /**
969      * @tc.steps:step1. local insert assets and sync, check the local assetId.
970      * @tc.expected: step1. return OK.
971      */
972     int localCount = 50;
973     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
974     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
975     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
976 
977     /**
978      * @tc.steps:step2. local update assets and sync ,check the local assetId.
979      * @tc.expected: step2. sync success.
980      */
981     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
982     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
983     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
984 }
985 
986 /**
987  * @tc.name: FillAssetId006
988  * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
989  * @tc.type: FUNC
990  * @tc.require:
991  * @tc.author: chenchaohao
992  */
993 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level1)
994 {
995     /**
996      * @tc.steps:step1. local insert assets and sync, check the local assetId.
997      * @tc.expected: step1. return OK.
998      */
999     int localCount = 50;
1000     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
1001     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1002     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
1003 
1004     /**
1005      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1006      * @tc.expected: step2. sync success.
1007      */
1008     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
1009     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1010     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
1011 }
1012 
1013 /**
1014  * @tc.name: FillAssetId007
1015  * @tc.desc: Test if assetId is filled when extend lack of assets
1016  * @tc.type: FUNC
1017  * @tc.require:
1018  * @tc.author: chenchaohao
1019  */
1020 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level1)
1021 {
1022     CloudSyncConfig config;
1023     config.maxUploadCount = 200; // max upload 200
1024     g_delegate->SetCloudSyncConfig(config);
1025     /**
1026      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1027      * @tc.expected: step1. return OK.
1028      */
1029     int localCount = 50;
1030     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1202(const std::string &tableName, VBucket &extend) 1031     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1032         extend.erase("assets");
1033     });
1034     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1035     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1036 
1037     /**
1038      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1039      * @tc.expected: step2. sync success.
1040      */
1041     int addLocalCount = 10;
1042     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1302(const std::string &tableName, VBucket &extend) 1043     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1044         if (extend.find("assets") != extend.end()) {
1045             for (auto &asset : std::get<Assets>(extend["assets"])) {
1046                 asset.name = "pad";
1047             }
1048         }
1049     });
1050     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1051     int beginFailFillNum = 101;
1052     int endFailFillNum = 120;
1053     std::set<int> index;
1054     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1055         index.insert(i);
1056     }
1057     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1058 
1059     /**
1060      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1061      * @tc.expected: step2. sync success.
1062      */
1063     g_virtualCloudDb->ForkUpload(nullptr);
1064     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1065     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1066 }
1067 
1068 /**
1069  * @tc.name: FillAssetId008
1070  * @tc.desc: Test if assetId is filled when extend lack of assetId
1071  * @tc.type: FUNC
1072  * @tc.require:
1073  * @tc.author: chenchaohao
1074  */
1075 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level1)
1076 {
1077     /**
1078      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1079      * @tc.expected: step1. return OK.
1080      */
1081     int localCount = 50;
1082     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1402(const std::string &tableName, VBucket &extend) 1083     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1084         if (extend.find("assets") != extend.end()) {
1085             for (auto &asset : std::get<Assets>(extend["assets"])) {
1086                 asset.assetId = "";
1087             }
1088         }
1089     });
1090     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1091     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1092 
1093     /**
1094      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1095      * @tc.expected: step2. sync success.
1096      */
1097     g_virtualCloudDb->ForkUpload(nullptr);
1098     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1099     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1100 }
1101 
1102 /**
1103  * @tc.name: FillAssetId009
1104  * @tc.desc: Test if assetId is filled when extend exists useless assets
1105  * @tc.type: FUNC
1106  * @tc.require:
1107  * @tc.author: chenchaohao
1108  */
1109 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level1)
1110 {
1111     /**
1112      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1113      * @tc.expected: step1. return OK.
1114      */
1115     int localCount = 50;
1116     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1502(const std::string &tableName, VBucket &extend) 1117     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1118         if (extend.find("assets") != extend.end()) {
1119             Asset asset = ASSET_COPY2;
1120             Assets &assets = std::get<Assets>(extend["assets"]);
1121             assets.push_back(asset);
1122         }
1123     });
1124     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1125     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1126 }
1127 
1128 /**
1129  * @tc.name: FillAssetId010
1130  * @tc.desc: Test if assetId is filled when some success and some fail
1131  * @tc.type: FUNC
1132  * @tc.require:
1133  * @tc.author: chenchaohao
1134  */
1135 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level1)
1136 {
1137     /**
1138      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1139      * @tc.expected: step1. return OK.
1140      */
1141     int localCount = 30;
1142     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1143     g_virtualCloudDb->SetInsertFailed(1);
1144     std::atomic<int> count = 0;
__anon09c4f3ca1602(const std::string &tableName, VBucket &extend) 1145     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1146         if (extend.find("assets") != extend.end() && count == 0) {
1147             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1148             count++;
1149         }
1150     });
1151     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1152     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1153 }
1154 
1155 /**
1156  * @tc.name: FillAssetId011
1157  * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1158  * @tc.type: FUNC
1159  * @tc.require:
1160  * @tc.author: chenchaohao
1161  */
1162 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level1)
1163 {
1164     /**
1165      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1166      * @tc.expected: step1. return OK.
1167      */
1168     int localCount = 50;
1169     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1170     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1171     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1172 
1173     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1174     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1175     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1176     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1177 }
1178 
1179 /**
1180  * @tc.name: FillAssetId012
1181  * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1182  * @tc.type: FUNC
1183  * @tc.require:
1184  * @tc.author: chenchaohao
1185  */
1186 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level1)
1187 {
1188     /**
1189      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1190      * @tc.expected: step1. return OK.
1191      */
1192     int localCount = 50;
1193     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1194     std::atomic<int> count = 1;
1195     g_virtualCloudDb->SetClearExtend(count);
1196     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1197     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1198 
1199     /**
1200      * @tc.steps:step2. set extend size normal then sync, check the asseid.
1201      * @tc.expected: step2. return OK.
1202      */
1203     g_virtualCloudDb->SetClearExtend(0);
1204     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1205     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1206 
1207     /**
1208      * @tc.steps:step3. set extend size large then sync, check the asseid.
1209      * @tc.expected: step3. return OK.
1210      */
1211     count = -1; // -1 means extend push a empty vBucket
1212     g_virtualCloudDb->SetClearExtend(count);
1213     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1214     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1215 }
1216 
1217 /**
1218  * @tc.name: FillAssetId013
1219  * @tc.desc: Test fill assetId and removedevicedata when data is delete
1220  * @tc.type: FUNC
1221  * @tc.require:
1222  * @tc.author: chenchaohao
1223  */
1224 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level1)
1225 {
1226     /**
1227      * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1228      * @tc.expected: step1. return OK.
1229      */
1230     int localCount = 20;
1231     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1232     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1233     int deleteLocalCount = 10;
1234     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1235     int addLocalCount = 30;
1236     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1237     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1238 
1239     /**
1240      * @tc.steps:step2. RemoveDeviceData.
1241      * @tc.expected: step2. return OK.
1242      */
1243     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1244     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1245 }
1246 
1247 /**
1248  * @tc.name: FillAssetId014
1249  * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1250  * @tc.type: FUNC
1251  * @tc.require:
1252  * @tc.author: bty
1253  */
1254 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level1)
1255 {
1256     /**
1257      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1258      * @tc.expected: step1. return OK.
1259      */
1260     int localCount = 50;
1261     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1262     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1263     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1264 
1265     /**
1266      * @tc.steps:step2. RemoveDeviceData
1267      * @tc.expected: step2. return OK.
1268      */
1269     Assets assets;
1270     std::vector<AssetStatus> statusVec = {
1271         AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1272         AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1273     };
1274     for (auto &status : statusVec) {
1275         Asset temp = ASSET_COPY;
1276         temp.name += std::to_string(status);
1277         temp.status = status | AssetStatus::UPLOADING;
1278         assets.emplace_back(temp);
1279     }
1280     UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1281     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1282     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1283 
1284     /**
1285      * @tc.steps:step3. check status
1286      * @tc.expected: step3. return OK.
1287      */
1288     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1289     sqlite3_stmt *stmt = nullptr;
1290     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1291     int index = 0;
1292     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1293         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1294         Type cloudValue;
1295         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1296         Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1297         for (const auto &ast : newAssets) {
1298             EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1299         }
1300     }
1301     int errCode = E_OK;
1302     SQLiteUtils::ResetStatement(stmt, true, errCode);
1303 }
1304 
1305 /**
1306  * @tc.name: FillAssetId015
1307  * @tc.desc: Test if fill assetId when upload return cloud network error
1308  * @tc.type: FUNC
1309  * @tc.require:
1310  * @tc.author: chenchaohao
1311  */
1312 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level1)
1313 {
1314     /**
1315      * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1316      * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1317      */
1318     int localCount = 20;
1319     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1320     g_virtualCloudDb->SetCloudNetworkError(true);
1321     std::atomic<int> count = 0;
__anon09c4f3ca1702(const std::string &tableName, VBucket &extend) 1322     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1323         if (extend.find("assets") != extend.end() && count == 0) {
1324             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1325             count++;
1326         }
1327     });
1328     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1329     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1330     g_virtualCloudDb->SetCloudNetworkError(false);
1331     g_virtualCloudDb->ForkUpload(nullptr);
1332     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1333     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1334 
1335     /**
1336      * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1337      * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1338      */
1339     int addLocalCount = 10;
1340     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1341     std::atomic<int> num = 0;
__anon09c4f3ca1802(const std::string &tableName, VBucket &extend) 1342     g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1343         if (extend.find("assets") != extend.end() && num == 0) {
1344             for (auto &asset : std::get<Assets>(extend["assets"])) {
1345                 asset.name = "pad";
1346                 break;
1347             }
1348             num++;
1349         }
1350     });
1351     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1352     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1353 }
1354 
1355 /**
1356  * @tc.name: FillAssetId016
1357  * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1358  * @tc.type: FUNC
1359  * @tc.require:
1360  * @tc.author: chenchaohao
1361  */
1362 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level1)
1363 {
1364     /**
1365      * @tc.steps:step1. local insert data and sync, then delete last local data
1366      * @tc.expected: step1. return OK.
1367      */
1368     int localCount = 20;
1369     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1370     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1371     int deletLocalCount = 10;
1372     DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1373     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1374 
1375     /**
1376      * @tc.steps:step2. RemoveDeviceData.
1377      * @tc.expected: step2. return OK.
1378      */
1379     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1380     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1381 }
1382 
1383 /**
1384  * @tc.name: FillAssetId017
1385  * @tc.desc: Test cursor when download not change
1386  * @tc.type: FUNC
1387  * @tc.require:
1388  * @tc.author: chenchaohao
1389  */
1390 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level1)
1391 {
1392     /**
1393      * @tc.steps:step1. local insert data and sync,check cursor.
1394      * @tc.expected: step1. return OK.
1395      */
1396     int localCount = 20;
1397     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1398     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1399     CheckCursorData(ASSETS_TABLE_NAME, 1);
1400 
1401     /**
1402      * @tc.steps:step2. sync again and optype is not change, check cursor.
1403      * @tc.expected: step2. return OK.
1404      */
1405     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1406     CheckCursorData(ASSETS_TABLE_NAME, 1);
1407 }
1408 
1409 /**
1410  * @tc.name: FillAssetId018
1411  * @tc.desc: Test if assetId is filled when contains "#_error"
1412  * @tc.type: FUNC
1413  * @tc.require:
1414  * @tc.author: zhaoliang
1415  */
1416 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level1)
1417 {
1418     /**
1419      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1420      * @tc.expected: step1. return OK.
1421      */
1422     int localCount = 30;
1423     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1424     std::atomic<int> count = 0;
__anon09c4f3ca1902(const std::string &tableName, VBucket &extend) 1425     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1426         if (extend.find("assets") != extend.end() && count == 0) {
1427             extend["#_error"] = std::string("test");
1428             count++;
1429         }
1430     });
1431     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1432     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1433 }
1434 
1435 /**
1436  * @tc.name: DownloadAssetForDupDataTest002
1437  * @tc.desc: Test download failed
1438  * @tc.type: FUNC
1439  * @tc.require:
1440  * @tc.author: bty
1441  */
1442 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level1)
1443 {
1444     /**
1445      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1446      * @tc.expected: step1. return OK
1447      */
1448     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1449     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1450     int index = 0;
1451     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1452         .WillRepeatedly(
__anon09c4f3ca1a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1453             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1454                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1455                 return DBStatus::CLOUD_ERROR;
1456             });
1457 
1458     /**
1459      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1460      * @tc.expected: step2. sync success.
1461      */
1462     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1463     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1464 
1465     /**
1466      * @tc.steps:step3. check if the hash of assets in db is empty
1467      * @tc.expected: step3. OK
1468      */
1469     CheckDownloadFailedForTest002(db);
1470 }
1471 
1472 /**
1473  * @tc.name: DownloadAssetForDupDataTest003
1474  * @tc.desc: Test download failed and flag was modified
1475  * @tc.type: FUNC
1476  * @tc.require:
1477  * @tc.author: bty
1478  */
1479 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level1)
1480 {
1481     /**
1482      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1483      * @tc.expected: step1. return OK
1484      */
1485     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1486     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1487     int index = 0;
1488     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1489         .WillRepeatedly(
__anon09c4f3ca1b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1490             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1491                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1492                 for (auto &item : assets) {
1493                     for (auto &asset : item.second) {
1494                         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1495                     }
1496                 }
1497                 return DBStatus::CLOUD_ERROR;
1498             });
1499 
1500     /**
1501      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1502      * @tc.expected: step2. sync success.
1503      */
1504     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1505     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1506 
1507     /**
1508      * @tc.steps:step3. check if the hash of assets in db is empty
1509      * @tc.expected: step3. OK
1510      */
1511     CheckDownloadFailedForTest002(db);
1512 }
1513 
1514 /**
1515  * @tc.name: DownloadAssetForDupDataTest004
1516  * @tc.desc: test sync with deleted assets
1517  * @tc.type: FUNC
1518  * @tc.require:
1519  * @tc.author: bty
1520  */
1521 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level1)
1522 {
1523     /**
1524      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1525      * @tc.expected: step1. return OK
1526      */
1527     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1528     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1529     int index = 0;
1530     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1531         .WillRepeatedly(
__anon09c4f3ca1c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1532             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1533                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1534                 return DBStatus::OK;
1535             });
1536 
1537     /**
1538      * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1539      * @tc.expected: step2. return OK
1540      */
1541     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1542     UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1543     UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1544     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1545 
1546     /**
1547      * @tc.steps:step3. sync, check download num
1548      * @tc.expected: step3. return OK
1549      */
1550     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1551     EXPECT_GE(index, 2); // 2 is download num
1552 }
1553 
1554 /**
1555  * @tc.name: DownloadAssetForDupDataTest005
1556  * @tc.desc: test DOWNLOADING status of asset after uploading
1557  * @tc.type: FUNC
1558  * @tc.require:
1559  * @tc.author: bty
1560  */
1561 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level1)
1562 {
1563     /**
1564      * @tc.steps:step1. init data and sync
1565      * @tc.expected: step1. return OK
1566      */
1567     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1568     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1569     UpdateAssetsForLocal(db, 6,  AssetStatus::DOWNLOADING); // 6 is id
1570     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1571 
1572     /**
1573      * @tc.steps:step2. check asset status
1574      * @tc.expected: step2. return OK
1575      */
1576     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1577     sqlite3_stmt *stmt = nullptr;
1578     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1579     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1580         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1581         Type cloudValue;
1582         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1583         std::vector<uint8_t> assetsBlob;
1584         Assets assets;
1585         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1586         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1587         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1588         for (size_t i = 0; i < assets.size(); ++i) {
1589             EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1590             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1591         }
1592     }
1593     int errCode;
1594     SQLiteUtils::ResetStatement(stmt, true, errCode);
1595 }
1596 
1597 /**
1598  * @tc.name: FillAssetId019
1599  * @tc.desc: Test the stability of cleaning asset id
1600  * @tc.type: FUNC
1601  * @tc.require:
1602  * @tc.author: bty
1603  */
1604 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level1)
1605 {
1606     /**
1607      * @tc.steps:step1. local insert assets and sync.
1608      * @tc.expected: step1. return OK.
1609      */
1610     int localCount = 20;
1611     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1612     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1613 
1614     /**
1615      * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1616      * @tc.expected: step2. return OK.
1617      */
1618     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1619         + " set data_key='999' where data_key>'10';";
1620     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1621     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1622 }
1623 
1624 /**
1625  * @tc.name: FillAssetId020
1626  * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1627  * @tc.type: FUNC
1628  * @tc.require:
1629  * @tc.author: zhangtao
1630  */
1631 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level1)
1632 {
1633     CloudSyncConfig config;
1634     config.maxUploadCount = 200; // max upload 200
1635     g_delegate->SetCloudSyncConfig(config);
1636 
1637     /**
1638      * @tc.steps:step1. local insert assets and erase assets extends
1639      * @tc.expected: step1. return OK.
1640      */
1641     int localCount = 50;
1642     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1d02(const std::string &tableName, VBucket &extend) 1643     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1644         extend.erase("assets");
1645     });
1646     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1647     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1648 
1649     /**
1650      * @tc.steps:step2. local insert assets and modify assetId to empty
1651      * @tc.expected: step2. return OK.
1652      */
1653     int addLocalCount = 10;
1654     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1e02(const std::string &tableName, VBucket &extend) 1655     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1656         if (extend.find("assets") != extend.end()) {
1657             for (auto &asset : std::get<Assets>(extend["assets"])) {
1658                 asset.assetId = "";
1659             }
1660         }
1661     });
1662     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1663     int beginFailFillNum = 101;
1664     int endFailFillNum = 120;
1665     std::set<int> index;
1666     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1667         index.insert(i);
1668     }
1669     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1670 
1671     /**
1672      * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1673      * @tc.expected: step3. return OK.
1674      */
1675     InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1f02(const std::string &tableName, VBucket &extend) 1676     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1677         if (extend.find("assets") != extend.end()) {
1678             for (auto &asset : std::get<Assets>(extend["assets"])) {
1679                 asset.name = "mod_pat";
1680             }
1681         }
1682     });
1683     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1684     beginFailFillNum = 121;
1685     endFailFillNum = 140;
1686     std::set<int> newIndex;
1687     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1688         newIndex.insert(i);
1689     }
1690     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1691 
1692     /**
1693      * @tc.steps:step4. local update assets and sync, check the local assetId.
1694      * @tc.expected: step4. sync success.
1695      */
1696     g_virtualCloudDb->ForkUpload(nullptr);
1697     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1698     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1699 }
1700 
1701 /**
1702  * @tc.name: FillAssetId021
1703  * @tc.desc: Test if local assets missing, one records's assets missing will not mark the whole sync progress failure
1704  * @tc.type: FUNC
1705  * @tc.require:
1706  * @tc.author: zhangtao
1707  */
1708 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level1)
1709 {
1710     CloudSyncConfig config;
1711     config.maxUploadCount = 200; // max upload 200
1712     g_delegate->SetCloudSyncConfig(config);
1713 
1714     /**
1715      * @tc.steps:step1. local insert assets and erase assets extends
1716      * @tc.expected: step1. return OK.
1717      */
1718     int localCount = 50;
1719     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1720 
1721     /**
1722      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1723      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1724      */
1725     int uploadFailId = 0;
1726     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1727         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1728         uploadFailId++;
1729         if (uploadFailId == 25) { // 25 is the middle record
1730             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1731             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1732         }
1733         return OK;
1734     });
1735 
1736     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1737     int beginFailFillNum = 49;
1738     int endFailFillNum = 50;
1739     std::set<int> index;
1740     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1741         index.insert(i);
1742     }
1743     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1744     g_virtualCloudDb->ForkUpload(nullptr);
1745 }
1746 
1747 /**
1748  * @tc.name: FillAssetId022
1749  * @tc.desc: Test if local assets missing, many records's assets missing will not mark the whole sync progress failure
1750  * @tc.type: FUNC
1751  * @tc.require:
1752  * @tc.author: zhangtao
1753  */
1754 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level1)
1755 {
1756     CloudSyncConfig config;
1757     config.maxUploadCount = 200; // max upload 200
1758     g_delegate->SetCloudSyncConfig(config);
1759 
1760     /**
1761      * @tc.steps:step1. local insert assets and erase assets extends
1762      * @tc.expected: step1. return OK.
1763      */
1764     int localCount = 50;
1765     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1766 
1767     /**
1768      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1769      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1770      */
1771     int uploadFailId = 0;
1772     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1773         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1774         uploadFailId++;
1775         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1776             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1777             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1778         }
1779         return OK;
1780     });
1781 
1782     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1783     int beginFailFillNum = 49;
1784     int endFailFillNum = 54;
1785     std::set<int> index;
1786     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1787         index.insert(i);
1788     }
1789     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1790     g_virtualCloudDb->ForkUpload(nullptr);
1791 }
1792 
1793 /**
1794  * @tc.name: FillAssetId023
1795  * @tc.desc: Test if BatchUpdate with local assets missing
1796  * @tc.type: FUNC
1797  * @tc.require:
1798  * @tc.author: zhangtao
1799  */
1800 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level1)
1801 {
1802     /**
1803      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1804      * @tc.expected: step1. return OK.
1805      */
1806     int localCount = 50;
1807     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1808     std::atomic<int> count = 1;
1809     g_virtualCloudDb->SetClearExtend(count);
1810     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1811     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1812 
1813     /**
1814      * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1815      * @tc.expected: step2. return OK.
1816      */
1817     g_virtualCloudDb->SetClearExtend(0);
1818 
1819     int uploadFailId = 0;
1820     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1821         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1822         uploadFailId++;
1823         if (uploadFailId == 25) { // 25 is the middle record
1824             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1825             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1826         }
1827         return OK;
1828     });
1829     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1830     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1831 }
1832 
1833 /**
1834  * @tc.name: FillAssetId024
1835  * @tc.desc: Test if BatchUpdate with multiple local assets missing
1836  * @tc.type: FUNC
1837  * @tc.require:
1838  * @tc.author: zhangtao
1839  */
1840 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level1)
1841 {
1842     /**
1843      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1844      * @tc.expected: step1. return OK.
1845      */
1846     int localCount = 50;
1847     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1848     std::atomic<int> count = 1;
1849     g_virtualCloudDb->SetClearExtend(count);
1850     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1851     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1852 
1853     /**
1854      * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1855      * @tc.expected: step2. return OK.
1856      */
1857     g_virtualCloudDb->SetClearExtend(0);
1858 
1859     int uploadFailId = 0;
1860     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1861         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1862         uploadFailId++;
1863         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1864             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1865             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1866         }
1867         return OK;
1868     });
1869     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1870     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1871 }
1872 
1873 /**
1874  * @tc.name: FillAssetId025
1875  * @tc.desc: Test if BatchInsert with local assets missing and missing record added into successCount
1876  * @tc.type: FUNC
1877  * @tc.require:
1878  * @tc.author: zhangtao
1879  */
1880 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId025, TestSize.Level1)
1881 {
1882     CloudSyncConfig config;
1883     config.maxUploadCount = 200; // max upload 200
1884     g_delegate->SetCloudSyncConfig(config);
1885     /**
1886      * @tc.steps:step1. insert local data.
1887      * @tc.expected: step1. return OK.
1888      */
1889     int localCount = 40;
1890     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1891 
1892     /**
1893      * @tc.steps:step2. BatchInsert with local assets missing then sync, check the asseid.
1894      * @tc.expected: step2. return OK.
1895      */
1896     int uploadFailId = 0;
1897     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2402(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1898         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1899         uploadFailId++;
1900         if (uploadFailId == 25) { // 25 is the middle record
1901             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1902             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1903         }
1904         return OK;
1905     });
1906     g_syncProcess = {};
1907     Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
1908     std::vector<TableProcessInfo> expectProcess = {
1909         { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
1910         { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
1911     };
1912 
1913     /**
1914      * @tc.steps:step3. Check if sync process consistent with exptectProcess
1915      * @tc.expected: step3. return OK.
1916      */
1917     int index = 0;
__anon09c4f3ca2502(const std::map<std::string, SyncProcess> &process) 1918     CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
1919         g_syncProcess = std::move(process.begin()->second);
1920         ASSERT_LT(index, 2);
1921         for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
1922             EXPECT_EQ(info.process, expectProcess[index].process);
1923             EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
1924             EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
1925             EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
1926             EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
1927         }
1928         index++;
1929         if (g_syncProcess.process == FINISHED) {
1930             g_processCondition.notify_one();
1931             ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1932         }
1933     };
1934     ASSERT_EQ(g_delegate->Sync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
1935     WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
1936 
1937     /**
1938      * @tc.steps:step4. Check assets results
1939      * @tc.expected: step4. return OK.
1940      */
1941     int beginFailFillNum = 49;
1942     int endFailFillNum = 50;
1943     std::set<int> indexes;
1944     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1945         indexes.insert(i);
1946     }
1947     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", indexes);
1948     g_virtualCloudDb->ForkUpload(nullptr);
1949 }
1950 
1951 /**
1952  * @tc.name: ConsistentFlagTest001
1953  * @tc.desc:Assets are the different, check the 0x20 bit of flag after sync
1954  * @tc.type: FUNC
1955  * @tc.require:
1956  * @tc.author: bty
1957  */
1958 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level1)
1959 {
1960     /**
1961      * @tc.steps:step1. init data for the different asset, sync and check flag
1962      * @tc.expected: step1. return OK.
1963      */
1964     int localCount = 10; // 10 is num of local
1965     int cloudCount = 20; // 20 is num of cloud
1966     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1967     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1968     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1969     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1970     CheckConsistentCount(db, cloudCount);
1971 
1972     /**
1973      * @tc.steps:step2. update local data, sync and check flag
1974      * @tc.expected: step2. return OK.
1975      */
1976     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1977     DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1978     CheckConsistentCount(db, 0L);
1979     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1980     CheckConsistentCount(db, cloudCount);
1981 }
1982 
1983 /**
1984  * @tc.name: ConsistentFlagTest002
1985  * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1986  * @tc.type: FUNC
1987  * @tc.require:
1988  * @tc.author: bty
1989  */
1990 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level1)
1991 {
1992     /**
1993      * @tc.steps:step1. init data for the same asset, sync and check flag
1994      * @tc.expected: step1. return OK.
1995      */
1996     int cloudCount = 20; // 20 is num of cloud
1997     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1998     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1999     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2000     CheckConsistentCount(db, cloudCount);
2001 
2002     /**
2003      * @tc.steps:step2. update local data, sync and check flag
2004      * @tc.expected: step2. return OK.
2005      */
2006     int deleteLocalCount = 5;
2007     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
2008     CheckConsistentCount(db, cloudCount - deleteLocalCount);
2009     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2010     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2011     CheckConsistentCount(db, cloudCount);
2012 }
2013 
2014 /**
2015  * @tc.name: ConsistentFlagTest003
2016  * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
2017  * @tc.type: FUNC
2018  * @tc.require:
2019  * @tc.author: bty
2020  */
2021 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level1)
2022 {
2023     /**
2024      * @tc.steps:step1. init data
2025      * @tc.expected: step1. return OK.
2026      */
2027     int localCount = 20; // 20 is num of local
2028     int cloudCount = 10; // 10 is num of cloud
2029     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2030     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2031     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2032 
2033     /**
2034      * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
2035      * @tc.expected: step2. return OK.
2036      */
2037     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2038     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2039     int index = 0;
2040     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2041         .WillRepeatedly(
__anon09c4f3ca2602(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2042             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2043                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2044                 if (index == 1) { // 1 is first download
2045                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2046                 }
2047                 return DBStatus::OK;
2048             });
2049 
2050     /**
2051      * @tc.steps:step3. fork upload, check consistent count
2052      * @tc.expected: step3. return OK.
2053      */
2054     int upIdx = 0;
__anon09c4f3ca2702(const std::string &tableName, VBucket &extend) 2055     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2056         LOGD("upload index:%d", ++upIdx);
2057         if (upIdx == 1) { // 1 is first upload
2058             CheckConsistentCount(db, localCount - cloudCount - 1);
2059         }
2060     });
2061 
2062     /**
2063      * @tc.steps:step4. fork query, check consistent count
2064      * @tc.expected: step4. return OK.
2065      */
2066     int queryIdx = 0;
__anon09c4f3ca2802(const std::string &, VBucket &) 2067     g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
2068         LOGD("query index:%d", ++queryIdx);
2069         if (queryIdx == 3) { // 3 is the last query
2070             CheckConsistentCount(db, localCount - 1);
2071         }
2072     });
2073     int count = 0;
__anon09c4f3ca2902() 2074     g_cloudStoreHook->SetSyncFinishHook([&count]() {
2075         count++;
2076         if (count == 2) { // 2 is compensated sync
2077             g_processCondition.notify_one();
2078         }
2079     });
2080     /**
2081      * @tc.steps:step5. sync, check consistent count
2082      * @tc.expected: step5. return OK.
2083      */
2084     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2085     WaitForSync(count);
2086     CheckConsistentCount(db, localCount);
2087 }
2088 
2089 /**
2090  * @tc.name: ConsistentFlagTest004
2091  * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
2092  * @tc.type: FUNC
2093  * @tc.require:
2094  * @tc.author: bty
2095  */
2096 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level1)
2097 {
2098     /**
2099      * @tc.steps:step1. init data
2100      * @tc.expected: step1. return OK.
2101      */
2102     int localCount = 20; // 20 is num of local
2103     int cloudCount = 10; // 10 is num of cloud
2104     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2105     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2106     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2107 
2108     /**
2109      * @tc.steps:step2. fork upload, return error filed of CLOUD_NETWORK_ERROR
2110      * @tc.expected: step2. return OK.
2111      */
2112     int upIdx = 0;
__anon09c4f3ca2a02(const std::string &tableName, VBucket &extend) 2113     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2114         LOGD("upload index:%d", ++upIdx);
2115         if (upIdx == 1) {
2116             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
2117         }
2118     });
2119 
2120     /**
2121      * @tc.steps:step3. sync, check consistent count
2122      * @tc.expected: step3. return OK.
2123      */
2124     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2125     CheckConsistentCount(db, localCount - 1);
2126 
2127     /**
2128      * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2129      * @tc.expected: step4. return OK.
2130      */
2131     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2132     upIdx = 0;
__anon09c4f3ca2b02(const std::string &tableName, VBucket &extend) 2133     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2134         LOGD("upload index:%d", ++upIdx);
2135         if (upIdx == 1) {
2136             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2137             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2138         }
2139         if (upIdx == 2) {
2140             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2141             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2142         }
2143     });
2144 
2145     /**
2146      * @tc.steps:step5. sync, check consistent count
2147      * @tc.expected: step5. return OK.
2148      */
2149     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2150     CheckConsistentCount(db, localCount - 2);
2151 }
2152 
2153 /**
2154  * @tc.name: ConsistentFlagTest005
2155  * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2156  * @tc.type: FUNC
2157  * @tc.require:
2158  * @tc.author: bty
2159  */
2160 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level1)
2161 {
2162     /**
2163      * @tc.steps:step1. init data
2164      * @tc.expected: step1. return OK.
2165      */
2166     int localCount = 20; // 20 is num of local
2167     int cloudCount = 10; // 10 is num of cloud
2168     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2169     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2170     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2171 
2172     /**
2173      * @tc.steps:step2. fork download, update local assets where id=2
2174      * @tc.expected: step2. return OK.
2175      */
2176     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2177     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2178     int index = 0;
2179     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2180         .WillRepeatedly(
2181             [this, &index](const std::string &, const std::string &gid, const Type &,
__anon09c4f3ca2c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2182                 std::map<std::string, Assets> &assets) {
2183                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2184                 if (index == 1) { // 1 is first download
2185                     std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2186                     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2187                 }
2188                 return DBStatus::OK;
2189             });
2190 
2191     /**
2192      * @tc.steps:step3. fork upload, check consistent count
2193      * @tc.expected: step3. return OK.
2194      */
2195     int upIdx = 0;
__anon09c4f3ca2d02(const std::string &tableName, VBucket &extend) 2196     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2197         LOGD("upload index:%d", ++upIdx);
2198         if (upIdx == 1) { // 1 is first upload
2199             CheckConsistentCount(db, localCount - cloudCount - 1);
2200         }
2201     });
2202 
2203     /**
2204      * @tc.steps:step4. sync, check consistent count
2205      * @tc.expected: step4. return OK.
2206      */
2207     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2208     CheckConsistentCount(db, localCount);
2209 }
2210 
2211 /**
2212  * @tc.name: ConsistentFlagTest006
2213  * @tc.desc:
2214  * @tc.type: FUNC
2215  * @tc.require:
2216  * @tc.author: bty
2217  */
2218 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level1)
2219 {
2220     /**
2221      * @tc.steps:step1. init data
2222      * @tc.expected: step1. return OK.
2223      */
2224     int cloudCount = 10; // 10 is num of cloud
2225     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2226     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2227 
2228     /**
2229      * @tc.steps:step2. fork download, update local assets where id=2
2230      * @tc.expected: step2. return OK.
2231      */
2232     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2233     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2234     int delCount = 3; // 3 is num of cloud
2235     DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2236     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2237     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2238     int index = 0;
2239     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2240         .WillRepeatedly(
2241             [&index](const std::string &, const std::string &gid, const Type &,
__anon09c4f3ca2e02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2242                 std::map<std::string, Assets> &assets) {
2243                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2244                 if (index == 1) { // 1 is first download
2245                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2246                 }
2247                 return DBStatus::OK;
2248             });
2249 
2250     /**
2251      * @tc.steps:step3. fork upload, check consistent count
2252      * @tc.expected: step3. return OK.
2253      */
2254     int upIdx = 0;
__anon09c4f3ca2f02(const std::string &tableName, VBucket &extend) 2255     g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2256         LOGD("upload index:%d", ++upIdx);
2257         if (upIdx == 1) { // 1 is first upload
2258             CheckConsistentCount(db, delCount);
2259             CheckCompensatedCount(db, 0L);
2260         }
2261     });
2262 
2263     /**
2264      * @tc.steps:step4. sync, check consistent count
2265      * @tc.expected: step4. return OK.
2266      */
2267     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2268     CheckConsistentCount(db, cloudCount);
2269 }
2270 
2271 /**
2272  * @tc.name: SyncDataStatusTest001
2273  * @tc.desc: No need to download asset, check status after sync
2274  * @tc.type: FUNC
2275  * @tc.require:
2276  * @tc.author: bty
2277  */
2278 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level1)
2279 {
2280     DataStatusTest001(false);
2281 }
2282 
2283 /**
2284  * @tc.name: SyncDataStatusTest002
2285  * @tc.desc: Need to download asset, check status after sync
2286  * @tc.type: FUNC
2287  * @tc.require:
2288  * @tc.author: bty
2289  */
2290 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level1)
2291 {
2292     DataStatusTest001(true);
2293 }
2294 
2295 /**
2296  * @tc.name: SyncDataStatusTest003
2297  * @tc.desc: Lock during download and check status
2298  * @tc.type: FUNC
2299  * @tc.require:
2300  * @tc.author: bty
2301  */
2302 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level1)
2303 {
2304     DataStatusTest003();
2305 }
2306 
2307 /**
2308  * @tc.name: SyncDataStatusTest004
2309  * @tc.desc: Lock and delete during download, check status
2310  * @tc.type: FUNC
2311  * @tc.require:
2312  * @tc.author: bty
2313  */
2314 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level1)
2315 {
2316     DataStatusTest004();
2317 }
2318 
2319 /**
2320  * @tc.name: SyncDataStatusTest005
2321  * @tc.desc: Lock and update during download, check status
2322  * @tc.type: FUNC
2323  * @tc.require:
2324  * @tc.author: bty
2325  */
2326 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level1)
2327 {
2328     DataStatusTest005();
2329 }
2330 
2331 /**
2332  * @tc.name: SyncDataStatusTest006
2333  * @tc.desc: Lock and update and Unlock during download, check status
2334  * @tc.type: FUNC
2335  * @tc.require:
2336  * @tc.author: bty
2337  */
2338 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level1)
2339 {
2340     DataStatusTest006();
2341 }
2342 
2343 /**
2344  * @tc.name: SyncDataStatusTest007
2345  * @tc.desc: Download return error, check status
2346  * @tc.type: FUNC
2347  * @tc.require:
2348  * @tc.author: bty
2349  */
2350 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level1)
2351 {
2352     DataStatusTest007();
2353 }
2354 
2355 /**
2356  * @tc.name: SyncDataStatusTest008
2357  * @tc.desc: Test upload process when data locked
2358  * @tc.type: FUNC
2359  * @tc.require:
2360  * @tc.author: bty
2361  */
2362 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level1)
2363 {
2364     /**
2365      * @tc.steps:step1. init local data
2366      * @tc.expected: step1. return OK.
2367      */
2368     int localCount = 40;
2369     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2370     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2371     std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2372     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2373 
2374     /**
2375      * @tc.steps:step2. sync and check process
2376      * @tc.expected: step2. return OK.
2377      */
2378     g_syncProcess = {};
2379     Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2380     std::vector<TableProcessInfo> expectProcess = {
2381         { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2382         { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2383     };
2384     int index = 0;
2385     CloudSyncConfig config;
2386     config.maxUploadCount = 100; // max upload 100
2387     g_delegate->SetCloudSyncConfig(config);
__anon09c4f3ca3002(const std::map<std::string, SyncProcess> &process) 2388     CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2389         g_syncProcess = std::move(process.begin()->second);
2390         ASSERT_LT(index, 2);
2391         for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2392             EXPECT_EQ(info.process, expectProcess[index].process);
2393             EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2394             EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2395             EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2396             EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2397         }
2398         index++;
2399         if (g_syncProcess.process == FINISHED) {
2400             g_processCondition.notify_one();
2401             ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2402         }
2403     };
2404     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2405     WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2406 }
2407 
2408 /**
2409  * @tc.name: DownloadAssetTest001
2410  * @tc.desc: Test the asset status after the share table sync
2411  * @tc.type: FUNC
2412  * @tc.require:
2413  * @tc.author: bty
2414  */
2415 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level1)
2416 {
2417     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
2418     /**
2419      * @tc.steps:step1. init data and sync
2420      * @tc.expected: step1. return OK.
2421      */
2422     int cloudCount = 10; // 10 is num of cloud
2423     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2424     CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2425 
2426     /**
2427      * @tc.steps:step2. check asset status
2428      * @tc.expected: step2. return OK.
2429      */
2430     SqlCondition condition;
2431     condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2432     condition.readOnly = true;
2433     std::vector<VBucket> records;
2434     EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2435     for (const auto &data: records) {
2436         Assets assets;
2437         CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2438         for (const auto &asset: assets) {
2439             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2440         }
2441     }
2442     EXPECT_EQ(g_virtualAssetLoader->GetBatchDownloadCount(), 0u);
2443     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
2444 }
2445 
2446 /**
2447  * @tc.name: DownloadAssetTest002
2448  * @tc.desc: Test asset download failed and re download
2449  * @tc.type: FUNC
2450  * @tc.require:
2451  * @tc.author: liaoyonghuang
2452  */
2453 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level1)
2454 {
2455     /**
2456      * @tc.steps:step1. init data
2457      * @tc.expected: step1. return OK.
2458      */
2459     int cloudCount = 10; // 10 is num of cloud
2460     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2461 
2462     /**
2463      * @tc.steps:step2. Set asset download status error and sync
2464      * @tc.expected: step2. sync successful but download assets fail.
2465      */
2466     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2467     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2468 
2469     /**
2470      * @tc.steps:step3. Set asset download status OK and sync
2471      * @tc.expected: step3. return OK.
2472      */
2473     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2474     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2475 
2476     /**
2477      * @tc.steps:step4. Check assets status
2478      * @tc.expected: step4. status is NORMAL.
2479      */
2480     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2481     sqlite3_stmt *stmt = nullptr;
2482     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2483     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2484         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2485         Type cloudValue;
2486         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2487         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2488         for (const auto &asset : assets) {
2489             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2490         }
2491     }
2492     int errCode = E_OK;
2493     SQLiteUtils::ResetStatement(stmt, true, errCode);
2494     EXPECT_EQ(errCode, E_OK);
2495 }
2496 
2497 /**
2498  * @tc.name: DownloadAssetTest003
2499  * @tc.desc: Test asset download after sync task recovery
2500  * @tc.type: FUNC
2501  * @tc.require:
2502  * @tc.author: liaoyonghuang
2503  */
2504 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest003, TestSize.Level1)
2505 {
2506     /**
2507      * @tc.steps:step1. init data
2508      * @tc.expected: step1. return OK.
2509      */
2510     int cloudCount = 10; // 10 is num of cloud
2511     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2512     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2513     DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
2514     InsertCloudDBData(0, cloudCount, 0, NO_PRIMARY_TABLE);
2515     /**
2516      * @tc.steps:step2. Set task interrupted before asset download
2517      * @tc.expected: step2. return OK.
2518      */
2519     int queryTime = 0;
__anon09c4f3ca3102(const std::string &, VBucket &) 2520     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2521         queryTime++;
2522         if (queryTime != 1) {
2523             return;
2524         }
2525         Query query = Query::Select().FromTable({NO_PRIMARY_TABLE});
2526         CloudSyncOption option;
2527         option.priorityTask = true;
2528         option.devices = {DEVICE_CLOUD};
2529         option.mode = SYNC_MODE_CLOUD_MERGE;
2530         option.query = query;
2531         ASSERT_EQ(g_delegate->Sync(option, nullptr), OK);
2532     });
2533     /**
2534      * @tc.steps:step3. Sync
2535      * @tc.expected: step3. return OK.
2536      */
2537     int removeTime = 0;
__anon09c4f3ca3202(std::map<std::string, Assets> &assets) 2538     g_virtualAssetLoader->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
2539         removeTime++;
2540         return OK;
2541     });
2542     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2543     /**
2544      * @tc.steps:step4. Check fork asset download time and observer
2545      * @tc.expected: step4. return OK.
2546      */
2547     EXPECT_EQ(removeTime, cloudCount);
2548     ChangedData expectedChangeData1;
2549     ChangedData expectedChangeData2;
2550     expectedChangeData1.tableName = NO_PRIMARY_TABLE;
2551     expectedChangeData2.tableName = ASSETS_TABLE_NAME;
2552     expectedChangeData1.type = ChangedDataType::ASSET;
2553     expectedChangeData2.type = ChangedDataType::ASSET;
2554     expectedChangeData1.field.push_back(std::string("rowid"));
2555     expectedChangeData2.field.push_back(std::string("id"));
2556     for (int i = 0; i < cloudCount; i++) {
2557         expectedChangeData1.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 1});
2558         expectedChangeData2.primaryData[ChangeType::OP_DELETE].push_back({(int64_t)i});
2559     }
2560     g_observer->SetExpectedResult(expectedChangeData1);
2561     g_observer->SetExpectedResult(expectedChangeData2);
2562     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
2563     g_observer->ClearChangedData();
2564 
2565     g_virtualCloudDb->ForkInsertConflict(nullptr);
2566     g_virtualCloudDb->ForkQuery(nullptr);
2567     g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
2568 }
2569 
2570 /**
2571  * @tc.name: DownloadAssetTest004
2572  * @tc.desc: Test asset download failed with no asset data
2573  * @tc.type: FUNC
2574  * @tc.require:
2575  * @tc.author: zqq
2576  */
2577 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest004, TestSize.Level1)
2578 {
2579     /**
2580      * @tc.steps:step1. init data
2581      * @tc.expected: step1. return OK.
2582      */
2583     int cloudCount = 10; // 10 is num of cloud
2584     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME, true);
2585     InsertCloudDBData(cloudCount, cloudCount, cloudCount, ASSETS_TABLE_NAME, false);
2586     /**
2587      * @tc.steps:step2. Set asset download status error and sync
2588      * @tc.expected: step2. sync successful but download assets fail.
2589      */
2590     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2591     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2592     /**
2593      * @tc.steps:step3. Check FLAG_DEVICE_CLOUD_INCONSISTENCY data
2594      * @tc.expected: step3. 10 data is cloud consistency because no asset.
2595      */
2596     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2597     std::string sql = "SELECT COUNT(1) FROM " + logName + " WHERE flag & 0x20 = 0";
2598     int count = 0;
2599     ASSERT_EQ(SQLiteUtils::GetCountBySql(db, sql, count), E_OK);
2600     EXPECT_EQ(count, cloudCount);
2601 }
2602 
2603 /**
2604  * @tc.name: RecordLockFuncTest001
2605  * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2606  * @tc.type: FUNC
2607  * @tc.author: lijun
2608  */
2609 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level1)
2610 {
2611     /**
2612      * @tc.steps:step1. init local data
2613      * @tc.expected: step1. return OK.
2614      */
2615     int localCount = 100;
2616     int cloudCount = 100;
2617     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2618     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2619     std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2620     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2621     CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2622     CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2623     DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2624 
2625     /**
2626      * @tc.steps:step2. init cloud data
2627      * @tc.expected: step2. return OK.
2628      */
2629     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2630     UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2631 
2632     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2633     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2634     int index = 0;
2635     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2636         .WillRepeatedly(
__anon09c4f3ca3302(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2637             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2638                 LOGD("Download GID:%s  %d", gid.c_str(), index);
2639                 index++;
2640                 if (index <= 30) {
2641                     return DBStatus::CLOUD_ERROR;
2642                 } else {
2643                     return DBStatus::OK;
2644                 }
2645 
2646             });
2647 
2648     std::mutex mtx;
2649     std::condition_variable cv;
2650     int queryIdx = 0;
2651     bool ready = false;
__anon09c4f3ca3402(const std::string &, VBucket &) 2652     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2653         LOGD("query index:%d", ++queryIdx);
2654         if (queryIdx == 2) { // 2 is compensated sync
2655             std::unique_lock<std::mutex> lock(mtx);
2656             ready = true;
2657             cv.notify_one();
2658         }
2659     });
2660     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2661     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2662 
2663     {
2664         std::unique_lock<std::mutex> lock(mtx);
__anon09c4f3ca3502null2665         cv.wait(lock, [&]{ return ready; });
2666     }
2667     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2668 
2669     std::this_thread::sleep_for(std::chrono::seconds(6));
2670     /**
2671      * @tc.steps:step3. check after compensated sync
2672      * @tc.expected: step3. all is UNLOCKING.
2673      */
2674     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2675 }
2676 
2677 /**
2678  * @tc.name: RecordLockFuncTest002
2679  * @tc.desc: Compensated synchronization, Locked data has not been synchronized. The first synchronization data is
2680  * based on the cloud, and the last synchronization data is based on the device.
2681  * @tc.type: FUNC
2682  * @tc.author: lijun
2683  */
2684 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest002, TestSize.Level1)
2685 {
2686     /**
2687      * @tc.steps:step1. init local data, modify data Status and initiate synchronization
2688      * @tc.expected: step1. return OK.
2689      */
2690     int localCount = 120;
2691     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2692     std::vector<std::vector<uint8_t>> hashKey;
2693     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key >=100 ", db, hashKey);
2694     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2695     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2696     CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2697     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
2698 
2699     /**
2700      * @tc.steps:step2. Check the synchronization result and log table status
2701      * @tc.expected: step2.100-109 is LOCK_CHANGE.
2702      */
2703     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2704     CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2705     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1, 100, 109);
2706     CheckLockStatus(db, 100, 109, LockStatus::LOCK_CHANGE);
2707     CheckLockStatus(db, 110, 119, LockStatus::LOCK);
2708 
2709     /**
2710      * @tc.steps:step3. Synchronize and check the lock_change data status
2711      * @tc.expected: step3.100-119 is LOCK_CHANGE.
2712      */
2713     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2714     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2715     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2716 
2717     /**
2718      * @tc.steps:step4. Unlock,the lock_change data status changes to unlocking
2719      * @tc.expected: step4.100-119 is UNLOCKING.
2720      */
2721     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2722     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2723     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2724 
2725     /**
2726      * @tc.steps:step5. Lock,the unlocking data status changes to lock_change
2727      * @tc.expected: step5.100-119 is LOCK_CHANGE.
2728      */
2729     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2730     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2731     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2732 
2733     /**
2734      * @tc.steps:step6. Synchronize and check the lock_change data status
2735      * @tc.expected: step6.100-119 is LOCK_CHANGE.
2736      */
2737     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2738     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2739     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2740 
2741     /**
2742      * @tc.steps:step7. Unlock,the lock_change data status changes to unlocking
2743      * @tc.expected: step7.100-119 is UNLOCKING.
2744      */
2745     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2746     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2747     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2748 
2749     /**
2750      * @tc.steps:step8. Synchronize data
2751      * @tc.expected: step8.return OK.
2752      */
2753     std::mutex mtx;
2754     std::condition_variable cv;
2755     int queryIdx = 0;
2756     bool ready = false;
__anon09c4f3ca3602(const std::string &, VBucket &) 2757     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2758         LOGD("query index:%d", ++queryIdx);
2759         if (queryIdx == 5) { // 5 is compensated sync
2760             std::unique_lock<std::mutex> lock(mtx);
2761             ready = true;
2762             cv.notify_one();
2763         }
2764     });
2765     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2766     {
2767         std::unique_lock<std::mutex> lock(mtx);
__anon09c4f3ca3702null2768         cv.wait(lock, [&]{ return ready; });
2769     }
2770 
2771     std::this_thread::sleep_for(std::chrono::seconds(6));
2772     /**
2773      * @tc.steps:step9. check after compensated sync
2774      * @tc.expected: step9. all is UNLOCK.
2775      */
2776     CheckLockStatus(db, 0, 119, LockStatus::UNLOCK);
2777 }
2778 
2779 /**
2780   * @tc.name: CloudTaskStatusTest001
2781   * @tc.desc: Test get cloud task status
2782   * @tc.type: FUNC
2783   * @tc.require:
2784   * @tc.author: liaoyonghuang
2785   */
2786 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest001, TestSize.Level1)
2787 {
2788     /**
2789      * @tc.steps:step1. init data
2790      * @tc.expected: step1. return OK.
2791      */
2792     int cloudCount = 10; // 10 is num of cloud
2793     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2794 
2795     /**
2796      * @tc.steps:step2. Sync and get cloud task status
2797      * @tc.expected: step2. OK
2798      */
2799     g_virtualCloudDb->SetBlockTime(1000);
__anon09c4f3ca3802() 2800     std::thread syncThread([&]() {
2801         CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2802     });
2803     std::this_thread::sleep_for(std::chrono::milliseconds(100));
2804     SyncProcess process1 = g_delegate->GetCloudTaskStatus(UINT64_MAX);
2805     EXPECT_EQ(process1.errCode, OK);
2806     syncThread.join();
2807     /**
2808      * @tc.steps:step3. Get cloud task status after sync finish
2809      * @tc.expected: step3. NOT_FOUND
2810      */
2811     SyncProcess process2 = g_delegate->GetCloudTaskStatus(1);
2812     EXPECT_EQ(process2.errCode, NOT_FOUND);
2813 
2814     /**
2815      * @tc.steps:step4. Get cloud task status after DB closed
2816      * @tc.expected: step4. DB_ERROR
2817      */
2818     auto delegateImpl = static_cast<RelationalStoreDelegateImpl *>(g_delegate);
2819     EXPECT_EQ(delegateImpl->Close(), DBStatus::OK);
2820     SyncProcess process3 = g_delegate->GetCloudTaskStatus(1);
2821     EXPECT_EQ(process3.errCode, DB_ERROR);
2822 }
2823 
2824 /**
2825   * @tc.name: CloudTaskStatusTest002
2826   * @tc.desc: Test get cloud task status when task merge
2827   * @tc.type: FUNC
2828   * @tc.require:
2829   * @tc.author: suyue
2830   */
2831 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest002, TestSize.Level1)
2832 {
2833     /**
2834      * @tc.steps:step1. init data
2835      * @tc.expected: step1. return OK
2836      */
2837     int cloudCount = 100; // 100 is num of cloud
2838     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2839 
2840     /**
2841      * @tc.steps:step2. sync tasks 2 and 3 that can be merged when synchronizing task 1, get status of the merge task
2842      * @tc.expected: step2. the errCode of task 2 and task 3 is OK or NOT_FOUND
2843      */
2844     g_virtualCloudDb->SetBlockTime(1000);
2845     Query query = Query::Select().FromTable({ASSETS_TABLE_NAME});
__anon09c4f3ca3902(const std::map<std::string, SyncProcess> &process) 2846     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2847         std::unique_lock<std::mutex> lock(g_processMutex);
2848         if (process.begin()->second.process == FINISHED) {
2849             g_processCondition.notify_one();
2850         }
2851     };
2852     CloudSyncOption option = {.devices = {DEVICE_CLOUD}, .mode = SYNC_MODE_CLOUD_MERGE, .query = query,
2853         .waitTime = SYNC_WAIT_TIME, .lockAction = static_cast<LockAction>(0xff)};
2854 
__anon09c4f3ca3a02() 2855     std::thread syncThread1([option, callback]() {
2856         ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2857     });
2858     option.merge = true;
__anon09c4f3ca3b02() 2859     std::thread syncThread2([option, callback]() {
2860         ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2861     });
__anon09c4f3ca3c02() 2862     std::thread syncThread3([option, callback]() {
2863         ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2864     });
2865     std::this_thread::sleep_for(std::chrono::milliseconds(100));
2866     SyncProcess process1 = g_delegate->GetCloudTaskStatus(1);
2867     SyncProcess process2 = g_delegate->GetCloudTaskStatus(2);
2868     SyncProcess process3 = g_delegate->GetCloudTaskStatus(3);
2869     syncThread1.join();
2870     syncThread2.join();
2871     syncThread3.join();
2872     // Due to the task execution sequence, task 2 may be combined into 3 or task 3 may be combined into 2.
2873     // Therefore, the errCode of task 2 and 3 may be OK or NOT_FOUND.
2874     EXPECT_TRUE(process2.errCode == OK  || process2.errCode == NOT_FOUND);
2875     EXPECT_TRUE(process3.errCode == OK  || process3.errCode == NOT_FOUND);
2876 }
2877 
2878 /**
2879   * @tc.name: CompensatedSyncTest001
2880   * @tc.desc: test compensated count more than 100.
2881   * @tc.type: FUNC
2882   * @tc.require:
2883   * @tc.author: tankaisheng
2884   */
2885 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CompensatedSyncTest001, TestSize.Level1)
2886 {
2887     /**
2888      * @tc.steps:step1. init data
2889      * @tc.expected: step1. return OK.
2890      */
2891     int dataCount = 120;
2892     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
2893     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2894 
2895     /**
2896      * @tc.steps:step2. set all data wait compensated.
2897      * @tc.expected: step2. return ok.
2898      */
2899     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " set flag=flag|0x10;";
2900     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
2901     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " where flag&0x10=0x10;";
2902     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2903         reinterpret_cast<void *>(120u), nullptr), SQLITE_OK);
2904 
2905     /**
2906      * @tc.steps:step3. sync with compensated.
2907      * @tc.expected: step3. return ok.
2908      */
2909     std::mutex processMutex;
2910     std::vector<SyncProcess> expectProcess;
2911     std::condition_variable cv;
2912     bool finish = false;
2913     auto callback = [&cv, &finish, &processMutex]
__anon09c4f3ca3d02(const std::map<std::string, SyncProcess> &process) 2914         (const std::map<std::string, SyncProcess> &process) {
2915         for (auto &item : process) {
2916             if (item.second.process == FINISHED) {
2917                 EXPECT_EQ(item.second.errCode, DBStatus::OK);
2918                 std::unique_lock<std::mutex> lock(processMutex);
2919                 finish = true;
2920                 cv.notify_one();
2921             }
2922         }
2923     };
2924     CloudSyncOption option;
2925     option.devices = {DEVICE_CLOUD};
2926     option.priorityTask = true;
2927     option.compensatedSyncOnly = true;
2928     DBStatus syncResult = g_delegate->Sync(option, callback);
2929     EXPECT_EQ(syncResult, DBStatus::OK);
2930 
2931     /**
2932      * @tc.steps:step4. wait sync finish and check data.
2933      * @tc.expected: step4. return ok.
2934      */
2935     std::unique_lock<std::mutex> lock(processMutex);
__anon09c4f3ca3e02() 2936     cv.wait(lock, [&finish]() {
2937         return finish;
2938     });
2939     std::this_thread::sleep_for(std::chrono::seconds(1));
2940     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2941         reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
2942 }
2943 
2944 /**
2945   * @tc.name: CloudCooperationTrackerTableSync001
2946   * @tc.desc: test CLOUD_COOPERATION tracker table sync.
2947   * @tc.type: FUNC
2948   * @tc.require:
2949   * @tc.author: tankaisheng
2950   */
2951 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudCooperationTrackerTableSync001, TestSize.Level1)
2952 {
2953     /**
2954      * @tc.steps:step1. init data
2955      * @tc.expected: step1. return OK.
2956      */
2957     int dataCount = 120;
2958     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
2959     TrackerSchema trackerSchema = {
2960         .tableName = ASSETS_TABLE_NAME, .extendColNames = {"int_field1"}, .trackerColNames = {"int_field1"}};
2961     g_delegate->SetTrackerTable(trackerSchema);
2962     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2963 }
2964 
2965 /**
2966  * @tc.name: FillAssetId026
2967  * @tc.desc: Test if assetId is null when removedevicedata in FLAG_AND_DATA
2968  * @tc.type: FUNC
2969  * @tc.require:
2970  * @tc.author: tankaisheng
2971  */
2972 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId026, TestSize.Level0)
2973 {
2974     /**
2975      * @tc.steps:step1. local insert assets and sync, check the local assetId.
2976      * @tc.expected: step1. return OK.
2977      */
2978     int localCount = 10;
2979     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
2980     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2981     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
2982 
2983     /**
2984      * @tc.steps:step2. remove device data then check record.
2985      * @tc.expected: step2. return OK.
2986      */
2987     g_delegate->RemoveDeviceData("", FLAG_AND_DATA);
2988     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
2989     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2990     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
2991     g_delegate->RemoveDeviceData("", CLEAR_SHARED_TABLE);
2992     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
2993 }
2994 } // namespace
2995 #endif // RELATIONAL_STORE
2996