• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_delegate_impl.h"
26 #include "relational_store_instance.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "sqlite_relational_store.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 #include <gtest/gtest.h>
37 #include <iostream>
38 
39 using namespace testing::ext;
40 using namespace DistributedDB;
41 using namespace DistributedDBUnitTest;
42 using namespace std;
43 
44 namespace {
45 const string STORE_ID = "Relational_Store_SYNC";
46 const string DB_SUFFIX = ".db";
47 const string ASSETS_TABLE_NAME = "student";
48 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
49 const string NO_PRIMARY_TABLE = "teacher";
50 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
51 const string COMPOUND_PRIMARY_TABLE = "worker1";
52 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
53 const string DEVICE_CLOUD = "cloud_dev";
54 const string COL_ID = "id";
55 const string COL_NAME = "name";
56 const string COL_HEIGHT = "height";
57 const string COL_ASSET = "asset";
58 const string COL_ASSETS = "assets";
59 const string COL_AGE = "age";
60 const int64_t SYNC_WAIT_TIME = 600;
61 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
62 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
63     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
64     {COL_AGE, TYPE_INDEX<int64_t>}};
65 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
66     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
67     {COL_AGE, TYPE_INDEX<int64_t>}};
68 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
69     {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
70     {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
71 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
72     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
73     COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
74 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
75     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
76     " ASSETS," + COL_AGE + " INT);";
77 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
78     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
79     COL_AGE + " INT, PRIMARY KEY (id, age));";
80 const Asset ASSET_COPY = {.version = 1,
81     .name = "Phone",
82     .assetId = "0",
83     .subpath = "/local/sync",
84     .uri = "/local/sync",
85     .modifyTime = "123456",
86     .createTime = "",
87     .size = "256",
88     .hash = "ASE"};
89 const Asset ASSET_COPY2 = {.version = 1,
90     .name = "Phone_copy_2",
91     .assetId = "0",
92     .subpath = "/local/sync",
93     .uri = "/local/sync",
94     .modifyTime = "123456",
95     .createTime = "",
96     .size = "256",
97     .hash = "ASE"};
98 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
99 const std::string QUERY_CONSISTENT_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x10!=0;";
101 
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116 
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128                                      .fields = CLOUD_FIELDS};
129     dataBaseSchema.tables.push_back(assetsTableSchema);
130     assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131                          .fields = NO_PRIMARY_FIELDS};
132     dataBaseSchema.tables.push_back(assetsTableSchema);
133     assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134                          .fields = COMPOUND_PRIMARY_FIELDS};
135     dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)138 void GenerateDataRecords(
139     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
140 {
141     for (int64_t i = begin; i < begin + count; i++) {
142         Assets assets;
143         Asset asset = ASSET_COPY;
144         asset.name = ASSET_COPY.name + std::to_string(i);
145         assets.emplace_back(asset);
146         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
147         assets.emplace_back(asset);
148         VBucket data;
149         data.insert_or_assign(COL_ID, i);
150         data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
151         data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
152         data.insert_or_assign(COL_ASSETS, assets);
153         data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
154         record.push_back(data);
155 
156         VBucket log;
157         Timestamp now = TimeHelper::GetSysCurrentTime();
158         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
159         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
161         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
162         extend.push_back(log);
163     }
164 }
165 
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)166 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
167 {
168     int errCode;
169     std::vector<VBucket> record;
170     std::vector<VBucket> extend;
171     GenerateDataRecords(begin, count, 0, record, extend);
172     const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
173     for (VBucket vBucket : record) {
174         sqlite3_stmt *stmt = nullptr;
175         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
176         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
177         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
178         ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
179             sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
180         if (isAssetNull) {
181             ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
182         } else {
183             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
184             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
185         }
186         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
187             std::get<Assets>(vBucket[COL_ASSETS]));
188         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
189         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
190         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
191         SQLiteUtils::ResetStatement(stmt, true, errCode);
192     }
193 }
194 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,bool isEmptyAssets=false)195 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, bool isEmptyAssets = false)
196 {
197     int errCode;
198     std::vector<uint8_t> assetBlob;
199     const string sql = "update " + tableName + " set assets=?;";
200     sqlite3_stmt *stmt = nullptr;
201     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
202     if (isEmptyAssets) {
203         ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
204     } else {
205         assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
206         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
207     }
208     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
209     SQLiteUtils::ResetStatement(stmt, true, errCode);
210 }
211 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)212 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
213 {
214     int errCode;
215     std::vector<uint8_t> assetBlob;
216     const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
217         " and id<=" + std::to_string(end) + ";";
218     sqlite3_stmt *stmt = nullptr;
219     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
220     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
221     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
222     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
223     SQLiteUtils::ResetStatement(stmt, true, errCode);
224 }
225 
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)226 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
227 {
228     ASSERT_NE(db, nullptr);
229     for (int64_t i = begin; i < begin + count; i++) {
230         string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
231         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
232     }
233 }
234 
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)235 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
236 {
237     for (int64_t i = begin; i < begin + count; i++) {
238         VBucket idMap;
239         idMap.insert_or_assign("#_gid", std::to_string(i));
240         ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
241     }
242 }
243 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)244 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
245     const std::string &tableName)
246 {
247     std::this_thread::sleep_for(std::chrono::milliseconds(1));
248     std::vector<VBucket> record;
249     std::vector<VBucket> extend;
250     GenerateDataRecords(begin, count, gidStart, record, extend);
251     for (auto &entry: extend) {
252         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
253     }
254     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
255     std::this_thread::sleep_for(std::chrono::milliseconds(1));
256 }
257 
// sqlite3_exec row callback that collects integer column values.
//   data:     std::vector<int64_t> * receiving the values (nothing is recorded when it is null).
//   count:    number of columns in the current row.
//   colValue: textual column values; an entry is a null pointer for SQL NULL.
//   colName:  column names, unused.
// Every non-NULL column of the row is parsed as a base-10 integer and appended, in column order.
// SQL NULL columns are skipped rather than handed to strtol. Always returns 0 so the query continues.
int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
{
    (void)colName;
    auto status = static_cast<std::vector<int64_t> *>(data);
    if (status == nullptr || colValue == nullptr) {
        return 0;
    }
    const int decimal = 10;
    for (int i = 0; i < count; i++) {
        if (colValue[i] == nullptr) {
            continue;
        }
        // Read column i; the previous code re-read column 0 for every column.
        status->push_back(strtol(colValue[i], nullptr, decimal));
    }
    return 0;
}
267 
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)268 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
269 {
270     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
271     std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
272         " and data_key <=" +  std::to_string(endId) + ";";
273     std::vector<int64_t> status;
274     char *str = NULL;
275     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
276         SQLITE_OK);
277     ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
278 
279     for (auto stat : status) {
280         ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
281     }
282 }
283 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)284 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
285 {
286     std::vector<VBucket> record;
287     std::vector<VBucket> extend;
288     GenerateDataRecords(begin, count, gidStart, record, extend);
289     if (tableName == ASSETS_TABLE_NAME_SHARED) {
290         for (auto &vBucket: record) {
291             vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
292         }
293     }
294     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
295 }
296 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)297 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
298 {
299     std::unique_lock<std::mutex> lock(g_processMutex);
300     bool result = g_processCondition.wait_for(
301         lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
302     ASSERT_EQ(result, true);
303     LOGD("-------------------sync end--------------");
304 }
305 
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK)306 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK)
307 {
308     g_syncProcess = {};
309     Query query = Query::Select().FromTable(tableNames);
310     std::vector<SyncProcess> expectProcess;
311     CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
312         ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
313         std::unique_lock<std::mutex> lock(g_processMutex);
314         g_syncProcess = process.begin()->second;
315         if (g_syncProcess.process == FINISHED) {
316             g_processCondition.notify_one();
317             ASSERT_EQ(g_syncProcess.errCode, errCode);
318         }
319     };
320     CloudSyncOption option;
321     option.devices = {DEVICE_CLOUD};
322     option.mode = mode;
323     option.query = query;
324     option.waitTime = SYNC_WAIT_TIME;
325     option.lockAction = static_cast<LockAction>(0xff); // lock all
326     ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
327 
328     if (dbStatus == DBStatus::OK) {
329         WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
330     }
331 }
332 
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)333 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
334 {
335     for (auto &item : assets) {
336         for (auto &asset : item.second) {
337             EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
338             if (index < 4) { // 1-4 is inserted
339                 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
340             }
341             LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
342                 index);
343         }
344     }
345 }
346 
CheckDownloadFailedForTest002(sqlite3 * & db)347 void CheckDownloadFailedForTest002(sqlite3 *&db)
348 {
349     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
350     sqlite3_stmt *stmt = nullptr;
351     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
352     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
353         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
354         Type cloudValue;
355         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
356         std::vector<uint8_t> assetsBlob;
357         Assets assets;
358         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
359         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
360         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
361         for (size_t i = 0; i < assets.size(); ++i) {
362             EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
363         }
364     }
365     int errCode;
366     SQLiteUtils::ResetStatement(stmt, true, errCode);
367 }
368 
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)369 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
370 {
371     Assets assets;
372     Asset asset = ASSET_COPY;
373     asset.name = ASSET_COPY.name + std::to_string(id);
374     asset.status = status;
375     assets.emplace_back(asset);
376     asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
377     assets.emplace_back(asset);
378     int errCode;
379     std::vector<uint8_t> assetBlob;
380     const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
381     sqlite3_stmt *stmt = nullptr;
382     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
383     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
384     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
385     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
386     SQLiteUtils::ResetStatement(stmt, true, errCode);
387 }
388 
CheckConsistentCount(sqlite3 * db,int64_t expectCount)389 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
390 {
391     EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
392         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
393 }
394 
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)395 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
396 {
397     EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
398         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
399 }
400 
CloseDb()401 void CloseDb()
402 {
403     if (g_delegate != nullptr) {
404         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
405         g_delegate = nullptr;
406     }
407     delete g_observer;
408     g_virtualCloudDb = nullptr;
409 }
410 
411 class DistributedDBCloudSyncerDownloadAssetsOnlyTest : public testing::Test {
412 public:
413     static void SetUpTestCase(void);
414     static void TearDownTestCase(void);
415     void SetUp();
416     void TearDown();
417 
418 protected:
419     void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
420         const std::set<int> &failIndex);
421     void CheckLocalAssetIsEmpty(const std::string &tableName);
422     void CheckCursorData(const std::string &tableName, int begin);
423     void WaitForSync(int &syncCount);
424     const RelationalSyncAbleStorage *GetRelationalStore();
425     void InitDataStatusTest(bool needDownload);
426     void DataStatusTest001(bool needDownload);
427     void DataStatusTest003();
428     void DataStatusTest004();
429     void DataStatusTest005();
430     void DataStatusTest006();
431     void DataStatusTest007();
432     sqlite3 *db = nullptr;
433     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
434 };
435 
SetUpTestCase(void)436 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::SetUpTestCase(void)
437 {
438     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
439     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
440     LOGI("The test db is:%s", g_storePath.c_str());
441     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
442     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
443 }
444 
TearDownTestCase(void)445 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::TearDownTestCase(void) {}
446 
SetUp(void)447 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::SetUp(void)
448 {
449     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
450     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
451         LOGE("rm test db files error.");
452     }
453     DistributedDBToolsUnitTest::PrintTestCaseInfo();
454     LOGD("Test dir is %s", g_testDir.c_str());
455     db = RelationalTestUtils::CreateDataBase(g_storePath);
456     ASSERT_NE(db, nullptr);
457     InitDatabase(db);
458     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
459     ASSERT_NE(g_observer, nullptr);
460     ASSERT_EQ(
461         g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
462         DBStatus::OK);
463     ASSERT_NE(g_delegate, nullptr);
464     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
465     ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
466     ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
467     g_virtualCloudDb = make_shared<VirtualCloudDb>();
468     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
469     g_syncProcess = {};
470     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
471     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
472     DataBaseSchema dataBaseSchema;
473     GetCloudDbSchema(dataBaseSchema);
474     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
475     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
476     ASSERT_NE(g_cloudStoreHook, nullptr);
477     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
478     ASSERT_TRUE(communicatorAggregator_ != nullptr);
479     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
480 }
481 
TearDown(void)482 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::TearDown(void)
483 {
484     RefObject::DecObjRef(g_store);
485     g_virtualCloudDb->ForkUpload(nullptr);
486     CloseDb();
487     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
488     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
489         LOGE("rm test db files error.");
490     }
491     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
492     communicatorAggregator_ = nullptr;
493     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
494 }
495 
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)496 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::CheckLocaLAssets(const std::string &tableName,
497     const std::string &expectAssetId, const std::set<int> &failIndex)
498 {
499     std::string sql = "SELECT assets FROM " + tableName + ";";
500     sqlite3_stmt *stmt = nullptr;
501     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
502     int index = 0;
503     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
504         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
505         Type cloudValue;
506         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
507         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
508         for (const auto &asset : assets) {
509             index++;
510             if (failIndex.find(index) != failIndex.end()) {
511                 EXPECT_EQ(asset.assetId, "0");
512             } else {
513                 EXPECT_EQ(asset.assetId, expectAssetId);
514             }
515         }
516     }
517     int errCode = E_OK;
518     SQLiteUtils::ResetStatement(stmt, true, errCode);
519 }
520 
CheckLocalAssetIsEmpty(const std::string & tableName)521 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::CheckLocalAssetIsEmpty(const std::string &tableName)
522 {
523     std::string sql = "SELECT asset FROM " + tableName + ";";
524     sqlite3_stmt *stmt = nullptr;
525     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
526     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
527         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
528     }
529     int errCode = E_OK;
530     SQLiteUtils::ResetStatement(stmt, true, errCode);
531 }
532 
CheckCursorData(const std::string & tableName,int begin)533 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::CheckCursorData(const std::string &tableName, int begin)
534 {
535     std::string logTableName = DBCommon::GetLogTableName(tableName);
536     std::string sql = "SELECT cursor FROM " + logTableName + ";";
537     sqlite3_stmt *stmt = nullptr;
538     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
539     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
540         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
541         Type cloudValue;
542         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
543         EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
544         begin++;
545     }
546     int errCode = E_OK;
547     SQLiteUtils::ResetStatement(stmt, true, errCode);
548 }
549 
WaitForSync(int & syncCount)550 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::WaitForSync(int &syncCount)
551 {
552     std::unique_lock<std::mutex> lock(g_processMutex);
553     bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
554         [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
555     ASSERT_EQ(result, true);
556 }
557 
GetRelationalStore()558 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsOnlyTest::GetRelationalStore()
559 {
560     RelationalDBProperties properties;
561     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
562     int errCode = E_OK;
563     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
564     if (g_store == nullptr) {
565         return nullptr;
566     }
567     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
568 }
569 
InitDataStatusTest(bool needDownload)570 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::InitDataStatusTest(bool needDownload)
571 {
572     int cloudCount = 20;
573     int localCount = 10;
574     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
575     if (needDownload) {
576         UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
577     }
578     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
579     std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
580     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
581     sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
582     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
583     sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
584     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
585     std::this_thread::sleep_for(std::chrono::milliseconds(1));
586     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
587     std::this_thread::sleep_for(std::chrono::milliseconds(1));
588     sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
589     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
590     sql = "update " + logName + " SET status = 1 where data_key in (4);";
591     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
592 }
593 
594 struct ProcessParam {
595     const std::map<std::string, SyncProcess> &OnProcess;
596     const SyncProcess &process;
597     std::mutex &processMutex;
598     std::condition_variable &cv;
599     bool &finish;
600     const CloudSyncStatusCallback &onFinish;
601     DBStatus expectResult;
602 };
603 
HandleProcessFinish(ProcessParam & param)604 void HandleProcessFinish(ProcessParam &param)
605 {
606     if (param.process.process == FINISHED) {
607         if (param.onFinish) {
608             param.onFinish(param.OnProcess);
609         }
610         EXPECT_EQ(param.process.errCode, param.expectResult);
611         std::unique_lock<std::mutex> lock(param.processMutex);
612         param.finish = true;
613         param.cv.notify_one();
614     }
615 }
616 
PriorityLevelSync(int32_t priorityLevel,const Query & query,const CloudSyncStatusCallback & onFinish,SyncMode mode,DBStatus expectResult=DBStatus::OK)617 void PriorityLevelSync(int32_t priorityLevel, const Query &query, const CloudSyncStatusCallback &onFinish,
618     SyncMode mode, DBStatus expectResult = DBStatus::OK)
619 {
620     std::mutex processMutex;
621     std::vector<SyncProcess> expectProcess;
622     std::condition_variable cv;
623     bool finish = false;
624     auto callback = [&cv, &onFinish, &finish, &processMutex, &expectResult]
625         (const std::map<std::string, SyncProcess> &process) {
626         for (auto &item : process) {
627             ProcessParam param = {process, item.second, processMutex, cv, finish, onFinish, expectResult};
628             HandleProcessFinish(param);
629         }
630     };
631     CloudSyncOption option;
632     option.devices = {DEVICE_CLOUD};
633     option.query = query;
634     option.mode = mode;
635     option.priorityTask = true;
636     option.priorityLevel = priorityLevel;
637     DBStatus syncResult = g_delegate->Sync(option, callback);
638     EXPECT_EQ(syncResult, DBStatus::OK);
639 
640     std::unique_lock<std::mutex> lock(processMutex);
641     cv.wait(lock, [&finish]() {
642         return finish;
643     });
644 }
645 
PriorityLevelSync(int32_t priorityLevel,const Query & query,SyncMode mode,DBStatus expectResult=DBStatus::OK)646 void PriorityLevelSync(int32_t priorityLevel, const Query &query, SyncMode mode, DBStatus expectResult = DBStatus::OK)
647 {
648     std::mutex processMutex;
649     std::vector<SyncProcess> expectProcess;
650     std::condition_variable cv;
651     bool finish = expectResult == DBStatus::OK ? false : true;
652     auto callback = [&cv, &finish, &processMutex]
653         (const std::map<std::string, SyncProcess> &process) {
654         for (auto &item : process) {
655             if (item.second.process == FINISHED) {
656                 std::unique_lock<std::mutex> lock(processMutex);
657                 finish = true;
658                 cv.notify_one();
659             }
660         }
661     };
662     CloudSyncOption option;
663     option.devices = {DEVICE_CLOUD};
664     option.query = query;
665     option.mode = mode;
666     option.priorityTask = true;
667     option.priorityLevel = priorityLevel;
668     DBStatus syncResult = g_delegate->Sync(option, callback);
669     EXPECT_EQ(syncResult, expectResult);
670 
671     std::unique_lock<std::mutex> lock(processMutex);
672     cv.wait(lock, [&finish]() {
673         return finish;
674     });
675 }
676 
CheckAsset(sqlite3 * db,const std::string & tableName,int id,const Asset & expectAsset,bool expectFound)677 void CheckAsset(sqlite3 *db, const std::string &tableName, int id, const Asset &expectAsset, bool expectFound)
678 {
679     std::string sql = "select assets from " + tableName + " where id = " + std::to_string(id);
680     sqlite3_stmt *stmt = nullptr;
681     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
682     int errCode = SQLiteUtils::StepWithRetry(stmt);
683     ASSERT_EQ(errCode, SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
684     if (expectFound) {
685         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
686     }
687     Type cloudValue;
688     ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
689     Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
690     bool found = false;
691     for (const auto &asset : assets) {
692         if (asset.name != expectAsset.name) {
693             continue;
694         }
695         found = true;
696         EXPECT_EQ(asset.status, expectAsset.status);
697         EXPECT_EQ(asset.hash, expectAsset.hash);
698         EXPECT_EQ(asset.assetId, expectAsset.assetId);
699         EXPECT_EQ(asset.uri, expectAsset.uri);
700     }
701     EXPECT_EQ(found, expectFound);
702     errCode = E_OK;
703     SQLiteUtils::ResetStatement(stmt, true, errCode);
704     EXPECT_EQ(errCode, E_OK);
705 }
706 
CheckDBValue(sqlite3 * db,const std::string & tableName,int id,const std::string & field,const std::string & expectValue)707 void CheckDBValue(sqlite3 *db, const std::string &tableName, int id, const std::string &field,
708     const std::string &expectValue)
709 {
710     std::string sql = "select " + field + " from " + tableName + " where id = " + std::to_string(id);
711     sqlite3_stmt *stmt = nullptr;
712     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
713     int errCode = SQLiteUtils::StepWithRetry(stmt);
714     if (expectValue.empty()) {
715         EXPECT_EQ(errCode, SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
716     }
717     ASSERT_EQ(errCode, SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
718     std::string str;
719     (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
720     EXPECT_EQ(str, expectValue);
721     SQLiteUtils::ResetStatement(stmt, true, errCode);
722     errCode = E_OK;
723     EXPECT_EQ(errCode, E_OK);
724 }
725 
726 /**
727   * @tc.name: DownloadAssetsOnly001
728   * @tc.desc: Test sync with priorityLevel
729   * @tc.type: FUNC
730   * @tc.require:
731   * @tc.author: liaoyonghuang
732   */
733 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly001, TestSize.Level1)
734 {
735     /**
736      * @tc.steps:step1. init data
737      * @tc.expected: step1. return OK.
738      */
739     int cloudCount = 15; // 15 is num of cloud
740     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
741     /**
742      * @tc.steps:step2. Call sync with different priorityLevel
743      * @tc.expected: step2. OK
744      */
745     int syncFinishCount = 0;
746     g_virtualCloudDb->SetBlockTime(100);
__anon9ef010c90902() 747     std::thread syncThread1([&]() {
748         CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
749             syncFinishCount++;
750             EXPECT_EQ(syncFinishCount, 3);
751         };
752         std::vector<int64_t> inValue = {0, 1, 2, 3, 4};
753         Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
754         PriorityLevelSync(0, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
755     });
756 
__anon9ef010c90b02() 757     std::thread syncThread2([&]() {
758         CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
759             syncFinishCount++;
760             EXPECT_EQ(syncFinishCount, 2);
761         };
762         std::vector<int64_t> inValue = {5, 6, 7, 8, 9};
763         Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
764         PriorityLevelSync(1, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
765     });
766 
__anon9ef010c90d02() 767     std::thread syncThread3([&]() {
768         CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
769             syncFinishCount++;
770             EXPECT_EQ(syncFinishCount, 1);
771         };
772         std::vector<int64_t> inValue = {10, 11, 12, 13, 14};
773         Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
774         PriorityLevelSync(2, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
775     });
776     syncThread1.join();
777     syncThread2.join();
778     syncThread3.join();
779 }
780 
781 /**
782   * @tc.name: DownloadAssetsOnly002
783   * @tc.desc: Test download specified assets with unsupported mode
784   * @tc.type: FUNC
785   * @tc.require:
786   * @tc.author: liaoyonghuang
787   */
788 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly002, TestSize.Level0)
789 {
790     /**
791      * @tc.steps:step1. init data
792      * @tc.expected: step1. return OK.
793      */
794     int localDataCount = 10;
795     InsertLocalData(db, 0, localDataCount, ASSETS_TABLE_NAME, true);
796     UpdateLocalData(db, ASSETS_TABLE_NAME, {ASSET_COPY}, true);
797     int cloudCount = 10;
798     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
799     /**
800      * @tc.steps:step2. Download specified assets with mode SYNC_MODE_CLOUD_MERGE and SYNC_MODE_CLOUD_FORCE_PUSH
801      * @tc.expected: step2. sync fail
802     */
803     std::vector<int64_t> inValue = {0};
804     std::map<std::string, std::set<std::string>> assets;
805     assets["assets"] = {ASSET_COPY.name + "0"};
806     Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
807 
808     CloudSyncOption option;
809     option.devices = {DEVICE_CLOUD};
810     option.query = query;
811     option.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
812     option.priorityTask = true;
813     option.priorityLevel = 2u;
814     EXPECT_EQ(g_delegate->Sync(option, nullptr), DBStatus::NOT_SUPPORT);
815 
816     option.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
817     EXPECT_EQ(g_delegate->Sync(option, nullptr), DBStatus::NOT_SUPPORT);
818 }
819 
820 /**
821   * @tc.name: DownloadAssetsOnly004
822   * @tc.desc: Test download specified assets
823   * @tc.type: FUNC
824   * @tc.require:
825   * @tc.author: liaoyonghuang
826   */
827 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly003, TestSize.Level0)
828 {
829     /**
830      * @tc.steps:step1. init data
831      * @tc.expected: step1. return OK.
832      */
833     int dataCount = 10;
834     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
835     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
836     for (int i = 0; i < dataCount; i++) {
837         Asset asset = ASSET_COPY;
838         asset.name += std::to_string(i);
839         asset.status = AssetStatus::UPDATE;
840         asset.hash = "local_new";
841         Assets assets = {asset};
842         asset.name += "_new";
843         assets.push_back(asset);
844         UpdateLocalData(db, ASSETS_TABLE_NAME, assets, i, i);
845     }
846     /**
847      * @tc.steps:step2. Download specified assets
848      * @tc.expected: step2. return OK.
849      */
850     std::vector<int64_t> inValue = {0};
851     std::map<std::string, std::set<std::string>> assets;
852     assets["assets"] = {ASSET_COPY.name + "0"};
853     Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
854     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
855 
856     Asset assetCloud = ASSET_COPY;
857     assetCloud.name += std::to_string(0);
858     Asset assetLocal = ASSET_COPY;
859     assetLocal.name +=std::to_string(0) + "_new";
860     assetLocal.hash = "local_new";
861     assetLocal.status = AssetStatus::UPDATE;
862     CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud, true);
863     CheckAsset(db, ASSETS_TABLE_NAME, 0, assetLocal, true);
864 
865     for (int i = 1; i < dataCount; i++) {
866         Asset assetLocal1 = ASSET_COPY;
867         assetLocal1.name += std::to_string(i);
868         Asset assetLocal2 = ASSET_COPY;
869         assetLocal2.name +=std::to_string(i) + "_new";
870         assetLocal1.hash = "local_new";
871         assetLocal2.hash = "local_new";
872         assetLocal1.status = AssetStatus::UPDATE;
873         assetLocal2.status = AssetStatus::UPDATE;
874         CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal1, true);
875         CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal2, true);
876     }
877 }
878 
879 /**
880   * @tc.name: DownloadAssetsOnly004
881   * @tc.desc: Test download specified assets
882   * @tc.type: FUNC
883   * @tc.require:
884   * @tc.author: liaoyonghuang
885   */
886 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly004, TestSize.Level0)
887 {
888     /**
889      * @tc.steps:step1. init data
890      * @tc.expected: step1. return OK.
891      */
892     int dataCount = 10;
893     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
894     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
895     std::vector<VBucket> record;
896     std::vector<VBucket> extend;
897     GenerateDataRecords(0, dataCount, 0, record, extend);
898     for (int i = 0; i < dataCount; i++) {
899         Asset asset1 = ASSET_COPY;
900         Asset asset2 = ASSET_COPY;
901         asset1.name += std::to_string(i);
902         asset2.name += std::to_string(i) + "_new";
903         asset1.hash = "cloud";
904         asset2.hash = "cloud";
905         Assets assets = {asset1, asset2};
906         record[i].insert_or_assign(COL_ASSETS, assets);
907         std::string newName = "name" + std::to_string(i) + "_new";
908         record[i].insert_or_assign(COL_NAME, newName);
909     }
910     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
911     /**
912      * @tc.steps:step2. Download specified assets
913      * @tc.expected: step2. return OK.
914      */
915     std::vector<int64_t> inValue = {0};
916     std::map<std::string, std::set<std::string>> assets;
917     assets["assets"] = {ASSET_COPY.name + "0"};
918     Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
919     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
920 
921     Asset assetCloud1 = ASSET_COPY;
922     assetCloud1.name += std::to_string(0);
923     assetCloud1.hash = "cloud";
924     Asset assetCloud2 = ASSET_COPY;
925     assetCloud2.name +=std::to_string(0) + "_new";
926     assetCloud2.hash = "cloud";
927     CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud1, true);
928     CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud2, false);
929     CheckDBValue(db, ASSETS_TABLE_NAME, 0, COL_NAME, "name0");
930 
931     for (int i = 1; i < dataCount; i++) {
932         Asset assetLocal1 = ASSET_COPY;
933         assetLocal1.name += std::to_string(i);
934         Asset assetLocal2 = ASSET_COPY;
935         assetLocal2.name +=std::to_string(i) + "_new";
936         CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal1, true);
937         CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal2, false);
938         CheckDBValue(db, ASSETS_TABLE_NAME, i, COL_NAME, "name" + std::to_string(i));
939     }
940 }
941 
942 /**
943   * @tc.name: DownloadAssetsOnly005
944   * @tc.desc: Test download asseets which local no found
945   * @tc.type: FUNC
946   * @tc.require:
947   * @tc.author: luoguo
948   */
949 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly005, TestSize.Level0)
950 {
951     /**
952      * @tc.steps:step1. init data
953      * @tc.expected: step1. return OK.
954      */
955     int dataCount = 10;
956     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
957     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
958     InsertCloudDBData(dataCount, 1, 0, ASSETS_TABLE_NAME);
959     /**
960      * @tc.steps:step2. Download assets which local no found
961      * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
962      */
963     std::vector<int64_t> inValue = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
964     std::map<std::string, std::set<std::string>> assets;
965     assets["assets"] = {ASSET_COPY.name + "10"};
966     Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
967     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
968 }
969 
970 /**
971   * @tc.name: DownloadAssetsOnly006
972   * @tc.desc: Test download asseets which cloud no found
973   * @tc.type: FUNC
974   * @tc.require:
975   * @tc.author: luoguo
976   */
977 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly006, TestSize.Level0)
978 {
979     /**
980      * @tc.steps:step1. init data
981      * @tc.expected: step1. return OK.
982      */
983     int dataCount = 10;
984     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
985     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
986     InsertLocalData(db, dataCount, 1, ASSETS_TABLE_NAME, true);
987     /**
988      * @tc.steps:step2. Download assets which cloud no found
989      * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
990      */
991     std::vector<int64_t> inValue = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};;
992     std::map<std::string, std::set<std::string>> assets;
993     assets["assets"] = {ASSET_COPY.name + "10"};
994     Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
995     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
996 }
997 
998 /**
999   * @tc.name: DownloadAssetsOnly007
1000   * @tc.desc: Test download specified assets with group
1001   * @tc.type: FUNC
1002   * @tc.require:
1003   * @tc.author: luoguo
1004   */
1005 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly007, TestSize.Level0)
1006 {
1007     /**
1008      * @tc.steps:step1. init data
1009      * @tc.expected: step1. return OK.
1010      */
1011     int dataCount = 10;
1012     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1013     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1014     for (int i = 0; i < dataCount; i++) {
1015         Asset asset = ASSET_COPY;
1016         asset.name += std::to_string(i);
1017         asset.status = AssetStatus::UPDATE;
1018         asset.hash = "local_new";
1019         Assets assets = {asset};
1020         asset.name += "_new";
1021         assets.push_back(asset);
1022         UpdateLocalData(db, ASSETS_TABLE_NAME, assets, i, i);
1023     }
1024     /**
1025      * @tc.steps:step2. Download specified assets
1026      * @tc.expected: step2. return OK.
1027      */
1028     std::map<std::string, std::set<std::string>> assets;
1029     assets["assets"] = {ASSET_COPY.name + "0"};
1030     std::map<std::string, std::set<std::string>> assets1;
1031     assets1["assets"] = {ASSET_COPY.name + "1"};
1032     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1033         EndGroup().Or().BeginGroup().EqualTo("id", 1).And().AssetsOnly(assets1).EndGroup();
1034     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1035 
1036     Asset assetCloud = ASSET_COPY;
1037     assetCloud.name += std::to_string(0);
1038     Asset assetLocal = ASSET_COPY;
1039     assetLocal.name +=std::to_string(0) + "_new";
1040     assetLocal.hash = "local_new";
1041     assetLocal.status = AssetStatus::UPDATE;
1042     CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud, true);
1043     CheckAsset(db, ASSETS_TABLE_NAME, 0, assetLocal, true);
1044 
1045     for (int i = 2; i < dataCount; i++) {
1046         Asset assetLocal1 = ASSET_COPY;
1047         assetLocal1.name += std::to_string(i);
1048         Asset assetLocal2 = ASSET_COPY;
1049         assetLocal2.name +=std::to_string(i) + "_new";
1050         assetLocal1.hash = "local_new";
1051         assetLocal2.hash = "local_new";
1052         assetLocal1.status = AssetStatus::UPDATE;
1053         assetLocal2.status = AssetStatus::UPDATE;
1054         CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal1, true);
1055         CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal2, true);
1056     }
1057 }
1058 
1059 /**
1060   * @tc.name: DownloadAssetsOnly008
1061   * @tc.desc: Test download asseets which local no found
1062   * @tc.type: FUNC
1063   * @tc.require:
1064   * @tc.author: luoguo
1065   */
1066 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly008, TestSize.Level0)
1067 {
1068     /**
1069      * @tc.steps:step1. init data
1070      * @tc.expected: step1. return OK.
1071      */
1072     int dataCount = 10;
1073     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1074     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1075     InsertCloudDBData(dataCount, 1, 0, ASSETS_TABLE_NAME);
1076     /**
1077      * @tc.steps:step2. Download assets which local no found
1078      * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1079      */
1080     std::vector<int64_t> inValue = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};;
1081     std::map<std::string, std::set<std::string>> assets;
1082     assets["assets"] = {ASSET_COPY.name + "0"};
1083     std::map<std::string, std::set<std::string>> assets1;
1084     assets1["assets"] = {ASSET_COPY.name + "10"};
1085     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1086         EndGroup().Or().BeginGroup().In("id", inValue).And().AssetsOnly(assets1).EndGroup();
1087     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1088 }
1089 
1090 /**
1091   * @tc.name: DownloadAssetsOnly009
1092   * @tc.desc: Test download asseets which cloud no found
1093   * @tc.type: FUNC
1094   * @tc.require:
1095   * @tc.author: luoguo
1096   */
1097 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly009, TestSize.Level0)
1098 {
1099     /**
1100      * @tc.steps:step1. init data
1101      * @tc.expected: step1. return OK.
1102      */
1103     int dataCount = 10;
1104     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1105     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1106     InsertLocalData(db, dataCount, 1, ASSETS_TABLE_NAME, true);
1107     /**
1108      * @tc.steps:step2. Download assets which cloud no found
1109      * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1110      */
1111     std::map<std::string, std::set<std::string>> assets;
1112     assets["assets"] = {ASSET_COPY.name + "0"};
1113     std::map<std::string, std::set<std::string>> assets1;
1114     assets1["assets"] = {ASSET_COPY.name + "10"};
1115     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1116         EndGroup().Or().BeginGroup().EqualTo("id", 10).And().AssetsOnly(assets1).EndGroup();
1117     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1118 }
1119 
1120 /**
1121   * @tc.name: DownloadAssetsOnly010
1122   * @tc.desc: Test assets only multi time.
1123   * @tc.type: FUNC
1124   * @tc.require:
1125   * @tc.author: luoguo
1126   */
1127 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly010, TestSize.Level0)
1128 {
1129     /**
1130      * @tc.steps:step1. init data
1131      * @tc.expected: step1. return OK.
1132      */
1133     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1134     int dataCount = 10;
1135     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1136     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1137 
1138     /**
1139      * @tc.steps:step2. AssetsOnly twice
1140      * @tc.expected: step2. check notify count.
1141      */
1142     std::map<std::string, std::set<std::string>> assets;
1143     assets["assets"] = {ASSET_COPY.name + "0"};
1144     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).And().
1145         AssetsOnly(assets).EndGroup();
1146     g_observer->ResetCloudSyncToZero();
1147     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1148     auto changedData = g_observer->GetSavedChangedData();
1149     EXPECT_EQ(changedData.size(), 0u);
1150 
1151     /**
1152      * @tc.steps:step3. AssetsOnly behine EndGroup
1153      * @tc.expected: step3. check notify count.
1154      */
1155     Query query1 = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).EndGroup().And().
1156         AssetsOnly(assets);
1157     g_observer->ResetCloudSyncToZero();
1158     PriorityLevelSync(2, query1, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1159     changedData = g_observer->GetSavedChangedData();
1160     EXPECT_EQ(changedData.size(), 0u);
1161 
1162     /**
1163      * @tc.steps:step4. AssetsOnly EndGroup use And
1164      * @tc.expected: step4. check notify count.
1165      */
1166     Query query2 = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1167         EndGroup().And().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).EndGroup();
1168     g_observer->ResetCloudSyncToZero();
1169     PriorityLevelSync(2, query2, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1170     changedData = g_observer->GetSavedChangedData();
1171     EXPECT_EQ(changedData.size(), 0u);
1172 }
1173 
1174 /**
1175   * @tc.name: DownloadAssetsOnly011
1176   * @tc.desc: Check assets only sync will up.
1177   * @tc.type: FUNC
1178   * @tc.require:
1179   * @tc.author: luoguo
1180   */
1181 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly011, TestSize.Level0)
1182 {
1183     /**
1184      * @tc.steps:step1. init data
1185      * @tc.expected: step1. return OK.
1186      */
1187     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1188     int dataCount = 10;
1189     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1190     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1191 
1192     /**
1193      * @tc.steps:step2. assets only sync
1194      * @tc.expected: step2. check assets sync result.
1195      */
1196     std::map<std::string, std::set<std::string>> assets;
1197     assets["assets"] = {ASSET_COPY.name + "0"};
1198     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1199         EndGroup();
1200     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1201 
1202     /**
1203      * @tc.steps:step3. check cursor and flag
1204      * @tc.expected: step3. ok.
1205      */
1206     std::string sql = "select cursor from naturalbase_rdb_aux_student_log where data_key=0;";
1207     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1208         reinterpret_cast<void *>(21u), nullptr), SQLITE_OK);
1209 
1210     sql = "select flag from naturalbase_rdb_aux_student_log where data_key=0;";
1211     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1212         reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
1213     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1214 }
1215 
1216 /**
1217   * @tc.name: DownloadAssetsOnly012
1218   * @tc.desc: Test sync with same priorityLevel should be sync in order.
1219   * @tc.type: FUNC
1220   * @tc.require:
1221   * @tc.author: luoguo
1222   */
1223 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly012, TestSize.Level1)
1224 {
1225     /**
1226      * @tc.steps:step1. init data
1227      * @tc.expected: step1. return OK.
1228      */
1229     int cloudCount = 15; // 15 is num of cloud
1230     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1231     /**
1232      * @tc.steps:step2. Call sync with same priorityLevel
1233      * @tc.expected: step2. OK
1234      */
1235     int syncFinishCount = 0;
1236     g_virtualCloudDb->SetBlockTime(1000);
__anon9ef010c90f02() 1237     std::thread syncThread1([&]() {
1238         CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
1239             syncFinishCount++;
1240             EXPECT_EQ(syncFinishCount, 1);
1241         };
1242         std::vector<int64_t> inValue = {0, 1, 2, 3, 4};
1243         Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
1244         PriorityLevelSync(0, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
1245     });
1246     std::this_thread::sleep_for(std::chrono::milliseconds(500));
__anon9ef010c91102() 1247     std::thread syncThread2([&]() {
1248         CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
1249             syncFinishCount++;
1250             EXPECT_EQ(syncFinishCount, 2);
1251         };
1252         std::vector<int64_t> inValue = {5, 6, 7, 8, 9};
1253         Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
1254         PriorityLevelSync(0, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
1255     });
1256     syncThread1.join();
1257     syncThread2.join();
1258 }
1259 
1260 /**
1261   * @tc.name: DownloadAssetsOnly013
1262   * @tc.desc: Check assets only sync no data notify.
1263   * @tc.type: FUNC
1264   * @tc.require:
1265   * @tc.author: luoguo
1266   */
1267 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly013, TestSize.Level0)
1268 {
1269     /**
1270      * @tc.steps:step1. init data
1271      * @tc.expected: step1. return OK.
1272      */
1273     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1274     int dataCount = 10;
1275     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1276     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1277 
1278     /**
1279      * @tc.steps:step2. assets only sync
1280      * @tc.expected: step2. check notify count.
1281      */
1282     std::map<std::string, std::set<std::string>> assets;
1283     assets["assets"] = {ASSET_COPY.name + "0"};
1284     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1285         EndGroup();
1286     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1287     auto changedData = g_observer->GetSavedChangedData();
1288     EXPECT_EQ(changedData.size(), 1u);
1289 }
1290 
1291 /**
1292   * @tc.name: DownloadAssetsOnly014
1293   * @tc.desc: test assets only sync with cloud delete data.
1294   * @tc.type: FUNC
1295   * @tc.require:
1296   * @tc.author: luoguo
1297   */
1298 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly014, TestSize.Level0)
1299 {
1300     /**
1301      * @tc.steps:step1. init data
1302      * @tc.expected: step1. return OK.
1303      */
1304     int dataCount = 10;
1305     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1306     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1307     DeleteCloudDBData(0, 1, ASSETS_TABLE_NAME);
1308     /**
1309      * @tc.steps:step2. Download assets which cloud delete.
1310      * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1311      */
1312     std::map<std::string, std::set<std::string>> assets;
1313     assets["assets"] = {ASSET_COPY.name + "0"};
1314     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1315         EndGroup();
1316     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1317 }
1318 
1319 /**
1320   * @tc.name: DownloadAssetsOnly015
1321   * @tc.desc: test compensated sync.
1322   * @tc.type: FUNC
1323   * @tc.require:
1324   * @tc.author: luoguo
1325   */
1326 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly015, TestSize.Level0)
1327 {
1328     /**
1329      * @tc.steps:step1. init data
1330      * @tc.expected: step1. return OK.
1331      */
1332     int dataCount = 10;
1333     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1334     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1335 
1336     /**
1337      * @tc.steps:step2. set all data wait compensated.
1338      * @tc.expected: step2. return ok.
1339      */
1340     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " set flag=flag|0x10;";
1341     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1342     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " where flag&0x10=0x10;";
1343     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1344         reinterpret_cast<void *>(10u), nullptr), SQLITE_OK);
1345 
1346     /**
1347      * @tc.steps:step3. sync with compensated.
1348      * @tc.expected: step3. return ok.
1349      */
1350     std::mutex processMutex;
1351     std::vector<SyncProcess> expectProcess;
1352     std::condition_variable cv;
1353     bool finish = false;
1354     auto callback = [&cv, &finish, &processMutex]
__anon9ef010c91302(const std::map<std::string, SyncProcess> &process) 1355         (const std::map<std::string, SyncProcess> &process) {
1356         for (auto &item : process) {
1357             if (item.second.process == FINISHED) {
1358                 EXPECT_EQ(item.second.errCode, DBStatus::OK);
1359                 std::unique_lock<std::mutex> lock(processMutex);
1360                 finish = true;
1361                 cv.notify_one();
1362             }
1363         }
1364     };
1365     CloudSyncOption option;
1366     option.devices = {DEVICE_CLOUD};
1367     option.priorityTask = true;
1368     option.compensatedSyncOnly = true;
1369     DBStatus syncResult = g_delegate->Sync(option, callback);
1370     EXPECT_EQ(syncResult, DBStatus::OK);
1371 
1372     /**
1373      * @tc.steps:step4. wait sync finish and check data.
1374      * @tc.expected: step4. return ok.
1375      */
1376     std::unique_lock<std::mutex> lock(processMutex);
__anon9ef010c91402() 1377     cv.wait(lock, [&finish]() {
1378         return finish;
1379     });
1380     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1381         reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
1382 }
1383 
1384 /**
1385   * @tc.name: DownloadAssetsOnly016
1386   * @tc.desc: test assets only sync with lock data.
1387   * @tc.type: FUNC
1388   * @tc.require:
1389   * @tc.author: luoguo
1390   */
1391 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly016, TestSize.Level0)
1392 {
1393     /**
1394      * @tc.steps:step1. init data
1395      * @tc.expected: step1. return OK.
1396      */
1397     int dataCount = 10;
1398     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1399     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1400 
1401     /**
1402      * @tc.steps:step2. lock data.
1403      * @tc.expected: step2. return OK.
1404      */
1405     std::vector<std::vector<uint8_t>> hashKey;
1406     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " cloud_gid=0 ", db, hashKey);
1407     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
1408 
1409     /**
1410      * @tc.steps:step3. assets only sync.
1411      * @tc.expected: step3. return OK.
1412      */
1413     std::map<std::string, std::set<std::string>> assets;
1414     assets["assets"] = {ASSET_COPY.name + "0"};
1415     std::map<std::string, std::set<std::string>> assets1;
1416     assets1["assets"] = {ASSET_COPY.name + "1"};
1417     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1418         EndGroup().Or().BeginGroup().EqualTo("id", 1).And().AssetsOnly(assets1).EndGroup();
1419     g_observer->ResetCloudSyncToZero();
1420     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1421 
1422     /**
1423      * @tc.steps:step4. check asset changed data.
1424      * @tc.expected: step4. return OK.
1425      */
1426     auto changedData = g_observer->GetSavedChangedData();
1427     EXPECT_EQ(changedData.size(), 1u);
1428     auto item = changedData[ASSETS_TABLE_NAME];
1429     auto assetMsg = item.primaryData[1];
1430     EXPECT_EQ(assetMsg.size(), 1u);
1431 }
1432 
1433 /**
1434   * @tc.name: DownloadAssetsOnly017
1435   * @tc.desc: test assets only sync with error priority level.
1436   * @tc.type: FUNC
1437   * @tc.require:
1438   * @tc.author: luoguo
1439   */
1440 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly017, TestSize.Level0)
1441 {
1442     /**
1443      * @tc.steps:step1. init data
1444      * @tc.expected: step1. return OK.
1445      */
1446     int dataCount = 10;
1447     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1448     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1449 
1450     /**
1451      * @tc.steps:step2. assets only sync with error priority level.
1452      * @tc.expected: step2. return INVALID_ARGS.
1453      */
1454     std::map<std::string, std::set<std::string>> assets;
1455     assets["assets"] = {ASSET_COPY.name + "0"};
1456     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1457         EndGroup();
1458     PriorityLevelSync(0, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::INVALID_ARGS);
1459 
1460     /**
1461      * @tc.steps:step3. priority sync with error priority level.
1462      * @tc.expected: step3. return INVALID_ARGS.
1463      */
1464     query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).EndGroup();
1465     PriorityLevelSync(3, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::INVALID_ARGS);
1466 }
1467 
1468 /**
1469   * @tc.name: DownloadAssetsOnly018
1470   * @tc.desc: test assets only sync same record can merge assets map.
1471   * @tc.type: FUNC
1472   * @tc.require:
1473   * @tc.author: luoguo
1474   */
1475 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly018, TestSize.Level0)
1476 {
1477     /**
1478      * @tc.steps:step1. init data
1479      * @tc.expected: step1. return OK.
1480      */
1481     int dataCount = 10;
1482     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1483     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1484 
1485     /**
1486      * @tc.steps:step2. assets only sync.
1487      * @tc.expected: step2. return OK.
1488      */
1489     std::map<std::string, std::set<std::string>> assets;
1490     assets["assets"] = {ASSET_COPY.name + "0"};
1491     std::map<std::string, std::set<std::string>> assets1;
1492     assets1["assets"] = {ASSET_COPY.name + "0_copy"};
1493     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1494         EndGroup().Or().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets1).EndGroup();
1495     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1496 
1497     /**
1498      * @tc.steps:step3. check asset changed data.
1499      * @tc.expected: step3. return OK.
1500      */
1501     auto changedData = g_observer->GetSavedChangedData();
1502     EXPECT_EQ(changedData.size(), 1u);
1503     auto item = changedData[ASSETS_TABLE_NAME];
1504     auto assetMsg = item.primaryData[1];
1505     EXPECT_EQ(assetMsg.size(), 1u);
1506 }
1507 
1508 /**
1509   * @tc.name: DownloadAssetsOnly019
1510   * @tc.desc: test assets only sync with cloud delete data.
1511   * @tc.type: FUNC
1512   * @tc.require:
1513   * @tc.author: luoguo
1514   */
1515 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly019, TestSize.Level0)
1516 {
1517     /**
1518      * @tc.steps:step1. init data
1519      * @tc.expected: step1. return OK.
1520      */
1521     int dataCount = 10;
1522     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1523     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1524     DeleteCloudDBData(0, dataCount, ASSETS_TABLE_NAME);
1525     /**
1526      * @tc.steps:step2. Download assets which cloud delete.
1527      * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1528      */
1529     std::map<std::string, std::set<std::string>> assets;
1530     assets["assets"] = {ASSET_COPY.name + "0"};
1531     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1532         EndGroup();
1533     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1534 }
1535 
1536 /**
1537   * @tc.name: DownloadAssetsOnly020
1538   * @tc.desc: Test the consistent flag after syncing without asset
1539   * @tc.type: FUNC
1540   * @tc.require:
1541   * @tc.author: bty
1542   */
1543 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly020, TestSize.Level0)
1544 {
1545     /**
1546      * @tc.steps:step1. init data
1547      * @tc.expected: step1. return OK.
1548      */
1549     int dataCount = 30;
1550     InsertLocalData(db, 0, dataCount, ASSETS_TABLE_NAME, true);
1551     /**
1552      * @tc.steps:step2. sync
1553      * @tc.expected: step2. return OK.
1554      */
1555     int upIdx = 0;
__anon9ef010c91502(const std::string &tableName, VBucket &extend) 1556     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1557         upIdx++;
1558         if (upIdx > 20 && upIdx <= 30) {
1559             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
1560             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
1561         }
1562     });
__anon9ef010c91602(const std::string &tableName, std::map<std::string, Assets> &) 1563     g_virtualAssetLoader->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &) {
1564         EXPECT_TRUE(false);
1565     });
1566     int queryIdx = 0;
__anon9ef010c91702(const std::string &, VBucket &) 1567     g_virtualCloudDb->ForkQuery([&queryIdx](const std::string &, VBucket &) {
1568         queryIdx++;
1569         if (queryIdx == 3) {
1570             std::vector<int64_t> inValue = {5, 6, 7, 8, 9};
1571             Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
1572             CloudSyncOption option;
1573             option.devices = {DEVICE_CLOUD};
1574             option.query = query;
1575             option.priorityTask = true;
1576             g_delegate->Sync(option, nullptr); // In order to pause compensate sync
1577         }
1578     });
1579     int callCount = 0;
__anon9ef010c91802() 1580     g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
1581         callCount++;
1582         g_processCondition.notify_one();
1583     });
1584     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1585     WaitForSync(callCount);
1586     /**
1587      * @tc.steps:step3. check count
1588      * @tc.expected: step3. return OK.
1589      */
1590     CheckConsistentCount(db, dataCount);
1591     g_virtualCloudDb->ForkUpload(nullptr);
1592     g_cloudStoreHook->SetSyncFinishHook(nullptr);
1593     g_virtualAssetLoader->ForkDownload(nullptr);
1594     g_virtualCloudDb->ForkQuery(nullptr);
1595 }
1596 
1597 /**
1598   * @tc.name: DownloadAssetsOnly021
1599   * @tc.desc: test force pull mode pull mode can forcibly pull assets.
1600   * @tc.type: FUNC
1601   * @tc.require:
1602   * @tc.author: luoguo
1603   */
1604 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly021, TestSize.Level0)
1605 {
1606     /**
1607      * @tc.steps:step1. init data
1608      * @tc.expected: step1. return OK.
1609      */
1610     int dataCount = 10;
1611     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1612     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1613     /**
1614      * @tc.steps:step2. Download id 0 with force pull mode.
1615      * @tc.expected: step2. return ok.
1616      */
1617     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).EndGroup();
1618     g_observer->ResetCloudSyncToZero();
1619     PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1620     /**
1621      * @tc.steps:step3. check data type.
1622      * @tc.expected: step3. return ok.
1623      */
1624     auto data = g_observer->GetSavedChangedData();
1625     EXPECT_EQ(data.size(), 1u);
1626     EXPECT_EQ(data[ASSETS_TABLE_NAME].type, ChangedDataType::ASSET);
1627 }
1628 
1629 /**
1630   * @tc.name: DownloadAssetsOnly022
1631   * @tc.desc: test assets only without and.
1632   * @tc.type: FUNC
1633   * @tc.require:
1634   * @tc.author: luoguo
1635   */
1636 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly022, TestSize.Level0)
1637 {
1638     /**
1639      * @tc.steps:step1. init data
1640      * @tc.expected: step1. return OK.
1641      */
1642     int dataCount = 10;
1643     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1644     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1645 
1646     /**
1647      * @tc.steps:step2. assets only sync.
1648      * @tc.expected: step2. return INVALID_ARGS.
1649      */
1650     std::map<std::string, std::set<std::string>> assets;
1651     assets["assets"] = {ASSET_COPY.name + "0"};
1652     std::map<std::string, std::set<std::string>> assets1;
1653     assets1["assets"] = {ASSET_COPY.name + "0_copy"};
1654     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).AssetsOnly(assets).
1655         EndGroup().Or().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets1).EndGroup();
1656     PriorityLevelSync(2, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::INVALID_ARGS);
1657 }
1658 
1659 /**
1660   * @tc.name: DownloadAssetsOnly023
1661   * @tc.desc: test assets only with group and.
1662   * @tc.type: FUNC
1663   * @tc.require:
1664   * @tc.author: luoguo
1665   */
1666 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly023, TestSize.Level0)
1667 {
1668     /**
1669      * @tc.steps:step1. init data
1670      * @tc.expected: step1. return OK.
1671      */
1672     int dataCount = 10;
1673     InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1674     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1675 
1676     /**
1677      * @tc.steps:step2. assets only sync.
1678      * @tc.expected: step2. return Ok.
1679      */
1680     std::map<std::string, std::set<std::string>> assets;
1681     assets["assets"] = {ASSET_COPY.name + "0"};
1682     std::map<std::string, std::set<std::string>> assets1;
1683     assets1["assets"] = {ASSET_COPY.name + "0_copy"};
1684     Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1685         EndGroup().And().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets1).EndGroup();
1686     PriorityLevelSync(2, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1687 }
1688 } // namespace
1689 #endif // RELATIONAL_STORE
1690