• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_delegate_impl.h"
26 #include "relational_store_instance.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "sqlite_relational_store.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 #include <gtest/gtest.h>
37 #include <iostream>
38 
39 using namespace testing::ext;
40 using namespace DistributedDB;
41 using namespace DistributedDBUnitTest;
42 using namespace std;
43 
44 namespace {
// Store id handed to OpenStore(); together with DB_SUFFIX it also forms the database file name.
const std::string STORE_ID = "Relational_Store_SYNC";
const std::string DB_SUFFIX = ".db";

// Local tables created by InitDatabase() and the shared-table names registered for them in the cloud schema.
const std::string ASSETS_TABLE_NAME = "student";
const std::string ASSETS_TABLE_NAME_SHARED = "student_shared";
const std::string NO_PRIMARY_TABLE = "teacher";
const std::string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
const std::string COMPOUND_PRIMARY_TABLE = "worker1";
const std::string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";

// Device name used as the sync target and expected as the key of the sync-progress callback map.
const std::string DEVICE_CLOUD = "cloud_dev";

// Column names shared by all three test tables.
const std::string COL_ID = "id";
const std::string COL_NAME = "name";
const std::string COL_HEIGHT = "height";
const std::string COL_ASSET = "asset";
const std::string COL_ASSETS = "assets";
const std::string COL_AGE = "age";

// Timeouts; both are converted with std::chrono::seconds by the wait helpers in this file.
constexpr int64_t SYNC_WAIT_TIME = 600;
constexpr int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
62 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
63     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
64     {COL_AGE, TYPE_INDEX<int64_t>}};
65 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
66     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
67     {COL_AGE, TYPE_INDEX<int64_t>}};
68 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
69     {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
70     {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
71 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
72     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
73     COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
74 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
75     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
76     " ASSETS," + COL_AGE + " INT);";
77 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
78     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
79     COL_AGE + " INT, PRIMARY KEY (id, age));";
80 const Asset ASSET_COPY = {.version = 1,
81     .name = "Phone",
82     .assetId = "0",
83     .subpath = "/local/sync",
84     .uri = "/local/sync",
85     .modifyTime = "123456",
86     .createTime = "",
87     .size = "256",
88     .hash = "ASE"};
89 const Asset ASSET_COPY2 = {.version = 1,
90     .name = "Phone_copy_2",
91     .assetId = "0",
92     .subpath = "/local/sync",
93     .uri = "/local/sync",
94     .modifyTime = "123456",
95     .createTime = "",
96     .size = "256",
97     .hash = "ASE"};
98 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
99 const std::string QUERY_CONSISTENT_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x10!=0;";
101 
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116 
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128                                      .fields = CLOUD_FIELDS};
129     dataBaseSchema.tables.push_back(assetsTableSchema);
130     assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131                          .fields = NO_PRIMARY_FIELDS};
132     dataBaseSchema.tables.push_back(assetsTableSchema);
133     assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134                          .fields = COMPOUND_PRIMARY_FIELDS};
135     dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)138 void GenerateDataRecords(
139     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
140 {
141     for (int64_t i = begin; i < begin + count; i++) {
142         Assets assets;
143         Asset asset = ASSET_COPY;
144         asset.name = ASSET_COPY.name + std::to_string(i);
145         assets.emplace_back(asset);
146         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
147         assets.emplace_back(asset);
148         VBucket data;
149         data.insert_or_assign(COL_ID, i);
150         data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
151         data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
152         data.insert_or_assign(COL_ASSETS, assets);
153         data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
154         record.push_back(data);
155 
156         VBucket log;
157         Timestamp now = TimeHelper::GetSysCurrentTime();
158         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
159         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
161         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
162         extend.push_back(log);
163     }
164 }
165 
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)166 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
167 {
168     int errCode;
169     std::vector<VBucket> record;
170     std::vector<VBucket> extend;
171     GenerateDataRecords(begin, count, 0, record, extend);
172     const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
173     for (VBucket vBucket : record) {
174         sqlite3_stmt *stmt = nullptr;
175         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
176         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
177         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
178         ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
179             sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
180         if (isAssetNull) {
181             ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
182         } else {
183             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
184             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
185         }
186         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
187             std::get<Assets>(vBucket[COL_ASSETS]));
188         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
189         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
190         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
191         SQLiteUtils::ResetStatement(stmt, true, errCode);
192     }
193 }
194 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,bool isEmptyAssets=false)195 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, bool isEmptyAssets = false)
196 {
197     int errCode;
198     std::vector<uint8_t> assetBlob;
199     const string sql = "update " + tableName + " set assets=?;";
200     sqlite3_stmt *stmt = nullptr;
201     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
202     if (isEmptyAssets) {
203         ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
204     } else {
205         assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
206         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
207     }
208     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
209     SQLiteUtils::ResetStatement(stmt, true, errCode);
210 }
211 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)212 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
213 {
214     int errCode;
215     std::vector<uint8_t> assetBlob;
216     const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
217         " and id<=" + std::to_string(end) + ";";
218     sqlite3_stmt *stmt = nullptr;
219     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
220     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
221     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
222     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
223     SQLiteUtils::ResetStatement(stmt, true, errCode);
224 }
225 
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)226 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
227 {
228     ASSERT_NE(db, nullptr);
229     for (int64_t i = begin; i < begin + count; i++) {
230         string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
231         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
232     }
233 }
234 
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)235 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
236 {
237     for (int64_t i = begin; i < begin + count; i++) {
238         VBucket idMap;
239         idMap.insert_or_assign("#_gid", std::to_string(i));
240         ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
241     }
242 }
243 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)244 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
245     const std::string &tableName)
246 {
247     std::this_thread::sleep_for(std::chrono::milliseconds(1));
248     std::vector<VBucket> record;
249     std::vector<VBucket> extend;
250     GenerateDataRecords(begin, count, gidStart, record, extend);
251     for (auto &entry: extend) {
252         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
253     }
254     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
255     std::this_thread::sleep_for(std::chrono::milliseconds(1));
256 }
257 
/**
 * sqlite3_exec row callback that parses every column of the row as a base-10 integer and appends it
 * to the std::vector<int64_t> passed through `data`.
 *
 * @param data      pointer to the std::vector<int64_t> that receives the values.
 * @param count     number of columns in the row.
 * @param colValue  column texts; an entry is null for an SQL NULL and is skipped.
 * @param colName   column names (unused).
 * @return 0 to continue the query; non-zero (aborting the query) when `data` is null.
 */
int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
{
    (void)colName;
    auto status = static_cast<std::vector<int64_t> *>(data);
    if (status == nullptr) {
        return 1; // nothing to collect into; abort so the caller's sqlite3_exec check fails
    }
    const int decimal = 10;
    for (int i = 0; i < count; i++) {
        // Read column i (not column 0 every time) and never hand a null text to the parser.
        if (colValue == nullptr || colValue[i] == nullptr) {
            continue;
        }
        status->push_back(strtoll(colValue[i], nullptr, decimal));
    }
    return 0;
}
267 
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)268 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
269 {
270     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
271     std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
272         " and data_key <=" +  std::to_string(endId) + ";";
273     std::vector<int64_t> status;
274     char *str = NULL;
275     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
276         SQLITE_OK);
277     ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
278 
279     for (auto stat : status) {
280         ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
281     }
282 }
283 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)284 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
285 {
286     std::vector<VBucket> record;
287     std::vector<VBucket> extend;
288     GenerateDataRecords(begin, count, gidStart, record, extend);
289     if (tableName == ASSETS_TABLE_NAME_SHARED) {
290         for (auto &vBucket: record) {
291             vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
292         }
293     }
294     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
295 }
296 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)297 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
298 {
299     std::unique_lock<std::mutex> lock(g_processMutex);
300     bool result = g_processCondition.wait_for(
301         lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
302     ASSERT_EQ(result, true);
303     LOGD("-------------------sync end--------------");
304 }
305 
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK,bool isMerge=false)306 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK,
307     bool isMerge = false)
308 {
309     g_syncProcess = {};
310     Query query = Query::Select().FromTable(tableNames);
311     std::vector<SyncProcess> expectProcess;
312     CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
313         ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
314         std::unique_lock<std::mutex> lock(g_processMutex);
315         g_syncProcess = process.begin()->second;
316         if (g_syncProcess.process == FINISHED) {
317             g_processCondition.notify_one();
318             ASSERT_EQ(g_syncProcess.errCode, errCode);
319         }
320     };
321     CloudSyncOption option;
322     option.devices = {DEVICE_CLOUD};
323     option.mode = mode;
324     option.query = query;
325     option.waitTime = SYNC_WAIT_TIME;
326     option.lockAction = static_cast<LockAction>(0xff); // lock all
327     option.merge = isMerge;
328     ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
329 
330     if (dbStatus == DBStatus::OK) {
331         WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
332     }
333 }
334 
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)335 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
336 {
337     for (auto &item : assets) {
338         for (auto &asset : item.second) {
339             EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
340             if (index < 4) { // 1-4 is inserted
341                 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
342             }
343             LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
344                 index);
345         }
346     }
347 }
348 
CheckDownloadFailedForTest002(sqlite3 * & db)349 void CheckDownloadFailedForTest002(sqlite3 *&db)
350 {
351     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
352     sqlite3_stmt *stmt = nullptr;
353     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
354     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
355         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
356         Type cloudValue;
357         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
358         std::vector<uint8_t> assetsBlob;
359         Assets assets;
360         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
361         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
362         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
363         for (size_t i = 0; i < assets.size(); ++i) {
364             EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
365         }
366     }
367     int errCode;
368     SQLiteUtils::ResetStatement(stmt, true, errCode);
369 }
370 
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)371 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
372 {
373     Assets assets;
374     Asset asset = ASSET_COPY;
375     asset.name = ASSET_COPY.name + std::to_string(id);
376     asset.status = status;
377     assets.emplace_back(asset);
378     asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
379     assets.emplace_back(asset);
380     int errCode;
381     std::vector<uint8_t> assetBlob;
382     const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
383     sqlite3_stmt *stmt = nullptr;
384     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
385     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
386     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
387     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
388     SQLiteUtils::ResetStatement(stmt, true, errCode);
389 }
390 
CheckConsistentCount(sqlite3 * db,int64_t expectCount)391 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
392 {
393     EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
394         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
395 }
396 
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)397 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
398 {
399     EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
400         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
401 }
402 
CloseDb()403 void CloseDb()
404 {
405     if (g_delegate != nullptr) {
406         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
407         g_delegate = nullptr;
408     }
409     delete g_observer;
410     g_virtualCloudDb = nullptr;
411 }
412 
413 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
414 public:
415     static void SetUpTestCase(void);
416     static void TearDownTestCase(void);
417     void SetUp();
418     void TearDown();
419 
420 protected:
421     void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
422         const std::set<int> &failIndex);
423     void CheckLocalAssetIsEmpty(const std::string &tableName);
424     void CheckCursorData(const std::string &tableName, int begin);
425     void WaitForSync(int &syncCount);
426     const RelationalSyncAbleStorage *GetRelationalStore();
427     void InitDataStatusTest(bool needDownload);
428     void DataStatusTest001(bool needDownload);
429     void DataStatusTest003();
430     void DataStatusTest004();
431     void DataStatusTest005();
432     void DataStatusTest006();
433     void DataStatusTest007();
434     sqlite3 *db = nullptr;
435     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
436 };
437 
SetUpTestCase(void)438 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
439 {
440     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
441     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
442     LOGI("The test db is:%s", g_storePath.c_str());
443     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
444     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
445 }
446 
TearDownTestCase(void)447 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
448 
SetUp(void)449 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
450 {
451     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
452     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
453         LOGE("rm test db files error.");
454     }
455     DistributedDBToolsUnitTest::PrintTestCaseInfo();
456     LOGD("Test dir is %s", g_testDir.c_str());
457     db = RelationalTestUtils::CreateDataBase(g_storePath);
458     ASSERT_NE(db, nullptr);
459     InitDatabase(db);
460     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
461     ASSERT_NE(g_observer, nullptr);
462     ASSERT_EQ(
463         g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
464         DBStatus::OK);
465     ASSERT_NE(g_delegate, nullptr);
466     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
467     ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
468     ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
469     g_virtualCloudDb = make_shared<VirtualCloudDb>();
470     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
471     g_syncProcess = {};
472     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
473     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
474     DataBaseSchema dataBaseSchema;
475     GetCloudDbSchema(dataBaseSchema);
476     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
477     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
478     ASSERT_NE(g_cloudStoreHook, nullptr);
479     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
480     ASSERT_TRUE(communicatorAggregator_ != nullptr);
481     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
482 }
483 
TearDown(void)484 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
485 {
486     RefObject::DecObjRef(g_store);
487     g_virtualCloudDb->ForkUpload(nullptr);
488     CloseDb();
489     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
490     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
491         LOGE("rm test db files error.");
492     }
493     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
494     communicatorAggregator_ = nullptr;
495     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
496 }
497 
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)498 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
499     const std::string &expectAssetId, const std::set<int> &failIndex)
500 {
501     std::string sql = "SELECT assets FROM " + tableName + ";";
502     sqlite3_stmt *stmt = nullptr;
503     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
504     int index = 0;
505     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
506         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
507         Type cloudValue;
508         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
509         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
510         for (const auto &asset : assets) {
511             index++;
512             if (failIndex.find(index) != failIndex.end()) {
513                 EXPECT_EQ(asset.assetId, "0");
514             } else {
515                 EXPECT_EQ(asset.assetId, expectAssetId);
516             }
517         }
518     }
519     int errCode = E_OK;
520     SQLiteUtils::ResetStatement(stmt, true, errCode);
521 }
522 
CheckLocalAssetIsEmpty(const std::string & tableName)523 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
524 {
525     std::string sql = "SELECT asset FROM " + tableName + ";";
526     sqlite3_stmt *stmt = nullptr;
527     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
528     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
529         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
530     }
531     int errCode = E_OK;
532     SQLiteUtils::ResetStatement(stmt, true, errCode);
533 }
534 
CheckCursorData(const std::string & tableName,int begin)535 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
536 {
537     std::string logTableName = DBCommon::GetLogTableName(tableName);
538     std::string sql = "SELECT cursor FROM " + logTableName + ";";
539     sqlite3_stmt *stmt = nullptr;
540     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
541     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
542         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
543         Type cloudValue;
544         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
545         EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
546         begin++;
547     }
548     int errCode = E_OK;
549     SQLiteUtils::ResetStatement(stmt, true, errCode);
550 }
551 
WaitForSync(int & syncCount)552 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
553 {
554     std::unique_lock<std::mutex> lock(g_processMutex);
555     bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
556         [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
557     ASSERT_EQ(result, true);
558 }
559 
GetRelationalStore()560 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
561 {
562     RelationalDBProperties properties;
563     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
564     int errCode = E_OK;
565     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
566     if (g_store == nullptr) {
567         return nullptr;
568     }
569     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
570 }
571 
InitDataStatusTest(bool needDownload)572 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
573 {
574     int cloudCount = 20;
575     int localCount = 10;
576     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
577     if (needDownload) {
578         UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
579     }
580     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
581     std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
582     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
583     sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
584     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
585     sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
586     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
587     std::this_thread::sleep_for(std::chrono::milliseconds(1));
588     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
589     std::this_thread::sleep_for(std::chrono::milliseconds(1));
590     sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
591     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
592     sql = "update " + logName + " SET status = 1 where data_key in (4);";
593     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
594 }
595 
DataStatusTest001(bool needDownload)596 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
597 {
598     int cloudCount = 20;
599     int count = 0;
600     g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
601         count++;
602         if (count == 1) {
603             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
604                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
605             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
606         }
607         if (count == 2) { // 2 is compensated sync
608             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
609                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
610             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
611             g_processCondition.notify_one();
612         }
613     });
614     InitDataStatusTest(needDownload);
615     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
616     WaitForSync(count);
617 }
618 
DataStatusTest003()619 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
620 {
621     int count = 0;
622     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
623         count++;
624         if (count == 1) {
625             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
626                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
627             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
628         }
629         if (count == 2) { // 2 is compensated sync
630             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
631                 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
632             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
633             g_processCondition.notify_one();
634         }
635     });
636     int downLoadCount = 0;
637     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
638         std::map<std::string, Assets> &assets) {
639         downLoadCount++;
640         if (downLoadCount == 1) {
641             std::vector<std::vector<uint8_t>> hashKey;
642             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
643             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
644         }
645     });
646     InitDataStatusTest(true);
647     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
648     WaitForSync(count);
649     g_virtualAssetLoader->ForkDownload(nullptr);
650 }
651 
DataStatusTest004()652 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
653 {
654     int count = 0;
655     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
656         count++;
657         if (count == 1) {
658             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
659                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
660             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
661         }
662         if (count == 2) { // 2 is compensated sync
663             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
664                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
665             CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
666             g_processCondition.notify_one();
667         }
668     });
669     int downLoadCount = 0;
670     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
671         std::map<std::string, Assets> &assets) {
672         downLoadCount++;
673         if (downLoadCount == 1) {
674             std::vector<std::vector<uint8_t>> hashKey;
675             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
676             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
677             std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
678             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
679         }
680     });
681     int queryIdx = 0;
682     g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
683         LOGD("query index:%d", ++queryIdx);
684         if (queryIdx == 4) { // 4 is compensated sync
685             std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
686                 " SET status = 1 where data_key=15;";
687             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
688         }
689     });
690     InitDataStatusTest(true);
691     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
692     WaitForSync(count);
693     g_virtualAssetLoader->ForkDownload(nullptr);
694     g_virtualCloudDb->ForkQuery(nullptr);
695 }
696 
DataStatusTest005()697 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
698 {
699     int count = 0;
700     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
701         count++;
702         if (count == 1) {
703             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
704                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
705             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
706         }
707         if (count == 2) { // 2 is compensated sync
708             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
709                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
710             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
711             g_processCondition.notify_one();
712         }
713     });
714     int downLoadCount = 0;
715     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
716         std::map<std::string, Assets> &assets) {
717         downLoadCount++;
718         if (downLoadCount == 1) {
719             std::vector<std::vector<uint8_t>> hashKey;
720             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
721             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
722             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
723             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
724         }
725     });
726     InitDataStatusTest(true);
727     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
728     WaitForSync(count);
729     g_virtualAssetLoader->ForkDownload(nullptr);
730 }
731 
DataStatusTest006()732 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
733 {
734     int count = 0;
735     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
736         count++;
737         if (count == 1) {
738             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
739                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
740                 "(status = 0 and data_key in (11))";
741             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
742         }
743         if (count == 2) { // 2 is compensated sync
744             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
745                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
746             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
747             g_processCondition.notify_one();
748         }
749     });
750     int downLoadCount = 0;
751     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
752         std::map<std::string, Assets> &assets) {
753         downLoadCount++;
754         if (downLoadCount == 1) {
755             std::vector<std::vector<uint8_t>> hashKey;
756             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
757             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
758             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
759             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
760             EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
761         }
762     });
763     InitDataStatusTest(true);
764     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
765     WaitForSync(count);
766     g_virtualAssetLoader->ForkDownload(nullptr);
767 }
768 
DataStatusTest007()769 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
770 {
771     int count = 0;
772     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
773         count++;
774         if (count == 1) {
775             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
776                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
777             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
778         }
779         if (count == 2) { // 2 is compensated sync
780             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
781                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
782             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
783             g_processCondition.notify_one();
784         }
785     });
786     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
787     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
788     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
789         .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
790             std::map<std::string, Assets> &assets) {
791             return CLOUD_ERROR;
792         });
793     InitDataStatusTest(true);
794     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
795     WaitForSync(count);
796 }
797 
798 /*
799  * @tc.name: DownloadAssetForDupDataTest001
800  * @tc.desc: Test the download interface call with duplicate data for the same primary key.
801  * @tc.type: FUNC
802  * @tc.require:
803  * @tc.author: liufuchenxing
804  */
805 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level0)
806 {
807     /**
808      * @tc.steps:step1. Mock asset download interface.
809      * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
810      */
811     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
812     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
813     int index = 1;
814     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
815         .Times(2)
816         .WillRepeatedly(
__anon3db76c681102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 817             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
818                 LOGD("Download GID:%s", gid.c_str());
819                 CheckDownloadForTest001(index, assets);
820                 index++;
821                 return DBStatus::OK;
822             });
823 
824     /**
825      * @tc.steps:step2. Insert local data [0, 10), sync data
826      * @tc.expected: step2. sync success.
827      */
828     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
829     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
830 
831     /**
832      * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
833      * @tc.expected: step3. sync success.
834      */
835     DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
836     InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
837     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
838 }
839 
840 /**
841  * @tc.name: FillAssetId001
842  * @tc.desc: Test if assetId is filled in single primary key table
843  * @tc.type: FUNC
844  * @tc.require:
845  * @tc.author: chenchaohao
846  */
847 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level0)
848 {
849     /**
850      * @tc.steps:step1. local insert assets and sync, check the local assetId.
851      * @tc.expected: step1. return OK.
852      */
853     int localCount = 50;
854     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
855     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
856     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
857 
858     /**
859      * @tc.steps:step2. local update assets and sync ,check the local assetId.
860      * @tc.expected: step2. sync success.
861      */
862     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
863     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
864     CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
865     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
866 }
867 
868 /**
869  * @tc.name: FillAssetId002
870  * @tc.desc: Test if assetId is filled in no primary key table
871  * @tc.type: FUNC
872  * @tc.require:
873  * @tc.author: chenchaohao
874  */
875 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level0)
876 {
877     /**
878      * @tc.steps:step1. local insert assets and sync, check the local assetId.
879      * @tc.expected: step1. return OK.
880      */
881     int localCount = 50;
882     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
883     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
884     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
885 
886     /**
887      * @tc.steps:step2. local update assets and sync ,check the local assetId.
888      * @tc.expected: step2. sync success.
889      */
890     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
891     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
892     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
893 }
894 
895 /**
896  * @tc.name: FillAssetId003
897  * @tc.desc: Test if assetId is filled in compound primary key table
898  * @tc.type: FUNC
899  * @tc.require:
900  * @tc.author: chenchaohao
901  */
902 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level0)
903 {
904     /**
905      * @tc.steps:step1. local insert assets and sync, check the local assetId.
906      * @tc.expected: step1. return OK.
907      */
908     int localCount = 50;
909     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
910     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
911     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
912 
913     /**
914      * @tc.steps:step2. local update assets and sync ,check the local assetId.
915      * @tc.expected: step2. sync success.
916      */
917     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
918     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
919     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
920 }
921 
922 /**
923  * @tc.name: FillAssetId004
924  * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
925  * @tc.type: FUNC
926  * @tc.require:
927  * @tc.author: chenchaohao
928  */
929 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level0)
930 {
931     /**
932      * @tc.steps:step1. local insert assets and sync, check the local assetId.
933      * @tc.expected: step1. return OK.
934      */
935     int localCount = 50;
936     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
937     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
938     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
939 
940     /**
941      * @tc.steps:step2. local update assets and sync ,check the local assetId.
942      * @tc.expected: step2. sync success.
943      */
944     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
945     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
946     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
947 }
948 
949 /**
950  * @tc.name: FillAssetId005
951  * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
952  * @tc.type: FUNC
953  * @tc.require:
954  * @tc.author: chenchaohao
955  */
956 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level0)
957 {
958     /**
959      * @tc.steps:step1. local insert assets and sync, check the local assetId.
960      * @tc.expected: step1. return OK.
961      */
962     int localCount = 50;
963     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
964     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
965     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
966 
967     /**
968      * @tc.steps:step2. local update assets and sync ,check the local assetId.
969      * @tc.expected: step2. sync success.
970      */
971     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
972     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
973     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
974 }
975 
976 /**
977  * @tc.name: FillAssetId006
978  * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
979  * @tc.type: FUNC
980  * @tc.require:
981  * @tc.author: chenchaohao
982  */
983 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level0)
984 {
985     /**
986      * @tc.steps:step1. local insert assets and sync, check the local assetId.
987      * @tc.expected: step1. return OK.
988      */
989     int localCount = 50;
990     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
991     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
992     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
993 
994     /**
995      * @tc.steps:step2. local update assets and sync ,check the local assetId.
996      * @tc.expected: step2. sync success.
997      */
998     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
999     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1000     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
1001 }
1002 
1003 /**
1004  * @tc.name: FillAssetId007
1005  * @tc.desc: Test if assetId is filled when extend lack of assets
1006  * @tc.type: FUNC
1007  * @tc.require:
1008  * @tc.author: chenchaohao
1009  */
1010 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level0)
1011 {
1012     CloudSyncConfig config;
1013     config.maxUploadCount = 200; // max upload 200
1014     g_delegate->SetCloudSyncConfig(config);
1015     /**
1016      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1017      * @tc.expected: step1. return OK.
1018      */
1019     int localCount = 50;
1020     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681202(const std::string &tableName, VBucket &extend) 1021     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1022         extend.erase("assets");
1023     });
1024     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1025     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1026 
1027     /**
1028      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1029      * @tc.expected: step2. sync success.
1030      */
1031     int addLocalCount = 10;
1032     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon3db76c681302(const std::string &tableName, VBucket &extend) 1033     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1034         if (extend.find("assets") != extend.end()) {
1035             for (auto &asset : std::get<Assets>(extend["assets"])) {
1036                 asset.name = "pad";
1037             }
1038         }
1039     });
1040     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1041     int beginFailFillNum = 101;
1042     int endFailFillNum = 120;
1043     std::set<int> index;
1044     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1045         index.insert(i);
1046     }
1047     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1048 
1049     /**
1050      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1051      * @tc.expected: step2. sync success.
1052      */
1053     g_virtualCloudDb->ForkUpload(nullptr);
1054     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1055     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1056 }
1057 
1058 /**
1059  * @tc.name: FillAssetId008
1060  * @tc.desc: Test if assetId is filled when extend lack of assetId
1061  * @tc.type: FUNC
1062  * @tc.require:
1063  * @tc.author: chenchaohao
1064  */
1065 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level0)
1066 {
1067     /**
1068      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1069      * @tc.expected: step1. return OK.
1070      */
1071     int localCount = 50;
1072     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681402(const std::string &tableName, VBucket &extend) 1073     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1074         if (extend.find("assets") != extend.end()) {
1075             for (auto &asset : std::get<Assets>(extend["assets"])) {
1076                 asset.assetId = "";
1077             }
1078         }
1079     });
1080     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1081     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1082 
1083     /**
1084      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1085      * @tc.expected: step2. sync success.
1086      */
1087     g_virtualCloudDb->ForkUpload(nullptr);
1088     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1089     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1090 }
1091 
1092 /**
1093  * @tc.name: FillAssetId009
1094  * @tc.desc: Test if assetId is filled when extend exists useless assets
1095  * @tc.type: FUNC
1096  * @tc.require:
1097  * @tc.author: chenchaohao
1098  */
1099 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level0)
1100 {
1101     /**
1102      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1103      * @tc.expected: step1. return OK.
1104      */
1105     int localCount = 50;
1106     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681502(const std::string &tableName, VBucket &extend) 1107     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1108         if (extend.find("assets") != extend.end()) {
1109             Asset asset = ASSET_COPY2;
1110             Assets &assets = std::get<Assets>(extend["assets"]);
1111             assets.push_back(asset);
1112         }
1113     });
1114     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1115     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1116 }
1117 
1118 /**
1119  * @tc.name: FillAssetId010
1120  * @tc.desc: Test if assetId is filled when some success and some fail
1121  * @tc.type: FUNC
1122  * @tc.require:
1123  * @tc.author: chenchaohao
1124  */
1125 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level0)
1126 {
1127     /**
1128      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1129      * @tc.expected: step1. return OK.
1130      */
1131     int localCount = 30;
1132     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1133     g_virtualCloudDb->SetInsertFailed(1);
1134     std::atomic<int> count = 0;
__anon3db76c681602(const std::string &tableName, VBucket &extend) 1135     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1136         if (extend.find("assets") != extend.end() && count == 0) {
1137             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1138             count++;
1139         }
1140     });
1141     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1142     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1143 }
1144 
1145 /**
1146  * @tc.name: FillAssetId011
1147  * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1148  * @tc.type: FUNC
1149  * @tc.require:
1150  * @tc.author: chenchaohao
1151  */
1152 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level0)
1153 {
1154     /**
1155      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1156      * @tc.expected: step1. return OK.
1157      */
1158     int localCount = 50;
1159     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1160     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1161     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1162 
1163     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1164     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1165     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1166     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1167 }
1168 
1169 /**
1170  * @tc.name: FillAssetId012
1171  * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1172  * @tc.type: FUNC
1173  * @tc.require:
1174  * @tc.author: chenchaohao
1175  */
1176 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level0)
1177 {
1178     /**
1179      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1180      * @tc.expected: step1. return OK.
1181      */
1182     int localCount = 50;
1183     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1184     std::atomic<int> count = 1;
1185     g_virtualCloudDb->SetClearExtend(count);
1186     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1187     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1188 
1189     /**
1190      * @tc.steps:step2. set extend size normal then sync, check the asseid.
1191      * @tc.expected: step2. return OK.
1192      */
1193     g_virtualCloudDb->SetClearExtend(0);
1194     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1195     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1196 
1197     /**
1198      * @tc.steps:step3. set extend size large then sync, check the asseid.
1199      * @tc.expected: step3. return OK.
1200      */
1201     count = -1; // -1 means extend push a empty vBucket
1202     g_virtualCloudDb->SetClearExtend(count);
1203     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1204     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1205 }
1206 
1207 /**
1208  * @tc.name: FillAssetId013
1209  * @tc.desc: Test fill assetId and removedevicedata when data is delete
1210  * @tc.type: FUNC
1211  * @tc.require:
1212  * @tc.author: chenchaohao
1213  */
1214 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level0)
1215 {
1216     /**
1217      * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1218      * @tc.expected: step1. return OK.
1219      */
1220     int localCount = 20;
1221     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1222     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1223     int deleteLocalCount = 10;
1224     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1225     int addLocalCount = 30;
1226     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1227     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1228 
1229     /**
1230      * @tc.steps:step2. RemoveDeviceData.
1231      * @tc.expected: step2. return OK.
1232      */
1233     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1234     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1235 }
1236 
1237 /**
1238  * @tc.name: FillAssetId014
1239  * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1240  * @tc.type: FUNC
1241  * @tc.require:
1242  * @tc.author: bty
1243  */
1244 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level0)
1245 {
1246     /**
1247      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1248      * @tc.expected: step1. return OK.
1249      */
1250     int localCount = 50;
1251     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1252     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1253     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1254 
1255     /**
1256      * @tc.steps:step2. RemoveDeviceData
1257      * @tc.expected: step2. return OK.
1258      */
1259     Assets assets;
1260     std::vector<AssetStatus> statusVec = {
1261         AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1262         AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1263     };
1264     for (auto &status : statusVec) {
1265         Asset temp = ASSET_COPY;
1266         temp.name += std::to_string(status);
1267         temp.status = status | AssetStatus::UPLOADING;
1268         assets.emplace_back(temp);
1269     }
1270     UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1271     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1272     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1273 
1274     /**
1275      * @tc.steps:step3. check status
1276      * @tc.expected: step3. return OK.
1277      */
1278     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1279     sqlite3_stmt *stmt = nullptr;
1280     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1281     int index = 0;
1282     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1283         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1284         Type cloudValue;
1285         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1286         Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1287         for (const auto &ast : newAssets) {
1288             EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1289         }
1290     }
1291     int errCode = E_OK;
1292     SQLiteUtils::ResetStatement(stmt, true, errCode);
1293 }
1294 
1295 /**
1296  * @tc.name: FillAssetId015
1297  * @tc.desc: Test if fill assetId when upload return cloud network error
1298  * @tc.type: FUNC
1299  * @tc.require:
1300  * @tc.author: chenchaohao
1301  */
1302 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level0)
1303 {
1304     /**
1305      * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1306      * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1307      */
1308     int localCount = 20;
1309     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1310     g_virtualCloudDb->SetCloudNetworkError(true);
1311     std::atomic<int> count = 0;
__anon3db76c681702(const std::string &tableName, VBucket &extend) 1312     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1313         if (extend.find("assets") != extend.end() && count == 0) {
1314             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1315             count++;
1316         }
1317     });
1318     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1319     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1320     g_virtualCloudDb->SetCloudNetworkError(false);
1321     g_virtualCloudDb->ForkUpload(nullptr);
1322     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1323     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1324 
1325     /**
1326      * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1327      * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1328      */
1329     int addLocalCount = 10;
1330     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1331     std::atomic<int> num = 0;
__anon3db76c681802(const std::string &tableName, VBucket &extend) 1332     g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1333         if (extend.find("assets") != extend.end() && num == 0) {
1334             for (auto &asset : std::get<Assets>(extend["assets"])) {
1335                 asset.name = "pad";
1336                 break;
1337             }
1338             num++;
1339         }
1340     });
1341     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1342     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1343 }
1344 
1345 /**
1346  * @tc.name: FillAssetId016
1347  * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1348  * @tc.type: FUNC
1349  * @tc.require:
1350  * @tc.author: chenchaohao
1351  */
1352 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level0)
1353 {
1354     /**
1355      * @tc.steps:step1. local insert data and sync, then delete last local data
1356      * @tc.expected: step1. return OK.
1357      */
1358     int localCount = 20;
1359     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1360     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1361     int deletLocalCount = 10;
1362     DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1363     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1364 
1365     /**
1366      * @tc.steps:step2. RemoveDeviceData.
1367      * @tc.expected: step2. return OK.
1368      */
1369     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1370     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1371 }
1372 
1373 /**
1374  * @tc.name: FillAssetId017
1375  * @tc.desc: Test cursor when download not change
1376  * @tc.type: FUNC
1377  * @tc.require:
1378  * @tc.author: chenchaohao
1379  */
1380 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level0)
1381 {
1382     /**
1383      * @tc.steps:step1. local insert data and sync,check cursor.
1384      * @tc.expected: step1. return OK.
1385      */
1386     int localCount = 20;
1387     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1388     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1389     CheckCursorData(ASSETS_TABLE_NAME, 1);
1390 
1391     /**
1392      * @tc.steps:step2. sync again and optype is not change, check cursor.
1393      * @tc.expected: step2. return OK.
1394      */
1395     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1396     CheckCursorData(ASSETS_TABLE_NAME, 1);
1397 }
1398 
1399 /**
1400  * @tc.name: FillAssetId018
1401  * @tc.desc: Test if assetId is filled when contains "#_error"
1402  * @tc.type: FUNC
1403  * @tc.require:
1404  * @tc.author: zhaoliang
1405  */
1406 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level0)
1407 {
1408     /**
1409      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1410      * @tc.expected: step1. return OK.
1411      */
1412     int localCount = 30;
1413     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1414     std::atomic<int> count = 0;
__anon3db76c681902(const std::string &tableName, VBucket &extend) 1415     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1416         if (extend.find("assets") != extend.end() && count == 0) {
1417             extend["#_error"] = std::string("test");
1418             count++;
1419         }
1420     });
1421     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1422     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1423 }
1424 
1425 /**
1426  * @tc.name: DownloadAssetForDupDataTest002
1427  * @tc.desc: Test download failed
1428  * @tc.type: FUNC
1429  * @tc.require:
1430  * @tc.author: bty
1431  */
1432 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level0)
1433 {
1434     /**
1435      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1436      * @tc.expected: step1. return OK
1437      */
1438     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1439     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1440     int index = 0;
1441     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1442         .WillRepeatedly(
__anon3db76c681a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1443             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1444                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1445                 return DBStatus::CLOUD_ERROR;
1446             });
1447 
1448     /**
1449      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1450      * @tc.expected: step2. sync success.
1451      */
1452     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1453     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1454 
1455     /**
1456      * @tc.steps:step3. check if the hash of assets in db is empty
1457      * @tc.expected: step3. OK
1458      */
1459     CheckDownloadFailedForTest002(db);
1460 }
1461 
1462 /**
1463  * @tc.name: DownloadAssetForDupDataTest003
1464  * @tc.desc: Test download failed and flag was modified
1465  * @tc.type: FUNC
1466  * @tc.require:
1467  * @tc.author: bty
1468  */
1469 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level0)
1470 {
1471     /**
1472      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1473      * @tc.expected: step1. return OK
1474      */
1475     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1476     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1477     int index = 0;
1478     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1479         .WillRepeatedly(
__anon3db76c681b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1480             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1481                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1482                 for (auto &item : assets) {
1483                     for (auto &asset : item.second) {
1484                         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1485                     }
1486                 }
1487                 return DBStatus::CLOUD_ERROR;
1488             });
1489 
1490     /**
1491      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1492      * @tc.expected: step2. sync success.
1493      */
1494     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1495     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1496 
1497     /**
1498      * @tc.steps:step3. check if the hash of assets in db is empty
1499      * @tc.expected: step3. OK
1500      */
1501     CheckDownloadFailedForTest002(db);
1502 }
1503 
1504 /**
1505  * @tc.name: DownloadAssetForDupDataTest004
1506  * @tc.desc: test sync with deleted assets
1507  * @tc.type: FUNC
1508  * @tc.require:
1509  * @tc.author: bty
1510  */
1511 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level0)
1512 {
1513     /**
1514      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1515      * @tc.expected: step1. return OK
1516      */
1517     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1518     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1519     int index = 0;
1520     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1521         .WillRepeatedly(
__anon3db76c681c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1522             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1523                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1524                 return DBStatus::OK;
1525             });
1526 
1527     /**
1528      * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1529      * @tc.expected: step2. return OK
1530      */
1531     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1532     UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1533     UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1534     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1535 
1536     /**
1537      * @tc.steps:step3. sync, check download num
1538      * @tc.expected: step3. return OK
1539      */
1540     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1541     EXPECT_GE(index, 2); // 2 is download num
1542 }
1543 
1544 /**
1545  * @tc.name: DownloadAssetForDupDataTest005
1546  * @tc.desc: test DOWNLOADING status of asset after uploading
1547  * @tc.type: FUNC
1548  * @tc.require:
1549  * @tc.author: bty
1550  */
1551 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level0)
1552 {
1553     /**
1554      * @tc.steps:step1. init data and sync
1555      * @tc.expected: step1. return OK
1556      */
1557     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1558     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1559     UpdateAssetsForLocal(db, 6,  AssetStatus::DOWNLOADING); // 6 is id
1560     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1561 
1562     /**
1563      * @tc.steps:step2. check asset status
1564      * @tc.expected: step2. return OK
1565      */
1566     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1567     sqlite3_stmt *stmt = nullptr;
1568     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1569     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1570         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1571         Type cloudValue;
1572         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1573         std::vector<uint8_t> assetsBlob;
1574         Assets assets;
1575         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1576         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1577         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1578         for (size_t i = 0; i < assets.size(); ++i) {
1579             EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1580             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1581         }
1582     }
1583     int errCode;
1584     SQLiteUtils::ResetStatement(stmt, true, errCode);
1585 }
1586 
1587 /**
1588  * @tc.name: FillAssetId019
1589  * @tc.desc: Test the stability of cleaning asset id
1590  * @tc.type: FUNC
1591  * @tc.require:
1592  * @tc.author: bty
1593  */
1594 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level0)
1595 {
1596     /**
1597      * @tc.steps:step1. local insert assets and sync.
1598      * @tc.expected: step1. return OK.
1599      */
1600     int localCount = 20;
1601     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1602     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1603 
1604     /**
1605      * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1606      * @tc.expected: step2. return OK.
1607      */
1608     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1609         + " set data_key='999' where data_key>'10';";
1610     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1611     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1612 }
1613 
1614 /**
1615  * @tc.name: FillAssetId020
1616  * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1617  * @tc.type: FUNC
1618  * @tc.require:
1619  * @tc.author: zhangtao
1620  */
1621 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level0)
1622 {
1623     CloudSyncConfig config;
1624     config.maxUploadCount = 200; // max upload 200
1625     g_delegate->SetCloudSyncConfig(config);
1626 
1627     /**
1628      * @tc.steps:step1. local insert assets and erase assets extends
1629      * @tc.expected: step1. return OK.
1630      */
1631     int localCount = 50;
1632     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681d02(const std::string &tableName, VBucket &extend) 1633     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1634         extend.erase("assets");
1635     });
1636     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1637     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1638 
1639     /**
1640      * @tc.steps:step2. local insert assets and modify assetId to empty
1641      * @tc.expected: step2. return OK.
1642      */
1643     int addLocalCount = 10;
1644     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon3db76c681e02(const std::string &tableName, VBucket &extend) 1645     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1646         if (extend.find("assets") != extend.end()) {
1647             for (auto &asset : std::get<Assets>(extend["assets"])) {
1648                 asset.assetId = "";
1649             }
1650         }
1651     });
1652     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1653     int beginFailFillNum = 101;
1654     int endFailFillNum = 120;
1655     std::set<int> index;
1656     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1657         index.insert(i);
1658     }
1659     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1660 
1661     /**
1662      * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1663      * @tc.expected: step3. return OK.
1664      */
1665     InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon3db76c681f02(const std::string &tableName, VBucket &extend) 1666     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1667         if (extend.find("assets") != extend.end()) {
1668             for (auto &asset : std::get<Assets>(extend["assets"])) {
1669                 asset.name = "mod_pat";
1670             }
1671         }
1672     });
1673     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1674     beginFailFillNum = 121;
1675     endFailFillNum = 140;
1676     std::set<int> newIndex;
1677     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1678         newIndex.insert(i);
1679     }
1680     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1681 
1682     /**
1683      * @tc.steps:step4. local update assets and sync, check the local assetId.
1684      * @tc.expected: step4. sync success.
1685      */
1686     g_virtualCloudDb->ForkUpload(nullptr);
1687     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1688     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1689 }
1690 
1691 /**
1692  * @tc.name: FillAssetId021
 * @tc.desc: Test that when local assets are missing, one record's missing assets will not mark the whole sync progress as failed
1694  * @tc.type: FUNC
1695  * @tc.require:
1696  * @tc.author: zhangtao
1697  */
1698 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level0)
1699 {
1700     CloudSyncConfig config;
1701     config.maxUploadCount = 200; // max upload 200
1702     g_delegate->SetCloudSyncConfig(config);
1703 
1704     /**
1705      * @tc.steps:step1. local insert assets and erase assets extends
1706      * @tc.expected: step1. return OK.
1707      */
1708     int localCount = 50;
1709     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1710 
1711     /**
1712      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1713      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1714      */
1715     int uploadFailId = 0;
1716     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1717         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1718         uploadFailId++;
1719         if (uploadFailId == 25) { // 25 is the middle record
1720             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1721             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1722         }
1723         return OK;
1724     });
1725 
1726     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1727     int beginFailFillNum = 49;
1728     int endFailFillNum = 50;
1729     std::set<int> index;
1730     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1731         index.insert(i);
1732     }
1733     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1734     g_virtualCloudDb->ForkUpload(nullptr);
1735 }
1736 
1737 /**
1738  * @tc.name: FillAssetId022
 * @tc.desc: Test that when local assets are missing, many records' missing assets will not mark the whole sync progress as failed
1740  * @tc.type: FUNC
1741  * @tc.require:
1742  * @tc.author: zhangtao
1743  */
1744 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level0)
1745 {
1746     CloudSyncConfig config;
1747     config.maxUploadCount = 200; // max upload 200
1748     g_delegate->SetCloudSyncConfig(config);
1749 
1750     /**
1751      * @tc.steps:step1. local insert assets and erase assets extends
1752      * @tc.expected: step1. return OK.
1753      */
1754     int localCount = 50;
1755     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1756 
1757     /**
1758      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1759      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1760      */
1761     int uploadFailId = 0;
1762     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1763         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1764         uploadFailId++;
1765         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1766             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1767             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1768         }
1769         return OK;
1770     });
1771 
1772     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1773     int beginFailFillNum = 49;
1774     int endFailFillNum = 54;
1775     std::set<int> index;
1776     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1777         index.insert(i);
1778     }
1779     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1780     g_virtualCloudDb->ForkUpload(nullptr);
1781 }
1782 
1783 /**
1784  * @tc.name: FillAssetId023
1785  * @tc.desc: Test if BatchUpdate with local assets missing
1786  * @tc.type: FUNC
1787  * @tc.require:
1788  * @tc.author: zhangtao
1789  */
1790 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level0)
1791 {
1792     /**
1793      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1794      * @tc.expected: step1. return OK.
1795      */
1796     int localCount = 50;
1797     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1798     std::atomic<int> count = 1;
1799     g_virtualCloudDb->SetClearExtend(count);
1800     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1801     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1802 
1803     /**
1804      * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1805      * @tc.expected: step2. return OK.
1806      */
1807     g_virtualCloudDb->SetClearExtend(0);
1808 
1809     int uploadFailId = 0;
1810     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1811         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1812         uploadFailId++;
1813         if (uploadFailId == 25) { // 25 is the middle record
1814             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1815             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1816         }
1817         return OK;
1818     });
1819     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1820     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1821 }
1822 
1823 /**
1824  * @tc.name: FillAssetId024
1825  * @tc.desc: Test if BatchUpdate with multiple local assets missing
1826  * @tc.type: FUNC
1827  * @tc.require:
1828  * @tc.author: zhangtao
1829  */
1830 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level0)
1831 {
1832     /**
1833      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1834      * @tc.expected: step1. return OK.
1835      */
1836     int localCount = 50;
1837     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1838     std::atomic<int> count = 1;
1839     g_virtualCloudDb->SetClearExtend(count);
1840     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1841     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1842 
1843     /**
1844      * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1845      * @tc.expected: step2. return OK.
1846      */
1847     g_virtualCloudDb->SetClearExtend(0);
1848 
1849     int uploadFailId = 0;
1850     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1851         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1852         uploadFailId++;
1853         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1854             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1855             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1856         }
1857         return OK;
1858     });
1859     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1860     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1861 }
1862 
1863 /**
1864  * @tc.name: FillAssetId025
1865  * @tc.desc: Test if BatchInsert with local assets missing and missing record added into successCount
1866  * @tc.type: FUNC
1867  * @tc.require:
1868  * @tc.author: zhangtao
1869  */
1870 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId025, TestSize.Level0)
1871 {
1872     CloudSyncConfig config;
1873     config.maxUploadCount = 200; // max upload 200
1874     g_delegate->SetCloudSyncConfig(config);
1875     /**
1876      * @tc.steps:step1. insert local data.
1877      * @tc.expected: step1. return OK.
1878      */
1879     int localCount = 40;
1880     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1881 
1882     /**
1883      * @tc.steps:step2. BatchInsert with local assets missing then sync, check the asseid.
1884      * @tc.expected: step2. return OK.
1885      */
1886     int uploadFailId = 0;
1887     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682402(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1888         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1889         uploadFailId++;
1890         if (uploadFailId == 25) { // 25 is the middle record
1891             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1892             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1893         }
1894         return OK;
1895     });
1896     g_syncProcess = {};
1897     Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
1898     std::vector<TableProcessInfo> expectProcess = {
1899         { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
1900         { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
1901     };
1902 
1903     /**
1904      * @tc.steps:step3. Check if sync process consistent with exptectProcess
1905      * @tc.expected: step3. return OK.
1906      */
1907     int index = 0;
__anon3db76c682502(const std::map<std::string, SyncProcess> &process) 1908     CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
1909         g_syncProcess = std::move(process.begin()->second);
1910         ASSERT_LT(index, 2);
1911         for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
1912             EXPECT_EQ(info.process, expectProcess[index].process);
1913             EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
1914             EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
1915             EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
1916             EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
1917         }
1918         index++;
1919         if (g_syncProcess.process == FINISHED) {
1920             g_processCondition.notify_one();
1921             ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1922         }
1923     };
1924     ASSERT_EQ(g_delegate->Sync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
1925     WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
1926 
1927     /**
1928      * @tc.steps:step4. Check assets results
1929      * @tc.expected: step4. return OK.
1930      */
1931     int beginFailFillNum = 49;
1932     int endFailFillNum = 50;
1933     std::set<int> indexes;
1934     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1935         indexes.insert(i);
1936     }
1937     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", indexes);
1938     g_virtualCloudDb->ForkUpload(nullptr);
1939 }
1940 
1941 /**
1942  * @tc.name: ConsistentFlagTest001
 * @tc.desc: Assets are different, check the 0x20 bit of flag after sync
1944  * @tc.type: FUNC
1945  * @tc.require:
1946  * @tc.author: bty
1947  */
1948 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level0)
1949 {
1950     /**
1951      * @tc.steps:step1. init data for the different asset, sync and check flag
1952      * @tc.expected: step1. return OK.
1953      */
1954     int localCount = 10; // 10 is num of local
1955     int cloudCount = 20; // 20 is num of cloud
1956     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1957     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1958     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1959     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1960     CheckConsistentCount(db, cloudCount);
1961 
1962     /**
1963      * @tc.steps:step2. update local data, sync and check flag
1964      * @tc.expected: step2. return OK.
1965      */
1966     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1967     DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1968     CheckConsistentCount(db, 0L);
1969     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1970     CheckConsistentCount(db, cloudCount);
1971 }
1972 
1973 /**
1974  * @tc.name: ConsistentFlagTest002
1975  * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1976  * @tc.type: FUNC
1977  * @tc.require:
1978  * @tc.author: bty
1979  */
1980 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level0)
1981 {
1982     /**
1983      * @tc.steps:step1. init data for the same asset, sync and check flag
1984      * @tc.expected: step1. return OK.
1985      */
1986     int cloudCount = 20; // 20 is num of cloud
1987     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1988     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1989     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1990     CheckConsistentCount(db, cloudCount);
1991 
1992     /**
1993      * @tc.steps:step2. update local data, sync and check flag
1994      * @tc.expected: step2. return OK.
1995      */
1996     int deleteLocalCount = 5;
1997     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1998     CheckConsistentCount(db, cloudCount - deleteLocalCount);
1999     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2000     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2001     CheckConsistentCount(db, cloudCount);
2002 }
2003 
2004 /**
2005  * @tc.name: ConsistentFlagTest003
2006  * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
2007  * @tc.type: FUNC
2008  * @tc.require:
2009  * @tc.author: bty
2010  */
2011 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level0)
2012 {
2013     /**
2014      * @tc.steps:step1. init data
2015      * @tc.expected: step1. return OK.
2016      */
2017     int localCount = 20; // 20 is num of local
2018     int cloudCount = 10; // 10 is num of cloud
2019     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2020     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2021     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2022 
2023     /**
2024      * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
2025      * @tc.expected: step2. return OK.
2026      */
2027     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2028     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2029     int index = 0;
2030     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2031         .WillRepeatedly(
__anon3db76c682602(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2032             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2033                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2034                 if (index == 1) { // 1 is first download
2035                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2036                 }
2037                 return DBStatus::OK;
2038             });
2039 
2040     /**
2041      * @tc.steps:step3. fork upload, check consistent count
2042      * @tc.expected: step3. return OK.
2043      */
2044     int upIdx = 0;
__anon3db76c682702(const std::string &tableName, VBucket &extend) 2045     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2046         LOGD("upload index:%d", ++upIdx);
2047         if (upIdx == 1) { // 1 is first upload
2048             CheckConsistentCount(db, localCount - cloudCount - 1);
2049         }
2050     });
2051 
2052     /**
2053      * @tc.steps:step4. fork query, check consistent count
2054      * @tc.expected: step4. return OK.
2055      */
2056     int queryIdx = 0;
__anon3db76c682802(const std::string &, VBucket &) 2057     g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
2058         LOGD("query index:%d", ++queryIdx);
2059         if (queryIdx == 3) { // 3 is the last query
2060             CheckConsistentCount(db, localCount - 1);
2061         }
2062     });
2063     int count = 0;
__anon3db76c682902() 2064     g_cloudStoreHook->SetSyncFinishHook([&count]() {
2065         count++;
2066         if (count == 2) { // 2 is compensated sync
2067             g_processCondition.notify_one();
2068         }
2069     });
2070     /**
2071      * @tc.steps:step5. sync, check consistent count
2072      * @tc.expected: step5. return OK.
2073      */
2074     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2075     WaitForSync(count);
2076     CheckConsistentCount(db, localCount);
2077 }
2078 
2079 /**
2080  * @tc.name: ConsistentFlagTest004
2081  * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
2082  * @tc.type: FUNC
2083  * @tc.require:
2084  * @tc.author: bty
2085  */
2086 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level0)
2087 {
2088     /**
2089      * @tc.steps:step1. init data
2090      * @tc.expected: step1. return OK.
2091      */
2092     int localCount = 20; // 20 is num of local
2093     int cloudCount = 10; // 10 is num of cloud
2094     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2095     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2096     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2097 
2098     /**
2099      * @tc.steps:step2. fork upload, return error filed of CLOUD_NETWORK_ERROR
2100      * @tc.expected: step2. return OK.
2101      */
2102     int upIdx = 0;
__anon3db76c682a02(const std::string &tableName, VBucket &extend) 2103     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2104         LOGD("upload index:%d", ++upIdx);
2105         if (upIdx == 1) {
2106             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
2107         }
2108     });
2109 
2110     /**
2111      * @tc.steps:step3. sync, check consistent count
2112      * @tc.expected: step3. return OK.
2113      */
2114     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2115     CheckConsistentCount(db, localCount - 1);
2116 
2117     /**
2118      * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2119      * @tc.expected: step4. return OK.
2120      */
2121     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2122     upIdx = 0;
__anon3db76c682b02(const std::string &tableName, VBucket &extend) 2123     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2124         LOGD("upload index:%d", ++upIdx);
2125         if (upIdx == 1) {
2126             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2127             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2128         }
2129         if (upIdx == 2) {
2130             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2131             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2132         }
2133     });
2134 
2135     /**
2136      * @tc.steps:step5. sync, check consistent count
2137      * @tc.expected: step5. return OK.
2138      */
2139     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2140     CheckConsistentCount(db, localCount - 2);
2141 }
2142 
2143 /**
2144  * @tc.name: ConsistentFlagTest005
2145  * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2146  * @tc.type: FUNC
2147  * @tc.require:
2148  * @tc.author: bty
2149  */
2150 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level0)
2151 {
2152     /**
2153      * @tc.steps:step1. init data
2154      * @tc.expected: step1. return OK.
2155      */
2156     int localCount = 20; // 20 is num of local
2157     int cloudCount = 10; // 10 is num of cloud
2158     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2159     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2160     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2161 
2162     /**
2163      * @tc.steps:step2. fork download, update local assets where id=2
2164      * @tc.expected: step2. return OK.
2165      */
2166     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2167     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2168     int index = 0;
2169     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2170         .WillRepeatedly(
2171             [this, &index](const std::string &, const std::string &gid, const Type &,
__anon3db76c682c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2172                 std::map<std::string, Assets> &assets) {
2173                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2174                 if (index == 1) { // 1 is first download
2175                     std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2176                     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2177                 }
2178                 return DBStatus::OK;
2179             });
2180 
2181     /**
2182      * @tc.steps:step3. fork upload, check consistent count
2183      * @tc.expected: step3. return OK.
2184      */
2185     int upIdx = 0;
__anon3db76c682d02(const std::string &tableName, VBucket &extend) 2186     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2187         LOGD("upload index:%d", ++upIdx);
2188         if (upIdx == 1) { // 1 is first upload
2189             CheckConsistentCount(db, localCount - cloudCount - 1);
2190         }
2191     });
2192 
2193     /**
2194      * @tc.steps:step4. sync, check consistent count
2195      * @tc.expected: step4. return OK.
2196      */
2197     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2198     CheckConsistentCount(db, localCount);
2199 }
2200 
2201 /**
2202  * @tc.name: ConsistentFlagTest006
2203  * @tc.desc:
2204  * @tc.type: FUNC
2205  * @tc.require:
2206  * @tc.author: bty
2207  */
2208 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level0)
2209 {
2210     /**
2211      * @tc.steps:step1. init data
2212      * @tc.expected: step1. return OK.
2213      */
2214     int cloudCount = 10; // 10 is num of cloud
2215     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2216     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2217 
2218     /**
2219      * @tc.steps:step2. fork download, update local assets where id=2
2220      * @tc.expected: step2. return OK.
2221      */
2222     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2223     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2224     int delCount = 3; // 3 is num of cloud
2225     DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2226     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2227     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2228     int index = 0;
2229     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2230         .WillRepeatedly(
2231             [&index](const std::string &, const std::string &gid, const Type &,
__anon3db76c682e02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2232                 std::map<std::string, Assets> &assets) {
2233                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2234                 if (index == 1) { // 1 is first download
2235                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2236                 }
2237                 return DBStatus::OK;
2238             });
2239 
2240     /**
2241      * @tc.steps:step3. fork upload, check consistent count
2242      * @tc.expected: step3. return OK.
2243      */
2244     int upIdx = 0;
__anon3db76c682f02(const std::string &tableName, VBucket &extend) 2245     g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2246         LOGD("upload index:%d", ++upIdx);
2247         if (upIdx == 1) { // 1 is first upload
2248             CheckConsistentCount(db, delCount);
2249             CheckCompensatedCount(db, 0L);
2250         }
2251     });
2252 
2253     /**
2254      * @tc.steps:step4. sync, check consistent count
2255      * @tc.expected: step4. return OK.
2256      */
2257     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2258     CheckConsistentCount(db, cloudCount);
2259 }
2260 
2261 /**
2262  * @tc.name: SyncDataStatusTest001
2263  * @tc.desc: No need to download asset, check status after sync
2264  * @tc.type: FUNC
2265  * @tc.require:
2266  * @tc.author: bty
2267  */
2268 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level0)
2269 {
2270     DataStatusTest001(false);
2271 }
2272 
2273 /**
2274  * @tc.name: SyncDataStatusTest002
2275  * @tc.desc: Need to download asset, check status after sync
2276  * @tc.type: FUNC
2277  * @tc.require:
2278  * @tc.author: bty
2279  */
2280 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level0)
2281 {
2282     DataStatusTest001(true);
2283 }
2284 
2285 /**
2286  * @tc.name: SyncDataStatusTest003
2287  * @tc.desc: Lock during download and check status
2288  * @tc.type: FUNC
2289  * @tc.require:
2290  * @tc.author: bty
2291  */
2292 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level0)
2293 {
2294     DataStatusTest003();
2295 }
2296 
2297 /**
2298  * @tc.name: SyncDataStatusTest004
2299  * @tc.desc: Lock and delete during download, check status
2300  * @tc.type: FUNC
2301  * @tc.require:
2302  * @tc.author: bty
2303  */
2304 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level0)
2305 {
2306     DataStatusTest004();
2307 }
2308 
2309 /**
2310  * @tc.name: SyncDataStatusTest005
2311  * @tc.desc: Lock and update during download, check status
2312  * @tc.type: FUNC
2313  * @tc.require:
2314  * @tc.author: bty
2315  */
2316 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level0)
2317 {
2318     DataStatusTest005();
2319 }
2320 
2321 /**
2322  * @tc.name: SyncDataStatusTest006
2323  * @tc.desc: Lock and update and Unlock during download, check status
2324  * @tc.type: FUNC
2325  * @tc.require:
2326  * @tc.author: bty
2327  */
2328 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level0)
2329 {
2330     DataStatusTest006();
2331 }
2332 
2333 /**
2334  * @tc.name: SyncDataStatusTest007
2335  * @tc.desc: Download return error, check status
2336  * @tc.type: FUNC
2337  * @tc.require:
2338  * @tc.author: bty
2339  */
2340 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level0)
2341 {
2342     DataStatusTest007();
2343 }
2344 
2345 /**
2346  * @tc.name: SyncDataStatusTest008
2347  * @tc.desc: Test upload process when data locked
2348  * @tc.type: FUNC
2349  * @tc.require:
2350  * @tc.author: bty
2351  */
2352 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level0)
2353 {
2354     /**
2355      * @tc.steps:step1. init local data
2356      * @tc.expected: step1. return OK.
2357      */
2358     int localCount = 40;
2359     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2360     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2361     std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2362     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2363 
2364     /**
2365      * @tc.steps:step2. sync and check process
2366      * @tc.expected: step2. return OK.
2367      */
2368     g_syncProcess = {};
2369     Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2370     std::vector<TableProcessInfo> expectProcess = {
2371         { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2372         { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2373     };
2374     int index = 0;
2375     CloudSyncConfig config;
2376     config.maxUploadCount = 100; // max upload 100
2377     g_delegate->SetCloudSyncConfig(config);
__anon3db76c683002(const std::map<std::string, SyncProcess> &process) 2378     CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2379         g_syncProcess = std::move(process.begin()->second);
2380         ASSERT_LT(index, 2);
2381         for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2382             EXPECT_EQ(info.process, expectProcess[index].process);
2383             EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2384             EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2385             EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2386             EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2387         }
2388         index++;
2389         if (g_syncProcess.process == FINISHED) {
2390             g_processCondition.notify_one();
2391             ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2392         }
2393     };
2394     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2395     WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2396 }
2397 
2398 /**
2399  * @tc.name: DownloadAssetTest001
2400  * @tc.desc: Test the asset status after the share table sync
2401  * @tc.type: FUNC
2402  * @tc.require:
2403  * @tc.author: bty
2404  */
2405 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level0)
2406 {
2407     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
2408     /**
2409      * @tc.steps:step1. init data and sync
2410      * @tc.expected: step1. return OK.
2411      */
2412     int cloudCount = 10; // 10 is num of cloud
2413     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2414     CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2415 
2416     /**
2417      * @tc.steps:step2. check asset status
2418      * @tc.expected: step2. return OK.
2419      */
2420     SqlCondition condition;
2421     condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2422     condition.readOnly = true;
2423     std::vector<VBucket> records;
2424     EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2425     for (const auto &data: records) {
2426         Assets assets;
2427         CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2428         for (const auto &asset: assets) {
2429             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2430         }
2431     }
2432     EXPECT_EQ(g_virtualAssetLoader->GetBatchDownloadCount(), 0u);
2433     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
2434 }
2435 
2436 /**
2437  * @tc.name: DownloadAssetTest002
2438  * @tc.desc: Test asset download failed and re download
2439  * @tc.type: FUNC
2440  * @tc.require:
2441  * @tc.author: liaoyonghuang
2442  */
2443 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level0)
2444 {
2445     /**
2446      * @tc.steps:step1. init data
2447      * @tc.expected: step1. return OK.
2448      */
2449     int cloudCount = 10; // 10 is num of cloud
2450     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2451 
2452     /**
2453      * @tc.steps:step2. Set asset download status error and sync
2454      * @tc.expected: step2. sync successful but download assets fail.
2455      */
2456     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2457     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2458 
2459     /**
2460      * @tc.steps:step3. Set asset download status OK and sync
2461      * @tc.expected: step3. return OK.
2462      */
2463     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2464     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2465 
2466     /**
2467      * @tc.steps:step4. Check assets status
2468      * @tc.expected: step4. status is NORMAL.
2469      */
2470     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2471     sqlite3_stmt *stmt = nullptr;
2472     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2473     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2474         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2475         Type cloudValue;
2476         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2477         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2478         for (const auto &asset : assets) {
2479             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2480         }
2481     }
2482     int errCode = E_OK;
2483     SQLiteUtils::ResetStatement(stmt, true, errCode);
2484     EXPECT_EQ(errCode, E_OK);
2485 }
2486 
2487 /**
2488  * @tc.name: DownloadAssetTest003
2489  * @tc.desc: Test asset download after sync task recovery
2490  * @tc.type: FUNC
2491  * @tc.require:
2492  * @tc.author: liaoyonghuang
2493  */
2494 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest003, TestSize.Level0)
2495 {
2496     /**
2497      * @tc.steps:step1. init data
2498      * @tc.expected: step1. return OK.
2499      */
2500     int cloudCount = 10; // 10 is num of cloud
2501     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2502     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2503     DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
2504     InsertCloudDBData(0, cloudCount, 0, NO_PRIMARY_TABLE);
2505     /**
2506      * @tc.steps:step2. Set task interrupted before asset download
2507      * @tc.expected: step2. return OK.
2508      */
2509     int queryTime = 0;
__anon3db76c683102(const std::string &, VBucket &) 2510     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2511         queryTime++;
2512         if (queryTime != 1) {
2513             return;
2514         }
2515         Query query = Query::Select().FromTable({NO_PRIMARY_TABLE});
2516         CloudSyncOption option;
2517         option.priorityTask = true;
2518         option.devices = {DEVICE_CLOUD};
2519         option.mode = SYNC_MODE_CLOUD_MERGE;
2520         option.query = query;
2521         ASSERT_EQ(g_delegate->Sync(option, nullptr), OK);
2522     });
2523     /**
2524      * @tc.steps:step3. Sync
2525      * @tc.expected: step3. return OK.
2526      */
2527     int removeTime = 0;
__anon3db76c683202(std::map<std::string, Assets> &assets) 2528     g_virtualAssetLoader->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
2529         removeTime++;
2530         return OK;
2531     });
2532     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2533     /**
2534      * @tc.steps:step4. Check fork asset download time and observer
2535      * @tc.expected: step4. return OK.
2536      */
2537     EXPECT_EQ(removeTime, cloudCount);
2538     ChangedData expectedChangeData1;
2539     ChangedData expectedChangeData2;
2540     expectedChangeData1.tableName = NO_PRIMARY_TABLE;
2541     expectedChangeData2.tableName = ASSETS_TABLE_NAME;
2542     expectedChangeData1.type = ChangedDataType::ASSET;
2543     expectedChangeData2.type = ChangedDataType::ASSET;
2544     expectedChangeData1.field.push_back(std::string("rowid"));
2545     expectedChangeData2.field.push_back(std::string("id"));
2546     for (int i = 0; i < cloudCount; i++) {
2547         expectedChangeData1.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 1});
2548         expectedChangeData2.primaryData[ChangeType::OP_DELETE].push_back({(int64_t)i});
2549     }
2550     g_observer->SetExpectedResult(expectedChangeData1);
2551     g_observer->SetExpectedResult(expectedChangeData2);
2552     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
2553     g_observer->ClearChangedData();
2554 
2555     g_virtualCloudDb->ForkInsertConflict(nullptr);
2556     g_virtualCloudDb->ForkQuery(nullptr);
2557     g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
2558 }
2559 
2560 /**
2561  * @tc.name: RecordLockFuncTest001
2562  * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2563  * @tc.type: FUNC
2564  * @tc.author: lijun
2565  */
2566 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level0)
2567 {
2568     /**
2569      * @tc.steps:step1. init local data
2570      * @tc.expected: step1. return OK.
2571      */
2572     int localCount = 100;
2573     int cloudCount = 100;
2574     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2575     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2576     std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2577     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2578     CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2579     CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2580     DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2581 
2582     /**
2583      * @tc.steps:step2. init cloud data
2584      * @tc.expected: step2. return OK.
2585      */
2586     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2587     UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2588 
2589     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2590     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2591     int index = 0;
2592     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2593         .WillRepeatedly(
__anon3db76c683302(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2594             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2595                 LOGD("Download GID:%s  %d", gid.c_str(), index);
2596                 index++;
2597                 if (index <= 30) {
2598                     return DBStatus::CLOUD_ERROR;
2599                 } else {
2600                     return DBStatus::OK;
2601                 }
2602 
2603             });
2604 
2605     std::mutex mtx;
2606     std::condition_variable cv;
2607     int queryIdx = 0;
2608     bool ready = false;
__anon3db76c683402(const std::string &, VBucket &) 2609     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2610         LOGD("query index:%d", ++queryIdx);
2611         if (queryIdx == 2) { // 2 is compensated sync
2612             std::unique_lock<std::mutex> lock(mtx);
2613             ready = true;
2614             cv.notify_one();
2615         }
2616     });
2617     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2618     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2619 
2620     {
2621         std::unique_lock<std::mutex> lock(mtx);
__anon3db76c683502null2622         cv.wait(lock, [&]{ return ready; });
2623     }
2624     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2625 
2626     std::this_thread::sleep_for(std::chrono::seconds(6));
2627     /**
2628      * @tc.steps:step3. check after compensated sync
2629      * @tc.expected: step3. all is UNLOCKING.
2630      */
2631     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2632 }
2633 
2634 /**
2635  * @tc.name: RecordLockFuncTest002
2636  * @tc.desc: Compensated synchronization, Locked data has not been synchronized. The first synchronization data is
2637  * based on the cloud, and the last synchronization data is based on the device.
2638  * @tc.type: FUNC
2639  * @tc.author: lijun
2640  */
2641 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest002, TestSize.Level0)
2642 {
2643     /**
2644      * @tc.steps:step1. init local data, modify data Status and initiate synchronization
2645      * @tc.expected: step1. return OK.
2646      */
2647     int localCount = 120;
2648     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2649     std::vector<std::vector<uint8_t>> hashKey;
2650     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key >=100 ", db, hashKey);
2651     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2652     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2653     CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2654     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
2655 
2656     /**
2657      * @tc.steps:step2. Check the synchronization result and log table status
2658      * @tc.expected: step2.100-109 is LOCK_CHANGE.
2659      */
2660     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2661     CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2662     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1, 100, 109);
2663     CheckLockStatus(db, 100, 109, LockStatus::LOCK_CHANGE);
2664     CheckLockStatus(db, 110, 119, LockStatus::LOCK);
2665 
2666     /**
2667      * @tc.steps:step3. Synchronize and check the lock_change data status
2668      * @tc.expected: step3.100-119 is LOCK_CHANGE.
2669      */
2670     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2671     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2672     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2673 
2674     /**
2675      * @tc.steps:step4. Unlock,the lock_change data status changes to unlocking
2676      * @tc.expected: step4.100-119 is UNLOCKING.
2677      */
2678     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2679     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2680     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2681 
2682     /**
2683      * @tc.steps:step5. Lock,the unlocking data status changes to lock_change
2684      * @tc.expected: step5.100-119 is LOCK_CHANGE.
2685      */
2686     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2687     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2688     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2689 
2690     /**
2691      * @tc.steps:step6. Synchronize and check the lock_change data status
2692      * @tc.expected: step6.100-119 is LOCK_CHANGE.
2693      */
2694     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2695     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2696     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2697 
2698     /**
2699      * @tc.steps:step7. Unlock,the lock_change data status changes to unlocking
2700      * @tc.expected: step7.100-119 is UNLOCKING.
2701      */
2702     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2703     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2704     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2705 
2706     /**
2707      * @tc.steps:step8. Synchronize data
2708      * @tc.expected: step8.return OK.
2709      */
2710     std::mutex mtx;
2711     std::condition_variable cv;
2712     int queryIdx = 0;
2713     bool ready = false;
__anon3db76c683602(const std::string &, VBucket &) 2714     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2715         LOGD("query index:%d", ++queryIdx);
2716         if (queryIdx == 5) { // 5 is compensated sync
2717             std::unique_lock<std::mutex> lock(mtx);
2718             ready = true;
2719             cv.notify_one();
2720         }
2721     });
2722     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2723     {
2724         std::unique_lock<std::mutex> lock(mtx);
__anon3db76c683702null2725         cv.wait(lock, [&]{ return ready; });
2726     }
2727 
2728     std::this_thread::sleep_for(std::chrono::seconds(6));
2729     /**
2730      * @tc.steps:step9. check after compensated sync
2731      * @tc.expected: step9. all is UNLOCK.
2732      */
2733     CheckLockStatus(db, 0, 119, LockStatus::UNLOCK);
2734 }
2735 
2736 /**
2737   * @tc.name: CloudTaskStatusTest001
2738   * @tc.desc: Test get cloud task status
2739   * @tc.type: FUNC
2740   * @tc.require:
2741   * @tc.author: liaoyonghuang
2742   */
2743 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest001, TestSize.Level1)
2744 {
2745     /**
2746      * @tc.steps:step1. init data
2747      * @tc.expected: step1. return OK.
2748      */
2749     int cloudCount = 10; // 10 is num of cloud
2750     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2751 
2752     /**
2753      * @tc.steps:step2. Sync and get cloud task status
2754      * @tc.expected: step2. OK
2755      */
2756     g_virtualCloudDb->SetBlockTime(1000);
__anon3db76c683802() 2757     std::thread syncThread([&]() {
2758         CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2759     });
2760     std::this_thread::sleep_for(std::chrono::milliseconds(100));
2761     SyncProcess process1 = g_delegate->GetCloudTaskStatus(UINT64_MAX);
2762     EXPECT_EQ(process1.errCode, OK);
2763     syncThread.join();
2764     /**
2765      * @tc.steps:step3. Get cloud task status after sync finish
2766      * @tc.expected: step3. NOT_FOUND
2767      */
2768     SyncProcess process2 = g_delegate->GetCloudTaskStatus(1);
2769     EXPECT_EQ(process2.errCode, NOT_FOUND);
2770 
2771     /**
2772      * @tc.steps:step4. Get cloud task status after DB closed
2773      * @tc.expected: step4. DB_ERROR
2774      */
2775     auto delegateImpl = static_cast<RelationalStoreDelegateImpl *>(g_delegate);
2776     EXPECT_EQ(delegateImpl->Close(), DBStatus::OK);
2777     SyncProcess process3 = g_delegate->GetCloudTaskStatus(1);
2778     EXPECT_EQ(process3.errCode, DB_ERROR);
2779 }
2780 
2781 /**
2782   * @tc.name: CloudTaskStatusTest002
2783   * @tc.desc: Test get cloud task status when task merge
2784   * @tc.type: FUNC
2785   * @tc.require:
2786   * @tc.author: suyue
2787   */
2788 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest002, TestSize.Level1)
2789 {
2790     /**
2791      * @tc.steps:step1. init data
2792      * @tc.expected: step1. return OK
2793      */
2794     int cloudCount = 100; // 100 is num of cloud
2795     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2796 
2797     /**
2798      * @tc.steps:step2. sync tasks 2 and 3 that can be merged when synchronizing task 1, get status of the merge task
2799      * @tc.expected: step2. the errCode of task 2 and task 3 is OK or NOT_FOUND
2800      */
2801     g_virtualCloudDb->SetBlockTime(1000);
2802     Query query = Query::Select().FromTable({ASSETS_TABLE_NAME});
__anon3db76c683902(const std::map<std::string, SyncProcess> &process) 2803     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2804         std::unique_lock<std::mutex> lock(g_processMutex);
2805         if (process.begin()->second.process == FINISHED) {
2806             g_processCondition.notify_one();
2807         }
2808     };
2809     CloudSyncOption option = {.devices = {DEVICE_CLOUD}, .mode = SYNC_MODE_CLOUD_MERGE, .query = query,
2810         .waitTime = SYNC_WAIT_TIME, .lockAction = static_cast<LockAction>(0xff)};
2811 
__anon3db76c683a02() 2812     std::thread syncThread1([option, callback]() {
2813         ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2814     });
2815     option.merge = true;
__anon3db76c683b02() 2816     std::thread syncThread2([option, callback]() {
2817         ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2818     });
__anon3db76c683c02() 2819     std::thread syncThread3([option, callback]() {
2820         ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2821     });
2822     std::this_thread::sleep_for(std::chrono::milliseconds(100));
2823     SyncProcess process1 = g_delegate->GetCloudTaskStatus(1);
2824     SyncProcess process2 = g_delegate->GetCloudTaskStatus(2);
2825     SyncProcess process3 = g_delegate->GetCloudTaskStatus(3);
2826     syncThread1.join();
2827     syncThread2.join();
2828     syncThread3.join();
2829     // Due to the task execution sequence, task 2 may be combined into 3 or task 3 may be combined into 2.
2830     // Therefore, the errCode of task 2 and 3 may be OK or NOT_FOUND.
2831     EXPECT_TRUE(process2.errCode == OK  || process2.errCode == NOT_FOUND);
2832     EXPECT_TRUE(process3.errCode == OK  || process3.errCode == NOT_FOUND);
2833 }
2834 
2835 /**
2836   * @tc.name: CompensatedSyncTest001
2837   * @tc.desc: test compensated count more than 100.
2838   * @tc.type: FUNC
2839   * @tc.require:
2840   * @tc.author: tankaisheng
2841   */
2842 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CompensatedSyncTest001, TestSize.Level0)
2843 {
2844     /**
2845      * @tc.steps:step1. init data
2846      * @tc.expected: step1. return OK.
2847      */
2848     int dataCount = 120;
2849     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
2850     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2851 
2852     /**
2853      * @tc.steps:step2. set all data wait compensated.
2854      * @tc.expected: step2. return ok.
2855      */
2856     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " set flag=flag|0x10;";
2857     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
2858     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " where flag&0x10=0x10;";
2859     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2860         reinterpret_cast<void *>(120u), nullptr), SQLITE_OK);
2861 
2862     /**
2863      * @tc.steps:step3. sync with compensated.
2864      * @tc.expected: step3. return ok.
2865      */
2866     std::mutex processMutex;
2867     std::vector<SyncProcess> expectProcess;
2868     std::condition_variable cv;
2869     bool finish = false;
2870     auto callback = [&cv, &finish, &processMutex]
__anon3db76c683d02(const std::map<std::string, SyncProcess> &process) 2871         (const std::map<std::string, SyncProcess> &process) {
2872         for (auto &item : process) {
2873             if (item.second.process == FINISHED) {
2874                 EXPECT_EQ(item.second.errCode, DBStatus::OK);
2875                 std::unique_lock<std::mutex> lock(processMutex);
2876                 finish = true;
2877                 cv.notify_one();
2878             }
2879         }
2880     };
2881     CloudSyncOption option;
2882     option.devices = {DEVICE_CLOUD};
2883     option.priorityTask = true;
2884     option.compensatedSyncOnly = true;
2885     DBStatus syncResult = g_delegate->Sync(option, callback);
2886     EXPECT_EQ(syncResult, DBStatus::OK);
2887 
2888     /**
2889      * @tc.steps:step4. wait sync finish and check data.
2890      * @tc.expected: step4. return ok.
2891      */
2892     std::unique_lock<std::mutex> lock(processMutex);
__anon3db76c683e02() 2893     cv.wait(lock, [&finish]() {
2894         return finish;
2895     });
2896     std::this_thread::sleep_for(std::chrono::seconds(1));
2897     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2898         reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
2899 }
2900 } // namespace
2901 #endif // RELATIONAL_STORE
2902